Implementation code: TinyOperateSystem (C++ system)
Stage 1: Core module development
Stage 2: I/O system development
Stage 3: Front-end interface development
Stage 4: Support system development
Stage 5: Integration and testing
Stage 6: Deployment
Key sequence diagram
sequenceDiagram
par Concurrent processing
LogStream ->> LogStream: LogStream(LogLevel level, const char* file, int line)
activate LogStream
LogStream ->> FixedBuffer: Append(const char* buf, size_t len)
LogStream ->> LogFormat: Default()
activate LogFormat
note over LogFormat: add timestamp / LogLevel / file name / thread id / message fields
LogFormat ->> LogFormat: AddFormatter(std::shared_ptr<LogFormatter> formatter)
deactivate LogFormat
LogStream ->> LogStream: operator<<(const LogFormat &format)
LogStream ->> LogFormat: Format(LogStream &stream)
activate LogFormat
loop over the registered LogFormatters
LogFormat ->>+ XXXFormatter: Format(LogStream &stream)
note over XXXFormatter: prepend the previously configured prefix fields to the log entry
XXXFormatter -->>- LogFormat: return LogStream
end
deactivate LogFormat
deactivate LogStream
LogStream ->> LogStream: ~LogStream();
activate LogStream
LogStream ->> LogStream: Flush()
LogStream ->> AsyncLogger: Append(const char* message, size_t len)
activate AsyncLogger
AsyncLogger ->>+ DoubleBuffer: Append(const char* msg, size_t len)
DoubleBuffer ->>+ LockFreeRingBuffer: Append(const char* msg, size_t len)
LockFreeRingBuffer ->> BufferNode: __builtin_memcpy(curNode->data + curIndex, msg, len)
note left of BufferNode: write the data into the buffer node
LockFreeRingBuffer -->>- DoubleBuffer: return ErrCode
DoubleBuffer -->>- AsyncLogger: return ErrCode
deactivate AsyncLogger
LogStream ->> FixedBuffer: Reset()
deactivate LogStream
and
loop running_.load(std::memory_order_acquire)
AsyncLogger ->> AsyncLogger: BackendThreadFunc()
activate AsyncLogger
AsyncLogger ->>+ DoubleBuffer: SafeSwap()
DoubleBuffer -->>- AsyncLogger: return ErrCode
AsyncLogger ->>+ DoubleBuffer: GetBackBuffer
DoubleBuffer -->>- AsyncLogger: return LockFreeRingBuffer
AsyncLogger ->> AsyncLogger: ProcessBuffer(LockFreeRingBuffer &buffer)
AsyncLogger ->> LockFreeRingBuffer: Collect(std::function<ErrCode(const char*, size_t)>)
loop while the node list has data
activate LockFreeRingBuffer
LockFreeRingBuffer ->>+ IOManager: Write(const char* data, size_t len)
IOManager ->>+ XXXOutputStrategy: Write(const char* data, size_t len)
note over XXXOutputStrategy: invoke the write operation of each configured output strategy
XXXOutputStrategy -->>- IOManager: return ErrCode
IOManager -->>- LockFreeRingBuffer: return ErrCode
end
deactivate LockFreeRingBuffer
AsyncLogger ->> DoubleBuffer: ReleaseBackBuffer()
activate DoubleBuffer
DoubleBuffer ->> LockFreeRingBuffer: Reset()
loop while the node list has data
activate LockFreeRingBuffer
LockFreeRingBuffer ->> BufferNode: clear the node data
deactivate LockFreeRingBuffer
end
deactivate DoubleBuffer
deactivate AsyncLogger
end
end
Stage 1: Core module development. A three-level buffering architecture is used for high-performance log processing:
BufferNode: basic storage unit (fixed 4 MB size)
LockFreeRingBuffer: lock-free ring buffer built on a linked list of nodes
DoubleBuffer: double buffering for read/write separation
1. Buffer system implementation
graph TD
A[Log producer] --> B[DoubleBuffer]
B --> C[Front Buffer]
B --> D[Back Buffer]
C --> E[LockFreeRingBuffer]
D --> F[LockFreeRingBuffer]
E --> G[BufferNode list]
F --> H[BufferNode list]
G --> I[BufferNodePool]
H --> I
J[Log consumer] --> D
1.1 BufferNode
1.1.1 Design goals
Fixed-size buffer (4 MB by default)
Atomic management of the write position
Cache-line alignment to avoid false sharing
Linked-list structure for dynamic expansion
1.1.2 Key implementation

template <size_t SingleBufferSize = DEFAULT_BUFFER_SIZE>
struct alignas(CACHE_LINE_SIZE) BufferNode {
    std::atomic<BufferNode*> next;
    alignas(CACHE_LINE_SIZE) std::atomic<size_t> writeIndex{0};
    char data[SingleBufferSize];

    BufferNode() : next(nullptr), writeIndex(0) {}
};
1.1.3 Optimization features
64-byte cache-line alignment
Atomic variables isolated on their own cache lines
Pre-allocated memory space
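The false-sharing claim can be checked at compile time. A minimal sketch, assuming the BufferNode from 1.1.2 and the project constants CACHE_LINE_SIZE / DEFAULT_BUFFER_SIZE are in scope:

// The node starts on a cache-line boundary, and the member alignas forces
// writeIndex onto its own cache line, so producer CAS traffic on writeIndex
// does not invalidate the line holding `next`.
static_assert(alignof(BufferNode<>) == CACHE_LINE_SIZE,
              "BufferNode must be cache-line aligned");
static_assert(sizeof(BufferNode<>) % CACHE_LINE_SIZE == 0,
              "adjacent nodes must not share a cache line");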
1.2 BufferNodePool
1.2.1 Design goals
Object pool managing node lifetimes
Cap on the maximum node count to prevent memory exhaustion
Thread-safe allocation/recycling
1.2.2 Workflow
flowchart TD
A[Allocate node] --> B{Free list has a node?}
B -->|yes| C[Take a node from the free list]
B -->|no| D{Maximum node count reached?}
D -->|yes| E[Return nullptr]
D -->|no| F[Create a new node]
F --> G[Add to the allocated list]
H[Release node] --> I[Reset node state]
I --> J[Add to the free list]
1.2.3 Key implementation

template <size_t SingleBufferSize = DEFAULT_BUFFER_SIZE>
class BufferNodePool {
public:
    using BufferNode = Base::SystemLog::BufferNode<SingleBufferSize>;

    explicit BufferNodePool(size_t maxNodes = MAX_BUFFER_NODES);
    ~BufferNodePool();

    BufferNode* Allocate();
    void Deallocate(BufferNode* node);
    size_t GetTotalAllocatedCount() const;

private:
    std::vector<BufferNode*> freeList_;
    std::vector<BufferNode*> allocatedNodes_;
    std::mutex mutex_;
    std::atomic<size_t> totalAllocatedCount_{0};
    const size_t maxNodes_;
};

template <size_t SingleBufferSize>
BufferNodePool<SingleBufferSize>::BufferNodePool(size_t maxNodes) : maxNodes_(maxNodes) {}

template <size_t SingleBufferSize>
BufferNodePool<SingleBufferSize>::~BufferNodePool()
{
    std::lock_guard<std::mutex> lock(mutex_);
    for (auto node : allocatedNodes_) {
        free(node);
    }
}

template <size_t SingleBufferSize>
auto BufferNodePool<SingleBufferSize>::Allocate() -> BufferNode*
{
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!freeList_.empty()) {
            BufferNode* node = freeList_.back();
            freeList_.pop_back();
            return node;
        }
    }
    if (totalAllocatedCount_.load(std::memory_order_relaxed) >= maxNodes_) {
        return nullptr;
    }
    void* memory = malloc(sizeof(BufferNode) + SingleBufferSize);
    if (!memory) {
        return nullptr;
    }
    BufferNode* node = new (memory) BufferNode();
    node->writeIndex.store(0, std::memory_order_relaxed);
    node->next.store(nullptr, std::memory_order_relaxed);
    {
        std::lock_guard<std::mutex> lock(mutex_);
        allocatedNodes_.push_back(node);
    }
    totalAllocatedCount_.fetch_add(1, std::memory_order_relaxed);
    return node;
}

template <size_t SingleBufferSize>
void BufferNodePool<SingleBufferSize>::Deallocate(BufferNode* node)
{
    if (node == nullptr) {
        return;
    }
    node->writeIndex.store(0, std::memory_order_release);
    BufferNode* next = node->next.exchange(nullptr, std::memory_order_release);
    if (next) {
        syslog(LOG_WARNING, "deallocating node with active next pointer");
    }
    std::lock_guard<std::mutex> lock(mutex_);
    freeList_.push_back(node);
}

template <size_t SingleBufferSize>
size_t BufferNodePool<SingleBufferSize>::GetTotalAllocatedCount() const
{
    return totalAllocatedCount_.load(std::memory_order_relaxed);
}
1.2.4 Exception handling
Allocation failure returns nullptr
Abnormal pointers are detected on release (e.g. a non-null next pointer)
1.3 LockFreeRingBuffer
1.3.1 Design goals
Dynamic ring buffer built on BufferNode
Lock-free writes via CAS
Support for message chunking (messages larger than 4 MB)
Copy optimization for small payloads
1.3.2 Write flow
sequenceDiagram
participant Producer
participant RingBuffer
Producer->>RingBuffer: Append(msg, len)
alt len > 4MB
RingBuffer->>RingBuffer: split the message into multiple chunks
loop each chunk
RingBuffer->>RingBuffer: write chunk data
end
else len <= 4MB
RingBuffer->>CurrentNode: check remaining space
alt insufficient space
RingBuffer->>RingBuffer: expand with a new node
end
RingBuffer->>CurrentNode: CAS-update the write index
RingBuffer->>CurrentNode: copy the data
end
1.3.3 Key implementation

using MessageConsumer = std::function<ErrCode(const char*, size_t)>;

template <size_t SingleBufferSize = DEFAULT_BUFFER_SIZE>
class LockFreeRingBuffer {
public:
    using BufferNode = Base::SystemLog::BufferNode<SingleBufferSize>;

    LockFreeRingBuffer(size_t maxNodes = MAX_BUFFER_NODES);
    ~LockFreeRingBuffer();

    ErrCode Append(const char* msg, size_t len);
    void Collect(MessageConsumer messageConsumer);
    void Reset();
    size_t GetSize() const;
    size_t GetNodeCount() const;

private:
    BufferNode* CreateNode();
    ErrCode ExpandBuffer();
    void CopySmallData(char* dest, const char* src, size_t len);
    ErrCode HandleOversizedMessage(const char* msg, size_t len);
    void HandleOutOfMemory();
    void HandleNodeLimitReached();

    const size_t maxNodes_;
    std::shared_ptr<BufferNodePool<SingleBufferSize>> pool_;
    alignas(CACHE_LINE_SIZE) std::atomic<BufferNode*> headNode_{nullptr};
    alignas(CACHE_LINE_SIZE) std::atomic<BufferNode*> currentNode_{nullptr};
};

template <size_t SingleBufferSize>
LockFreeRingBuffer<SingleBufferSize>::LockFreeRingBuffer(size_t maxNodes)
    : maxNodes_(maxNodes),
      pool_(std::make_shared<BufferNodePool<SingleBufferSize>>(maxNodes))
{
    currentNode_ = CreateNode();
    headNode_.store(currentNode_.load(std::memory_order_relaxed), std::memory_order_relaxed);
}

template <size_t SingleBufferSize>
LockFreeRingBuffer<SingleBufferSize>::~LockFreeRingBuffer()
{
    BufferNode* node = headNode_.load(std::memory_order_acquire);
    while (node) {
        BufferNode* next = node->next.load(std::memory_order_relaxed);
        pool_->Deallocate(node);
        node = next;
    }
}

template <size_t SingleBufferSize>
ErrCode LockFreeRingBuffer<SingleBufferSize>::Append(const char* msg, size_t len)
{
    if (len > SingleBufferSize) {
        return HandleOversizedMessage(msg, len);
    }
    int32_t retryCount = 0;
    while (retryCount++ < MAX_RETRY_TIMES) {
        BufferNode* curNode = currentNode_.load(std::memory_order_acquire);
        if (curNode == nullptr) {
            auto errCode = ExpandBuffer();
            if (errCode != ERR_OK) {
                return errCode;
            }
            continue;
        }
        if (retryCount > 10) {
            int32_t backoff = 0;
            backoff = 1 << (retryCount / 5);
            for (int i = 0; i < backoff; ++i) {
                _mm_pause();
            }
        }
        size_t curIndex = curNode->writeIndex.load(std::memory_order_acquire);
        if (curIndex + len > SingleBufferSize) {
            auto errCode = ExpandBuffer();
            if (errCode != ERR_OK) {
                return errCode;
            }
        }
        if (curNode->writeIndex.compare_exchange_weak(curIndex, curIndex + len,
                                                      std::memory_order_acq_rel,
                                                      std::memory_order_acquire)) {
            if (len <= CACHE_LINE_SIZE) {
                CopySmallData(curNode->data + curIndex, msg, len);
            } else {
                __builtin_memcpy(curNode->data + curIndex, msg, len);
            }
            return ERR_OK;
        }
    }
    return ERR_BUFFER_APPEND_FAILED;
}

template <size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::Collect(MessageConsumer messageConsumer)
{
    BufferNode* node = headNode_.load(std::memory_order_acquire);
    while (node) {
        size_t writeIndex = node->writeIndex.load(std::memory_order_acquire);
        if (writeIndex > 0) {
            auto errCode = messageConsumer(node->data, writeIndex);
            if (errCode != ERR_OK) {
                syslog(LOG_ERR, "messageConsumer function failed: %d", errCode);
            }
        }
        BufferNode* next = node->next.load(std::memory_order_relaxed);
        if (next == nullptr) {
            break;
        }
        node = next;
    }
}

template <size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::Reset()
{
    BufferNode* node = headNode_.load(std::memory_order_acquire);
    while (node) {
        node->writeIndex.store(0, std::memory_order_release);
        BufferNode* next = node->next.load(std::memory_order_relaxed);
        if (next == nullptr) {
            break;
        }
        node = next;
    }
    currentNode_.store(headNode_.load(std::memory_order_acquire), std::memory_order_release);
}

template <size_t SingleBufferSize>
size_t LockFreeRingBuffer<SingleBufferSize>::GetSize() const
{
    size_t size = 0;
    BufferNode* node = headNode_.load(std::memory_order_acquire);
    while (node) {
        size += node->writeIndex.load(std::memory_order_relaxed);
        BufferNode* next = node->next.load(std::memory_order_relaxed);
        if (next == nullptr) {
            break;
        }
        node = next;
    }
    return size;
}

template <size_t SingleBufferSize>
size_t LockFreeRingBuffer<SingleBufferSize>::GetNodeCount() const
{
    return pool_->GetTotalAllocatedCount();
}

template <size_t SingleBufferSize>
auto LockFreeRingBuffer<SingleBufferSize>::CreateNode() -> BufferNode*
{
    if (pool_ == nullptr) {
        syslog(LOG_ERR, "BufferNodePool is not initialized");
        return nullptr;
    }
    BufferNode* newNode = pool_->Allocate();
    if (newNode == nullptr) {
        HandleOutOfMemory();
        return nullptr;
    }
    newNode->writeIndex.store(0, std::memory_order_relaxed);
    newNode->next.store(nullptr, std::memory_order_relaxed);
    __builtin_memset(newNode->data, 0, SingleBufferSize);
    return newNode;
}

template <size_t SingleBufferSize>
ErrCode LockFreeRingBuffer<SingleBufferSize>::ExpandBuffer()
{
    BufferNode* curNode = currentNode_.load(std::memory_order_acquire);
    if (curNode->next.load(std::memory_order_acquire)) {
        currentNode_.store(curNode->next, std::memory_order_release);
        return ERR_OK;
    }
    if (pool_ == nullptr) {
        throw std::runtime_error("BufferNodePool is not initialized");
    }
    if (pool_->GetTotalAllocatedCount() >= maxNodes_) {
        HandleNodeLimitReached();
        return ERR_POOL_NODES_OVERFLOW;
    }
    BufferNode* newNode = CreateNode();
    if (newNode == nullptr) {
        return ERR_BUFFER_CREATE_NODE_FAILED;
    }
    BufferNode* expected = nullptr;
    if (curNode->next.compare_exchange_strong(expected, newNode,
                                              std::memory_order_release,
                                              std::memory_order_relaxed)) {
        currentNode_.store(newNode, std::memory_order_release);
        return ERR_OK;
    } else {
        pool_->Deallocate(newNode);
        currentNode_.store(curNode->next.load(std::memory_order_acquire), std::memory_order_release);
        return ERR_OK;
    }
}

template <size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::CopySmallData(char* dest, const char* src, size_t len)
{
    if (len == 0) {
        return;
    }
    if (len <= sizeof(uint64_t)) {
        switch (len) {
            case 8:
                *reinterpret_cast<uint64_t*>(dest) = *reinterpret_cast<const uint64_t*>(src);
                return;
            case 4:
                *reinterpret_cast<uint32_t*>(dest) = *reinterpret_cast<const uint32_t*>(src);
                return;
            case 2:
                *reinterpret_cast<uint16_t*>(dest) = *reinterpret_cast<const uint16_t*>(src);
                return;
            case 1:
                *dest = *src;
                return;
            default:
                break;
        }
    }
    __builtin_memcpy(dest, src, len);
}

template <size_t SingleBufferSize>
ErrCode LockFreeRingBuffer<SingleBufferSize>::HandleOversizedMessage(const char* msg, size_t len)
{
    size_t written = 0;
    while (written < len) {
        size_t chunkSize = std::min(SingleBufferSize, len - written);
        auto errCode = Append(msg + written, chunkSize);
        if (errCode != ERR_OK) {
            return errCode;
        }
        written += chunkSize;
    }
    return ERR_OK;
}

template <size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::HandleOutOfMemory()
{
    static int32_t oomCount = 0;
    const char* warnMsg = "WARNING: Log buffer out of memory!\n";
    auto result = write(STDERR_FILENO, warnMsg, strlen(warnMsg));
    if (result == -1) {
        perror("Failed to write warning message");
    }
    oomCount++;
    if (oomCount > 10) {
        oomCount = 0;
        Reset();
    }
}

template <size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::HandleNodeLimitReached()
{
    static int limitCount = 0;
    limitCount++;
    if (limitCount > 100) {
        limitCount = 0;
        Reset();
    }
}
1.3.4 Performance optimizations
Direct copy for small payloads (avoids memcpy)
Exponential backoff to ease CAS contention
Chunked handling of oversized messages
1.4 DoubleBuffer
1.4.1 Design goals
Double-buffer (front/back) mechanism
Safe swapping that preserves data consistency
Read/write separation for higher concurrency
1.4.2 State transitions
stateDiagram-v2
[*] --> Normal : initial state
Normal --> Swapping : periodic swap
Normal --> Emergency : buffer full
Swapping --> Waiting : back buffer still in use
Waiting --> Normal : back buffer released
Emergency --> Normal : forced swap completed
1.4.3 Key implementation

template <size_t SingleBufferSize = DEFAULT_BUFFER_SIZE>
class DoubleBuffer {
public:
    using LockFreeRingBuffer = Base::SystemLog::LockFreeRingBuffer<SingleBufferSize>;

    DoubleBuffer()
        : frontBuffer_(std::make_unique<LockFreeRingBuffer>()),
          backBuffer_(std::make_unique<LockFreeRingBuffer>()) {}

    ErrCode Append(const char* msg, size_t len);
    LockFreeRingBuffer &GetBackBuffer() { return *backBuffer_; }
    ErrCode SafeSwap();
    void EmergencySwap();
    void ReleaseBackBuffer();
    size_t GetFrontSize() const { return frontBuffer_->GetSize(); }
    size_t GetBackSize() const { return backBuffer_->GetSize(); }
    bool IsBackBufferInUse() const { return backBufferInUse_; }

private:
    void InnerSwapBuffers()
    {
        backBufferInUse_ = true;
        std::swap(frontBuffer_, backBuffer_);
    }

    std::unique_ptr<LockFreeRingBuffer> frontBuffer_;
    std::unique_ptr<LockFreeRingBuffer> backBuffer_;
    mutable std::mutex swapMutex_;
    std::condition_variable swapCondition_;
    bool backBufferInUse_ = false;
    std::atomic<bool> swapping_{false};
};

template <size_t SingleBufferSize>
ErrCode DoubleBuffer<SingleBufferSize>::Append(const char* msg, size_t len)
{
    if (!swapping_.load(std::memory_order_acquire)) {
        return frontBuffer_->Append(msg, len);
    }
    std::lock_guard<std::mutex> lock(swapMutex_);
    return frontBuffer_->Append(msg, len);
}

template <size_t SingleBufferSize>
ErrCode DoubleBuffer<SingleBufferSize>::SafeSwap()
{
    if (swapping_.exchange(true)) {
        return ERR_OPERATION_IN_PROGRESS;
    }
    std::unique_lock<std::mutex> lock(swapMutex_);
    auto bakBackBufferInUse = backBufferInUse_;
    swapping_.store(true, std::memory_order_release);
    swapCondition_.wait(lock, [this] { return !backBufferInUse_; });
    InnerSwapBuffers();
    swapping_.store(false, std::memory_order_release);
    backBufferInUse_ = bakBackBufferInUse;
    return ERR_OK;
}

template <size_t SingleBufferSize>
void DoubleBuffer<SingleBufferSize>::EmergencySwap()
{
    std::unique_lock<std::mutex> lock(swapMutex_, std::try_to_lock);
    if (!lock.owns_lock()) {
        syslog(LOG_ERR, "EmergencySwap failed to acquire lock");
        return;
    }
    swapping_.store(true, std::memory_order_release);
    InnerSwapBuffers();
    frontBuffer_->Reset();
    swapping_.store(false, std::memory_order_release);
}

template <size_t SingleBufferSize>
void DoubleBuffer<SingleBufferSize>::ReleaseBackBuffer()
{
    {
        std::lock_guard<std::mutex> lock(swapMutex_);
        backBufferInUse_ = false;
        backBuffer_->Reset();
    }
    swapCondition_.notify_one();
}
2. Asynchronous scheduler implementation
2.1 AsyncLogger architecture
graph TD
A[Application thread] -->|write log| B(DoubleBuffer front buffer)
C[Backend thread] -->|periodic check| D{Flush condition met?}
D -->|yes| E[Safely swap buffers]
E --> F[Process back-buffer data]
F --> G[Invoke output callback]
G --> H[Reset back buffer]
D -->|no| I[Wait for the next cycle]
2.2 Core mechanisms
Dual trigger conditions:
Timed flush (1000 ms by default)
Threshold trigger (2 MB soft threshold / 3.8 MB hard threshold)
Three buffer states:
Front buffer: receives new data
Back buffer: being processed by the backend
Released buffer: ready for reuse
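As a usage sketch of the dual-trigger configuration, using the AsyncLogger interface shown in 2.3 below; the IOManager instance, the message variables and the exact threshold values are illustrative:

// Backend callback: hand each collected chunk to the Stage 2 I/O layer.
IOManager ioManager;   // hypothetical instance, configured with output strategies elsewhere
auto sink = [&ioManager](const char* data, size_t len) -> ErrCode {
    return ioManager.Write(data, len);
};

AsyncLogger<> logger(sink);
logger.SetFlushInterval(1000);                                   // timed trigger: flush every 1000 ms
logger.SetFlushThreshold(2 * 1024 * 1024,                        // soft threshold: 2 MB
                         static_cast<size_t>(3.8 * 1024 * 1024)); // hard threshold: ~3.8 MB
logger.Start();

logger.Append(msg, msgLen);   // hot path: appended into the front buffer
logger.Flush();               // request an early swap and drain
logger.Stop();                // joins the backend thread and emergency-flushes leftovers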
2.3 Key implementation

using BackProcessTaskCallback = MessageConsumer;

template <size_t SingleBufferSize = DEFAULT_BUFFER_SIZE>
class AsyncLogger {
public:
    using DoubleBuffer = Base::SystemLog::DoubleBuffer<SingleBufferSize>;
    using LockFreeRingBuffer = Base::SystemLog::LockFreeRingBuffer<SingleBufferSize>;

    explicit AsyncLogger(BackProcessTaskCallback taskCallback);
    ~AsyncLogger();

    ErrCode Append(const char* message, size_t len);
    ErrCode Flush();
    ErrCode Start();
    ErrCode Stop();

    void SetFlushInterval(uint32_t milliseconds)
    {
        flushInterval_ = std::chrono::milliseconds(milliseconds);
    }

    void SetFlushThreshold(size_t softFlushThreshold = DEFAULT_SOFT_FLUSH_THRESHOLD,
                           size_t hardFlushThreshold = DEFAULT_HARD_FLUSH_THRESHOLD)
    {
        softFlushThreshold_ = softFlushThreshold;
        hardFlushThreshold_ = hardFlushThreshold;
    }

private:
    void BackendThreadFunc();
    void ProcessBuffer(LockFreeRingBuffer &buffer);
    void EmergencyFlush();

    BackProcessTaskCallback taskCallback_;
    DoubleBuffer doubleBuffer_;
    std::unique_ptr<std::thread> backendThread_;
    std::atomic<bool> running_{false};
    std::atomic<bool> forceFlush_{false};
    std::atomic<bool> processing_{false};
    std::mutex mutex_;
    std::condition_variable cond_;
    std::chrono::milliseconds flushInterval_{DEFAULT_FLUSH_INTERVAL_MS};
    size_t softFlushThreshold_{DEFAULT_SOFT_FLUSH_THRESHOLD};
    size_t hardFlushThreshold_{DEFAULT_HARD_FLUSH_THRESHOLD};
};

template <size_t SingleBufferSize>
AsyncLogger<SingleBufferSize>::AsyncLogger(BackProcessTaskCallback taskCallback)
    : taskCallback_(std::move(taskCallback))
{
    if (!taskCallback_) {
        throw std::invalid_argument("taskCallback function must be valid");
    }
}

template <size_t SingleBufferSize>
AsyncLogger<SingleBufferSize>::~AsyncLogger()
{
    try {
        Stop();
    } catch (...) {
        syslog(LOG_ERR, "exception in AsyncLogger destructor");
    }
}

template <size_t SingleBufferSize>
ErrCode AsyncLogger<SingleBufferSize>::Start()
{
    if (running_.load(std::memory_order_acquire)) {
        return ERR_ALREADY_RUNNING;
    }
    running_.store(true, std::memory_order_release);
    forceFlush_.store(false, std::memory_order_release);
    processing_.store(false, std::memory_order_release);
    try {
        backendThread_ = std::make_unique<std::thread>([this] { BackendThreadFunc(); });
        auto handle = backendThread_->native_handle();
        pthread_setname_np(handle, "log_backend");
        return ERR_OK;
    } catch (const std::system_error &e) {
        syslog(LOG_CRIT, "failed to start backend thread: %s", e.what());
        running_.store(false, std::memory_order_release);
        return ERR_THREAD_START_FAILED;
    }
}

template <size_t SingleBufferSize>
ErrCode AsyncLogger<SingleBufferSize>::Stop()
{
    if (!running_.load(std::memory_order_acquire)) {
        return ERR_NOT_RUNNING;
    }
    running_.store(false, std::memory_order_release);
    {
        std::lock_guard<std::mutex> lock(mutex_);
        cond_.notify_one();
    }
    try {
        if (backendThread_ && backendThread_->joinable()) {
            backendThread_->join();
        }
        if (doubleBuffer_.GetFrontSize() > 0) {
            EmergencyFlush();
        }
        return ERR_OK;
    } catch (const std::system_error &e) {
        syslog(LOG_ERR, "failed to join backend thread: %s", e.what());
        return ERR_THREAD_JOIN_FAILED;
    }
}

template <size_t SingleBufferSize>
ErrCode AsyncLogger<SingleBufferSize>::Append(const char* message, size_t len)
{
    if (!running_.load(std::memory_order_acquire)) {
        return ERR_LOGGER_NOT_RUNNING;
    }
    if (!message || len == 0) {
        return ERR_INVALID_ARGUMENT;
    }
    auto errCode = doubleBuffer_.Append(message, len);
    if (errCode != ERR_OK) {
        return errCode;
    }
    if (doubleBuffer_.GetFrontSize() >= softFlushThreshold_ &&
        !processing_.load(std::memory_order_acquire)) {
        std::lock_guard<std::mutex> lock(mutex_);
        cond_.notify_one();
        return ERR_OK;
    }
    if (doubleBuffer_.GetFrontSize() >= hardFlushThreshold_) {
        std::lock_guard<std::mutex> lock(mutex_);
        forceFlush_.store(true, std::memory_order_release);
        cond_.notify_one();
        return ERR_OK;
    }
    return ERR_OK;
}

template <size_t SingleBufferSize>
ErrCode AsyncLogger<SingleBufferSize>::Flush()
{
    if (!running_.load(std::memory_order_acquire)) {
        return ERR_LOGGER_NOT_RUNNING;
    }
    {
        std::lock_guard<std::mutex> lock(mutex_);
        forceFlush_.store(true, std::memory_order_release);
        cond_.notify_one();
    }
    for (int i = 0; i < MAX_RETRY_TIMES; ++i) {
        if (doubleBuffer_.GetFrontSize() == 0 &&
            !processing_.load(std::memory_order_acquire)) {
            return ERR_OK;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return ERR_FLUSH_TIMEOUT;
}

template <size_t SingleBufferSize>
void AsyncLogger<SingleBufferSize>::BackendThreadFunc()
{
    while (running_.load(std::memory_order_acquire)) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait_for(lock, flushInterval_, [this] {
            return forceFlush_.load(std::memory_order_acquire) ||
                   !running_.load(std::memory_order_acquire) ||
                   doubleBuffer_.GetFrontSize() >= softFlushThreshold_;
        });
        if (!running_.load(std::memory_order_acquire)) {
            break;
        }
        if (doubleBuffer_.GetFrontSize() == 0) {
            continue;
        }
        processing_.store(true, std::memory_order_release);
        if (auto errCode = doubleBuffer_.SafeSwap(); errCode != ERR_OK) {
            syslog(LOG_ERR, "SafeSwap failed: %d", errCode);
            processing_.store(false, std::memory_order_release);
            continue;
        }
        lock.unlock();
        try {
            auto &backBuffer = doubleBuffer_.GetBackBuffer();
            ProcessBuffer(backBuffer);
        } catch (const std::exception &e) {
            syslog(LOG_CRIT, "exception during buffer processing: %s", e.what());
        }
        doubleBuffer_.ReleaseBackBuffer();
        processing_.store(false, std::memory_order_release);
    }
}

template <size_t SingleBufferSize>
void AsyncLogger<SingleBufferSize>::ProcessBuffer(LockFreeRingBuffer &buffer)
{
    const size_t dataSize = buffer.GetSize();
    if (dataSize == 0) {
        return;
    }
    buffer.Collect(taskCallback_);
}

template <size_t SingleBufferSize>
void AsyncLogger<SingleBufferSize>::EmergencyFlush()
{
    syslog(LOG_WARNING, "Performing emergency flush");
    if (doubleBuffer_.SafeSwap() != ERR_OK) {
        doubleBuffer_.EmergencySwap();
    }
    auto &backBuffer = doubleBuffer_.GetBackBuffer();
    ProcessBuffer(backBuffer);
    doubleBuffer_.ReleaseBackBuffer();
}
2.4 Backend thread flow

void BackendThreadFunc()
{
    while (running_) {
        std::unique_lock lock(mutex_);
        cond_.wait_for(lock, flushInterval_, [this] {
            return forceFlush_ || !running_ ||
                   buffer_.GetFrontSize() >= softThreshold_;
        });
        if (!running_) {
            break;
        }
        buffer_.SafeSwap();
        lock.unlock();
        ProcessBuffer(backBuffer);
        buffer_.ReleaseBackBuffer();
    }
}
2.5 Exception-handling mechanisms
Backend-thread exceptions:
Shutdown guarantees:
Out-of-memory protection:
3. Performance optimization summary
Lock-free design: the hot write path is lock-free (CAS operations)
Cache optimization: hot data is isolated on separate cache lines
Batch processing: double buffering plus timed flushing reduces the number of I/O operations
Small-message optimization: specialized copy routines remove function-call overhead
Dynamic expansion: on-demand allocation avoids wasted memory
4. Test framework design
4.1 Test architecture overview
graph TD
A[Test entry] --> B[BufferNodePool tests]
A --> C[LockFreeRingBuffer tests]
A --> D[DoubleBuffer tests]
A --> E[AsyncLogger tests]
B --> F[Memory-leak detection]
C --> F
D --> F
E --> F
F --> G[Test report]
4.2 Test strategy
Unit tests: verify the basic functionality of each module
Performance tests: evaluate behaviour under high load
Memory-leak detection: combine GoogleTest with Valgrind to catch memory problems
Multi-threaded tests: verify stability under concurrent access
4.3 Test coverage enhancements
Empty-buffer handling: collecting from an empty buffer
Single-byte writes: verify handling of the smallest possible message
Buffer-full scenario: exercise the hard-threshold trigger
Pool exhaustion: verify behaviour when the node pool hits its limit (see the test sketch below)
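A minimal GoogleTest sketch of the pool-exhaustion case; the constant names follow the code in 1.2.3, while the tiny node count is purely illustrative:

#include <gtest/gtest.h>
#include <vector>

TEST(BufferNodePoolTest, ReturnsNullptrWhenPoolExhausted) {
    constexpr size_t kMaxNodes = 4;                       // deliberately tiny pool
    using Pool = BufferNodePool<DEFAULT_BUFFER_SIZE>;
    Pool pool(kMaxNodes);

    std::vector<Pool::BufferNode*> nodes;
    for (size_t i = 0; i < kMaxNodes; ++i) {
        auto* node = pool.Allocate();
        ASSERT_NE(node, nullptr);
        nodes.push_back(node);
    }

    EXPECT_EQ(pool.Allocate(), nullptr);                  // limit reached: nullptr, no throw
    EXPECT_EQ(pool.GetTotalAllocatedCount(), kMaxNodes);

    for (auto* node : nodes) {
        pool.Deallocate(node);                            // recycled nodes return to the free list
    }
    EXPECT_NE(pool.Allocate(), nullptr);                  // reuse succeeds after recycling
}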
4.4 Memory-leak detection method
4.4.1 Running the tests under Valgrind

g++ -std=c++20 -g -O0 -o log_system_test \
    buffer_node_pool_test.cpp \
    lock_free_ring_buffer_test.cpp \
    double_buffer_test.cpp \
    async_logger_test.cpp \
    -lgtest -lgtest_main -lpthread

valgrind --leak-check=full --show-leak-kinds=all ./log_system_test
4.4.2 Key points for leak detection
Object pool management: every allocated BufferNode is recycled correctly
Ring buffer: the node list is fully released on destruction
Double-buffer system: resource handling during swaps
Asynchronous logger: backend-thread resources are cleaned up
4.5 Test report
Memory-leak detection results
==190811== Memcheck, a memory error detector
==190811== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==190811== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==190811== Command: ./out/double_buffer_test
==190811== Parent PID: 2626
==190811==
==190811==
==190811== HEAP SUMMARY:
==190811==     in use at exit: 0 bytes in 0 blocks
==190811==   total heap usage: 549 allocs, 549 frees, 1,728,222,415 bytes allocated
==190811==
==190811== All heap blocks were freed -- no leaks are possible
==190811==
==190811== For lists of detected and suppressed errors, rerun with: -s
==190811== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Performance test results
[PERFORMANCE REPORT]
Success: 160000 | Failed: 0
Threads: 16
Messages per thread: 10000
Total messages: 160000
Message size: 128 bytes
Elapsed time: 1.16963 seconds
Throughput: 136796 msg/sec
Bandwidth: 16.2445 MB/sec
TotalBytes: 20480000 | expectedBytes: 20480000
4.6 Continuous integration plan (1.0 implementation)
Jenkins pipeline configuration
pipeline {
    agent any
    stages {
        stage('Build') {
            steps {
                sh 'g++ -std=c++20 -o log_system_test *.cpp -lgtest -lgtest_main -lpthread'
            }
        }
        stage('Test') {
            steps {
                sh './log_system_test'
                sh 'valgrind --leak-check=full --error-exitcode=1 ./log_system_test'
            }
        }
        stage('Coverage') {
            steps {
                sh 'gcov -r *.cpp'
            }
        }
    }
}
5. Key design decisions

| Design point | Chosen approach | Benefit |
| --- | --- | --- |
| Synchronization | Lock-free write path + locked swap path | Balances performance and correctness |
| Memory management | Object pool + dynamic expansion | Less fragmentation, prevents OOM |
| Buffer structure | Node linked list + ring index | Avoids data copies |
| Producer-consumer | Double buffering + condition variable | Decouples reads from writes |
| Memory ordering | Fine-grained memory_order control | Balances performance and consistency |
The three-level buffering architecture delivers high-performance log processing, maximizing system throughput while preserving data reliability.
Stage 2: I/O system development
1. Design approach
The strategy pattern is used to build an extensible I/O system:
Define a unified IOutputStrategy interface
Implement multiple output strategies: console, file, network, and so on
Use a factory pattern to create output-strategy instances
Have the I/O manager compose multiple output strategies
classDiagram
class IOutputStrategy {
<<interface>>
+Write(const char* data, size_t size) ErrCode
+Flush() ErrCode
+Close() ErrCode
}
class ConsoleOutputStrategy {
+Write(const char* data, size_t size) override
+Flush() override
}
class FileOutputStrategy {
-fileDescriptor_: int
-currentSize_: size_t
-maxFileSize_: size_t
-rollInterval_: chrono::seconds
-lastRollTime_: time_point
+Write(const char* data, size_t size) override
+Flush() override
+RollFile() ErrCode
}
class MMapFileStrategy {
-mappedAddress_: void*
-mappedSize_: size_t
+Write(const char* data, size_t size) override
}
class DirectIOStrategy {
-alignment_: size_t
+Write(const char* data, size_t size) override
}
class IOManager {
-strategies_: vector<unique_ptr<IOutputStrategy>>
+AddStrategy(unique_ptr<IOutputStrategy>)
+Write(const char* data, size_t size) ErrCode
+Flush() ErrCode
}
IOutputStrategy <|.. ConsoleOutputStrategy
IOutputStrategy <|.. FileOutputStrategy
FileOutputStrategy <|-- MMapFileStrategy
FileOutputStrategy <|-- DirectIOStrategy
IOManager o-- IOutputStrategy
1.1 Sequence diagram (log write flow)
sequenceDiagram
participant AsyncLogger
participant IOManager
participant FileOutput
participant MMapStrategy
AsyncLogger->>IOManager: Write(data, size)
IOManager->>FileOutput: Write(data, size)
FileOutput->>MMapStrategy: Write(data, size)
MMapStrategy-->>FileOutput: OK
FileOutput-->>IOManager: OK
IOManager-->>AsyncLogger: OK
loop periodic task
AsyncLogger->>IOManager: Flush()
IOManager->>FileOutput: Flush()
FileOutput->>MMapStrategy: Flush()
MMapStrategy-->>FileOutput: OK
FileOutput-->>IOManager: OK
end
1.2 File rolling flowchart
flowchart TD
A[Write log] --> B{Current file size >= threshold?}
B -->|yes| C[Roll the file]
B -->|no| D{Time since last roll >= interval?}
D -->|yes| C
D -->|no| E[Continue writing]
C --> F[Close the current file]
F --> G[Generate a new file name]
G --> H[Create the new file]
H --> I[Update the file descriptor]
I --> J[Reset the file size]
J --> E
2. Implementation
2.1 I/O strategy interface
2.2 Console output implementation
2.3 File output base class
2.4 Memory-mapped file output
2.5 Direct I/O output
2.6 I/O manager
2.7 Integration with AsyncLogger
2.8 Asynchronous fsync optimization
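The bodies of 2.1 to 2.8 are not reproduced here. As a minimal sketch of the interface (2.1) and the console strategy (2.2) consistent with the class diagram above; the ERR_IO_* error-code names are assumptions, not the repository's exact constants:

#include <cstdio>
#include <cstddef>

class IOutputStrategy {
public:
    virtual ~IOutputStrategy() = default;
    virtual ErrCode Write(const char* data, size_t size) = 0;
    virtual ErrCode Flush() = 0;
    virtual ErrCode Close() = 0;
};

class ConsoleOutputStrategy : public IOutputStrategy {
public:
    ErrCode Write(const char* data, size_t size) override
    {
        // fwrite copies the chunk into stdio's buffer; ordering with other stdout output is preserved.
        return fwrite(data, 1, size, stdout) == size ? ERR_OK : ERR_IO_WRITE_FAILED;
    }
    ErrCode Flush() override
    {
        return fflush(stdout) == 0 ? ERR_OK : ERR_IO_FLUSH_FAILED;
    }
    ErrCode Close() override { return ERR_OK; }   // stdout is not owned, so nothing to close
};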
3. I/O performance optimization strategies
Batch writes (see the mmap sketch after the comparison table below):
Large-block writes in the memory-mapped and direct-I/O paths
Fewer system calls
Asynchronous fsync:
Write-strategy comparison:
| Strategy | Suitable scenario | Advantages | Drawbacks |
| --- | --- | --- | --- |
| Standard file I/O | General-purpose use | Simple and reliable | Mediocre performance |
| Memory-mapped file | High throughput | Zero-copy, efficient | File size is limited |
| Direct I/O | Avoiding double caching | Bypasses the page cache | Requires alignment handling |
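A sketch of the memory-mapped write path (2.4) that the comparison refers to: the file is pre-extended, so a write becomes a memcpy into the mapping. The writeOffset_ member and the error codes are assumptions; mappedAddress_ and mappedSize_ follow the class diagram:

#include <sys/mman.h>
#include <cstring>

ErrCode MMapFileStrategy::Write(const char* data, size_t size)
{
    if (writeOffset_ + size > mappedSize_) {
        return ERR_IO_WRITE_FAILED;            // caller rolls the file or remaps a larger window
    }
    // Zero syscalls on the hot path: copy into the mapping, the kernel writes back lazily.
    std::memcpy(static_cast<char*>(mappedAddress_) + writeOffset_, data, size);
    writeOffset_ += size;
    return ERR_OK;
}

ErrCode MMapFileStrategy::Flush()
{
    // Asynchronous flush: schedule writeback without blocking the logging backend.
    return msync(mappedAddress_, writeOffset_, MS_ASYNC) == 0 ? ERR_OK : ERR_IO_FLUSH_FAILED;
}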
File rolling optimizations (a sketch of the size/time check follows this list):
Roll by size: keeps any single file from growing too large
Roll by time: simplifies log archiving
Rolling is executed asynchronously to reduce blocking
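A sketch of the size/time rolling check from the flowchart in 1.2. The member names follow the FileOutputStrategy fields in the class diagram; ShouldRoll is a hypothetical helper, and lastRollTime_ is assumed to be a std::chrono::system_clock::time_point:

bool FileOutputStrategy::ShouldRoll(size_t pendingBytes) const
{
    // Roll by size: the next write would push the file past the configured cap.
    if (currentSize_ + pendingBytes >= maxFileSize_) {
        return true;
    }
    // Roll by time: the interval since the last roll has elapsed.
    auto now = std::chrono::system_clock::now();
    return (now - lastRollTime_) >= rollInterval_;
}

// In Write(): if (ShouldRoll(size)) { RollFile(); }  -- RollFile() closes the fd,
// generates a timestamped name, opens the new file and resets currentSize_.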
4. Performance test results
5. Design summary
Extensible architecture:
The strategy pattern makes it easy to add new output types
The factory pattern simplifies creation of output strategies
High-performance design:
Memory-mapped files give zero-copy writes
Direct I/O bypasses the system cache
Asynchronous fsync reduces I/O blocking
Robustness guarantees:
File rolling prevents disk-space exhaustion
Exception handling keeps the system stable
Fallback mechanism (normal buffered mode when O_DIRECT fails; see the sketch below)
Resource management:
RAII manages file descriptors and memory mappings
Object lifetimes are explicit
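A sketch of the O_DIRECT fallback mentioned under robustness: the flag is dropped and the file reopened in buffered mode when the kernel or filesystem rejects direct I/O. The function name and parameters are illustrative:

#include <fcntl.h>
#include <cerrno>

int OpenLogFile(const char* path, bool preferDirectIO)
{
    int flags = O_WRONLY | O_CREAT | O_APPEND;
    if (preferDirectIO) {
        int fd = open(path, flags | O_DIRECT, 0644);
        if (fd >= 0) {
            return fd;                       // direct I/O is available on this filesystem
        }
        // EINVAL / EOPNOTSUPP: O_DIRECT rejected, fall back quietly.
    }
    return open(path, flags, 0644);          // normal buffered mode
}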
This I/O system meets the demands of high-performance log handling: the combined optimization strategies reach GB-per-second-level throughput while keeping the system stable and extensible.
Stage 3: Front-end interface development
1. Streaming API
1.1 LogStream implementation
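A minimal LogStream sketch consistent with the sequence diagram at the top of this page: the constructor captures level/file/line and installs the default format, operator<< appends into a FixedBuffer, and the destructor flushes the finished line to the AsyncLogger. The FixedBuffer Data()/Length() accessors, the GetAsyncLogger() accessor and the buffer size are assumptions:

class LogStream {
public:
    LogStream(LogLevel level, const char* file, int line)
        : level_(level), file_(file), line_(line)
    {
        // Default() registers the timestamp / level / file name / thread id formatters;
        // operator<<(const LogFormat&) runs them against this stream.
        *this << LogFormat::Default();
    }

    ~LogStream()
    {
        Flush();             // flushing here makes `LOG_INFO << ...;` a complete log statement
        buffer_.Reset();
    }

    LogStream& operator<<(const char* text)
    {
        buffer_.Append(text, std::strlen(text));
        return *this;
    }

    LogStream& operator<<(const LogFormat& format)
    {
        format.Format(*this);
        return *this;
    }

    void Append(const char* buf, size_t len) { buffer_.Append(buf, len); }

private:
    void Flush()
    {
        // Hand the finished line to the asynchronous backend (accessor name assumed).
        GetAsyncLogger().Append(buffer_.Data(), buffer_.Length());
    }

    LogLevel level_;
    const char* file_;
    int line_;
    FixedBuffer<4096> buffer_;   // per-statement buffer; the size is illustrative
};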
2. Formatting and macros
2.2 Log macro system
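A sketch of how the log macros can wrap the streaming API. The macro names, Logger::IsLevelEnabled and the LogLevel enumerators are illustrative; the point is that the level check happens before any formatting work:

#define LOG_STREAM(level) \
    LogStream(level, __FILE__, __LINE__)

// Cheap early-out: the temporary LogStream (and all formatting) is only created
// when the level is enabled; its destructor flushes the line to the AsyncLogger.
#define LOG_IF_ENABLED(level)                     \
    if (!Logger::IsLevelEnabled(level)) {         \
    } else                                        \
        LOG_STREAM(level)

#define LOG_DEBUG LOG_IF_ENABLED(LogLevel::DEBUG)
#define LOG_INFO  LOG_IF_ENABLED(LogLevel::INFO)
#define LOG_WARN  LOG_IF_ENABLED(LogLevel::WARN)
#define LOG_ERROR LOG_IF_ENABLED(LogLevel::ERROR)

// Usage: LOG_INFO << "client " << clientId << " connected";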
Stage 4: Support system development
1. Configuration management
2. Monitoring system
3. Resource governance
Stage 5: Integration and testing
1. System integration
2. Performance testing
| Test type | Metric | Target |
| --- | --- | --- |
| Single-thread throughput | log entries/sec | >500,000 |
| Multi-thread scalability | 16-thread throughput | >1,200,000 |
| Latency | processing time per log entry | <700 ns |
| Crash recovery | window of lost trailing logs | <100 ms |
| Memory pressure | peak memory usage | <512 MB |
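A minimal multi-threaded throughput harness matching the workload reported in 4.5 (16 threads, 10,000 messages of 128 bytes each); the harness itself is illustrative and not the project's benchmark code:

#include <chrono>
#include <cstdio>
#include <string>
#include <thread>
#include <vector>

void RunThroughputBenchmark(AsyncLogger<>& logger,
                            int threads = 16, int perThread = 10000, size_t msgSize = 128)
{
    std::string msg(msgSize, 'x');
    auto start = std::chrono::steady_clock::now();

    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&] {
            for (int i = 0; i < perThread; ++i) {
                logger.Append(msg.data(), msg.size());
            }
        });
    }
    for (auto& w : workers) {
        w.join();
    }
    logger.Flush();

    std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
    double total = static_cast<double>(threads) * perThread;
    std::printf("Throughput: %.0f msg/sec\n", total / elapsed.count());
}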
3. Stability testing
Stage 6: Deployment
1. Deployment plan
graph TD
A[Application] --> B[Logging library]
B --> C[Local log agent]
C --> D[File system]
C --> E[Log collector]
E --> F[Elasticsearch]
F --> G[Kibana]
subgraph K8s cluster
A1[Pod1] --> B
A2[Pod2] --> B
A3[Pod3] --> B
C --> E
end
Risk management
1. Complexity of lock-free programming
Mitigation: detect data races with TSAN (example build below)
Mitigation: thorough stress testing
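Example ThreadSanitizer build; the test target name follows 4.4.1 and the flags are standard GCC/Clang options:

g++ -std=c++20 -g -O1 -fsanitize=thread -fno-omit-frame-pointer \
    -o log_system_tsan *.cpp -lgtest -lgtest_main -lpthread
./log_system_tsan    # data races are reported with both conflicting stack traces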
2. Crash-safety reliability
Mitigation: signal-handling tests on multiple platforms
Mitigation: use only async-signal-safe functions
3. Performance optimization bottlenecks
Mitigation: profile hotspots with perf (example commands below)
Mitigation: multi-level performance-metric monitoring
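Example perf workflow for locating hotspots (standard Linux perf usage):

perf record -g ./log_system_test   # sample call stacks while the benchmark runs
perf report                        # interactive view of the hottest symbols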
Final checklist
Lock-free queue passes concurrent stress tests
Log integrity on crash is verified
Throughput reaches 1.2M logs/sec
Memory usage stays under the 512 MB cap
Dynamic configuration updates are supported
Comprehensive monitoring metrics are exported
Documentation covers all usage scenarios
This implementation plan starts from the core modules and grows step by step into the complete system; every stage has clear deliverables and acceptance criteria. It is advisable to implement the buffer system first, since it is the most critical performance foundation, and to keep running performance tests and memory analysis throughout development to make sure the design goals are met.