一、Multi-Reactor架构
Multi-Reactor是典型的主从Reactor模型,EventLoop负责创建和管理多TaskScheduler(Rector线程),第一个task_schdulers【0】是主Rector,负责监听socket的新连接 其他task_schdulers是子Rector 负责已建立连接的IO事件,Acceptor模块在 Main Reactor 中运行,负责 accept()
新连接,并通过轮询或其他策略将其分配给某个 Sub Reactor。
角色 | 职责说明 |
---|---|
Main Reactor | 只负责监听 listenfd ,处理新连接的到来,并调用 Acceptor 接收连接。接收完成后,将新连接分配给某个 Sub Reactor。 |
Sub Reactor | 每个 Sub Reactor 运行在一个独立线程中,负责处理已建立连接的 I/O 事件(读/写/关闭等)。 |
Acceptor | 在 Main Reactor 中运行,负责 accept() 新连接,并通过轮询或其他策略将其分配给某个 Sub Reactor。 |
Muduo库有三个核心组件支撑一个reactor实现 [持续] 的 [监听] 一组fd,并根据每个fd上发生的事件 [调用] 相应的处理函数。这三个组件分别是Channel
类、Poller/EpollPoller类以及EventLoop
类。
二、Channel类
Channel类其实相当于一个文件描述符的保姆!
在TCP网络编程中,想要IO多路复用监听某个文件描述符,就要把这个fd和该fd感兴趣的事件通过epoll_ctl注册到IO多路复用模块上。当事件监听器监听到该fd发生了某个事件。事件监听器返回 [发生事件的fd集合]以及[每个fd都发生了什么事件]
Channel类则封装了一个 [fd] 和这个 [fd感兴趣事件] 以及事件监听器监听到 [该fd实际发生的事件]。同时Channel类还提供了设置该fd的感兴趣事件,以及将该fd及其感兴趣事件注册到事件监听器或从事件监听器上移除,以及保存了该fd的每种事件对应的处理函数。
✨2.1 Channel类重要成员变量
- EventLoop* loop_; 这个fd属于哪个EventLoop对象 (后面会分析EventLoop模块)
- const int fd_; 这个Channel对象照看的文件描述符(一个fd对应一个Channe;)
- int events_; 代表fd感兴趣的事件类型集合
- int revents_; 代表事件监听器实际监听到该fd发生的事件类型集合,当事件监听器监听到一个fd发生了什么事件,通过
Channel::set_revents()
函数来设置revents值 read_callback_
、write_callback_
、close_callback_
、error_callback_
:这些是std::function类型,代表着这个Channel为这个文件描述符保存的各事件类型发生时的处理函数。比如这个fd发生了可读事件,需要执行可读事件处理函数,这时候Channel类都替你保管好了这些可调用函数,直接管保姆要就好了- std::weak_ptr<void> tie_:用于解决回调期间对象销毁的问题,是muduo线程安全设计的重要体现
- bool tied_; 标记是否已绑定所有者
✨2.2 Channel类重要成员函数
向Channel对象注册各类事件的回调处理函数
void setReadCallback(ReadEventCallback cb)
{ readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb)
{ writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb)
{ closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb)
{ errorCallback_ = std::move(cb); }
一个文件描述符会发生可读、可写、关闭、错误事件。当发生这些事件后,就需要调用相应的处理函数来处理。外部通过调用上面这四个函数可以将事件处理函数放进Channel类中,当需要调用的时候就可以直接拿出来调用了。
将Channel中的文件描述符及其感兴趣事件 注册到事件监听器上或从事件监听器上移除
void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
外部通过这几个函数来告知Channel你所监管的文件描述符都对哪些事件类型感兴趣,并把这个文件描述符及其感兴趣事件封装成Channel然后注册到事件监听器(IO多路复用模块)上。上面这些函数里面都有一个update()
私有成员方法,update函数如下:
void Channel::update()
{
addedToLoop_ = true;
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}
调用流程:
Channel层调用(如enableReading)->EventLoop层updateChannel->多态调用具体的Poller执行updateChannel
最后的EpollPoller的updateChannel里面本质上是调用了epoll_ctl()
。
结合下面的图可以更好的理解我们调用Channel::enableReading和其他设置感兴趣类型的函数目前是在哪一层。在“✨4.3 梳理EventLoop、Channel、Poller在整个Multi-Reactor通信架构中的角色”会进行一个总结
监听到事件后 将事件封装到Channel中
void set_revents(int revt) { revents_ = revt; }
当事件监听器监听到某个文件描述符发生了什么事件,通过这个函数可以将这个文件描述符实际发生的事件封装进这个Channel中,进行赋值revents_成员变量,供后面handleEvent使用。
epoll发生事件后交由Cahnnel::handleEvent处理
void handleEvent(Timestamp receiveTime);
当调用epoll_wait()
后,可以得知事件监听器上哪些Channel(文件描述符)发生了哪些事件,事件发生后自然就要调用这些Channel对应的处理函数。 Channel::handleEvent
,让每个发生了事件的Channel调用自己保管的事件处理函数。每个Channel会根据自己文件描述符实际发生的事件(通过Channel中的revents_
变量得知)和感兴趣的事件(通过Channel中的events_
变量得知)来选择调用read_callback_
和/或write_callback_
和/或close_callback_
和/或error_callback_
,同时使用tie_
机制确保在处理事件期间Channel的所有者对象不会被销毁。
receiveTime参数是一个Timestamp类型参数,表示事件发生的时间戳,主要用于传递给readCallback_,让应用层知道事件发生的准确时间,在日志记录和性能分析中很有用
Channel::handleEvent内部实际调用处理的函数handleEventWithGuard
void Channel::handleEventWithGuard(Timestamp receiveTime)
Channel::handleEvent
函数内部会去调用这个函数,它是是实际执行事件处理的内部函数。处理的事件类型包括:
- POLLOUT: 可写事件,调用writeCallback_
- POLLHUP: 连接挂起事件,调用closeCallback_
- POLLERR/POLLNVAL: 错误事件,调用errorCallback_
- POLLIN/POLLPRI/POLLRDHUP: 可读事件,调用readCallback_
✨2.3 Channel类的tie机制以及整个生命周期
std::weak_ptr<void> tie_:用于解决回调期间对象销毁的问题,是muduo线程安全设计的重要体现
bool tied_; 标记是否已绑定所有者
void Channel::tie(const std::shared_ptr<void>& obj)
{
tie_ = obj; // 保存为weak_ptr
tied_ = true; // 标记已绑定
}
tie_是一个std::weak_ptr,用于弱引用持有该Channel的对象,主要目的是防止在事件回调期间对象被意外销毁,通过weak_ptr可以安全地检查对象是否还存在
tie机制主要通过std::weak_ptr tie_和bool tied_还有void Channel::tie函数实现的
- 当Channel需要处理事件时,会先尝试将weak_ptr提升为shared_ptr(强引用)
- 如果提升成功,说明对象仍然存在,可以安全执行回调
- 如果提升失败,说明对象已被销毁,跳过回调执行
为什么需要tie,可以看一个场景:
如果没有tie
void TcpConnection::handleRead(Timestamp receiveTime) {
// ... existing code ...
messageCallback_(this, &inputBuffer_, receiveTime); // 使用裸指针this
// 如果回调期间对象被销毁,后续代码将访问非法内存
}
线程A执行消息回调:进行调用messageCallback_函数,而同时线程B同时销毁TcpConnection对象,回调返回后继续执行:
if (n == 0) {
handleClose(); // 访问已释放的成员变量
}
导致的后果:访问已释放内存导致段错误,虚函数表被破坏导致程序崩溃,数据竞争导致不可预测行为
而如果有tie机制的话则会通过shared_ptr的强引用保证回调期间对象存活
生命周期流程理解:
创建 → tie()绑定所有者 → 设置回调 → enableReading() → handleEvent() → remove()
- 创建:构造Channel对象,初始化成员变量
- tie()绑定:将Channel与拥有它的对象(如TcpConnection)绑定
- 设置回调:配置各种事件处理回调函数
- enableReading():注册读事件监听
- handleEvent():事件触发时的处理流程
- remove():从EventLoop中注销Channel
tie()会与它的对象TcpConnection进行绑定
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading();
connectionCallback_(shared_from_this());
}
在TCP连接建立完成后立即调用channel_->tie(shared_from_this());,这个步骤使用 shared_from_this() 获取当前对象的共享指针,然后建立起建立Channel与TcpConnection的生命周期关联,当epoll检测到事件时,会调用Channel的 handleEvent(),这个函数如下:
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock(); // 尝试获取所有者对象的强引用
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
// ... existing code ...
}
可以看到这里面handleEvent() 会先尝试提升weak_ptr,检查对象是否存在,如果存在的话才安全地去调用回调函数,同时weak_ptr也不增加引用计数,不影响正常生命周期管理。这个设计挺秒的,
tie机制设计的意义
weak_ptr 不会增加引用计数,不会阻止对象销毁,通过 lock() 可以安全地检查对象是否还存在,如果对象已销毁, lock() 会返回空指针,从而跳过回调执行,如果对象存在,提升后的 shared_ptr 会保证回调期间对象不被销毁,这种机制完美解决了网络编程中常见的”回调期间对象销毁”问题,是muduo线程安全设计的重要体现。
✨2.4 Channel类总结
核心定位:
- I/O事件分发器,连接文件描述符与事件回调的桥梁
- 采用”One Channel per fd”设计原则
封装了文件描述符(fd)和事件回调,主要处理四种事件:读、写、关闭、错误。Channel可以将事件注册到EventLoop上面表示关心什么事件,然后epoll那边触发事件了就到Channel这边来调用特定的回调函数。
事件注册:
enableReading() // 注册读事件
enableWriting() // 注册写事件
通过 update() 将事件同步到EPoller 告诉I/O多路复用模块关心什么事件
事件处理流程 :
epoll_wait → EventLoop获取活跃Channel → Channel::handleEvent() → 执行对应回调
回调系统:
ReadEventCallback readCallback_; // 可读回调(带时间戳)
EventCallback writeCallback_; // 可写回调
EventCallback closeCallback_; // 关闭回调
EventCallback errorCallback_; // 错误回调
设计亮点
tie机制实现通过shared_ptr的强引用保证回调期间对象存活,避免出现”回调期间对象销毁”问题
✨2.5 个人思考
Channel这个模块看起来没有很大,但却是muduo库里面一个很重要的部分,凡是和fd有关的,以及关心的事件,都离不开这个Channel模块,后面很多都会用到它。
Channel的作用主要就是把epoll的底层事件(EPOLLIN/EPOLLOUT等)封装为高级的读写事件接口,提供统一的 enableReading() / enableWriting() 等方法来管理事件监听,构建了一个介于epoll和业务逻辑之间的中间层,既隐藏了底层IO复用的复杂性又提供了灵活的事件处理能力。
如果是原始的单Reactor没用Channel的话,则会是下面这样:
void eventLoop(int epoll_fd) {
struct epoll_event events[MAX_EVENTS];
while (true) {
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
int fd = events[i].data.fd;
uint32_t revents = events[i].events;
// 需要手动处理各种事件类型
if (revents & EPOLLIN) {
// 处理读事件
char buf[1024];
read(fd, buf, sizeof(buf));
// ...业务逻辑...
}
else if (revents & EPOLLOUT) {
// 处理写事件
// ...业务逻辑...
}
else if (revents & EPOLLERR) {
// 处理错误
close(fd);
}
// 需要处理更多事件类型...
}
}
}
这种从代码组织方面来看所有事件处理在一个大循环中,可维护性和可扩展性都比较差,添加新事件类型需修改主循环,而使用Channel的话则将事件处理分散到各回调函数,各事件处理逻辑隔离,当需要添加新事件类型则只需添加新回调。
还有一个是tie机制的引入,这种线程安全的对象生命周期管理挺值得学习的。
三、Poller / EpollPoller/PollPoller
✨3.1 Poller/EpollPoller概述
负责监听文件描述符事件是否触发以及返回发生事件的文件描述符以及具体事件的模块就是Poller。所以一个Poller对象对应一个事件监听器。在multi-reactor模型中,有多少reactor就有多少Poller。
Poller作为基类,抽象接口类,定义统一的多路复用接口,核心职责:
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
virtual void updateChannel(Channel* channel) = 0;
它会去提供跨平台的事件监听抽象,不直接与poll和epoll交互,由子类实现具体机制。
Poller(抽象基类)
├── EPollPoller(Linux epoll实现)
└── PollPoller(传统poll实现)
✨3.2 Poller/EpollPoller/PollPoller重要成员变量
typedef std::map<int, Channel*> ChannelMap;
ChannelMap channels_;
EventLoop* ownerLoop_;
基类Poller:通过channels_ 维护统一的Channel管理,管理所有注册的Channel(fd到Channel的映射),以及有一个EventLoop对象(后面会展开说)
int epollfd_; // 用epoll_create方法返回的epoll句柄
EventList events_; // epoll_event数组(动态扩容)
// 继承的成员:
// channels_ 来自Poller基类
// ownerLoop_ 来自Poller基类
子类EpollPoller:包含epoll文件描述符和存储epoll_events的容器
typedef std::vector<struct pollfd> PollFdList;
PollFdList pollfds_; // pollfd数组
// 继承的成员:
// channels_ 来自Poller基类
// ownerLoop_ 来自Poller基类
子类PollPoller:包含存储pollfd的容器
✨3.3子类重要的成员函数
poll函数
Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
这个函数可以说是Poller的核心了,在Poller中这是一个纯虚函数,定义了IO多路复用的统一接口,参数timeoutM表示超时时间(毫秒),activeChannels表示输出参数,用于返回有事件发生的Channel列表,ChannelList是一个std::vector<Channel*>存储Channel指针的一个容器。这个函数作为抽象接口,由子类PollPoller和EpollPoller实现。
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
LOG_TRACE << "fd total count " << channels_.size();
int numEvents = ::epoll_wait(epollfd_, // epoll实例的文件描述符
&*events_.begin(), // 事件缓冲区
static_cast<int>(events_.size()),
timeoutMs); // <-- 阻塞参数 会阻塞timeoutMs毫秒
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happened";
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2);
}
}
......剩余代码
}
在子类EpollPoller中,当Timestamp EPollPoller::poll方法被调用的时候,它的工作流程是这样的:
- 调用 epoll_wait 等待事件发生
- 通过 fillActiveChannels 方法将活跃事件填充到 activeChannels 容器中
- 返回事件发生的时间戳
这里有两个要梳理的点,一个是这里的epoll_wait如果满足一定条件,它是会去阻塞的,可以看到它设置了timeoutMs参数,另一个要理的是fillActiveChannels函数,先看看epoll_wait吧
timeoutMs参数是来自EventLoop上层传来的 kPollTimeMs ,默认值10000ms,是10s。当以下情况同时满足的时候,epoll会去进行阻塞:
- 没有文件描述符事件发生
- 没有其他线程调用wakeup()触发eventfd (后面EventLoop会讲到)
- 未达到超时时间
# 场景1:无事件且无唤醒
poll() -> 阻塞10秒 -> 超时返回 -> 处理空队列activeChannels_ -> 开启下一轮poll()
# 场景2:中途有唤醒事件
poll()开始 -> 阻塞3秒后收到唤醒 -> 立即返回 -> Channel::handleEvent处理事件 -> 继续下一轮poll()
为什么这里要设置这个epoll阻塞呢?不是说好的网络IO非阻塞设计吗?其实是不冲突的。epoll_wait是阻塞的 (刻意设计用于高效等待事件),而socket是非阻塞的 (通过 fcntl(O_NONBLOCK) 设置),socket的非阻塞是在Socket对象构造函数的时候就会去设置好了的。
组件 | 阻塞特性 | 设计目的 |
---|---|---|
epoll_wait | 阻塞式等待 | 降低 CPU 空转,高效事件检测 |
socket I/O | 非阻塞操作 | 避免进程挂起,支持异步处理 |
增加唤醒机制的量化对比:
# 性能对比(假设单核CPU)
| 唤醒方式 | 延迟 | CPU占用率 |
|------------|---------|----------|
| 阻塞10秒 | 10秒 | 0.1% |
| 忙轮询 | <1ms | 100% |
这里的唤醒机制可以看下“✨4.2 EventLoop重要成员函数”整理的唤醒机制。
接下来是fillActiveChannels 函数的梳理:
activeChannels是一个ChannelList*对象,用来存放活跃的Channel
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
上面是fillActiveChannels 这个方法,它的参数一个是numEvents也就是epoll_wait返回的活跃事件数量,一个是activeChannels用于存放有事件发生的Channel指针。它先遍历 events_ 容器(存储epoll返回的事件),对前 numEvents 个活跃事件进行处理,每个事件的处理步骤:
- 1、从 epoll_event.data.ptr 取出关联的Channel对象
- 2、将事件类型设置到Channel的 revents_ 成员 (这步很重要,Channel在handleEvent里就是根据这个revents_ 去看看要处理什么事件,可以回看下前面的Channel章节)
- 3、将Channel指针加入活跃列表 activeChannels
整体就是:
epoll_wait → 填充events_ → fillActiveChannels处理 → 设置Channel.revents_ → 加入activeChannels
updateChannel函数
EPollPoller::updateChannel
void EPollPoller::updateChannel(Channel* channel) {
// 状态判断与操作分发
const int index = channel->index();
if (index == kNew || index == kDeleted) {
// 处理新注册或删除后重新注册的 Channel
int fd = channel->fd();
if (index == kNew) {
assert(!channels_.count(fd)); // 确保首次注册
channels_[fd] = channel; // 添加到文件描述符映射表
}
channel->set_index(kAdded); // 更新状态为已注册
update(EPOLL_CTL_ADD, channel); // 执行 epoll_ctl 添加操作
} else {
// 处理已有 Channel 的更新
if (channel->isNoneEvent()) {
update(EPOLL_CTL_DEL, channel); // 无监听事件则移除
channel->set_index(kDeleted); // 标记为已删除状态
} else {
update(EPOLL_CTL_MOD, channel); // 有事件则修改监听类型
}
}
}
作为低层的更新状态的函数,它会去进行状态判断与操作分发
| 操作类型 | 触发条件 | 系统调用 |
|----------------|-----------------------------|-----------------|
| EPOLL_CTL_ADD | Channel 首次注册 (kNew) | epoll_ctl(ADD) |
| EPOLL_CTL_MOD | 已注册 Channel 事件类型变化 | epoll_ctl(MOD) |
| EPOLL_CTL_DEL | Channel 取消所有事件监听 | epoll_ctl(DEL) |
- 新连接建立时通过 EPOLL_CTL_ADD 注册读事件
- 发送缓冲区满时通过 EPOLL_CTL_MOD 添加写事件
- 连接关闭时通过 EPOLL_CTL_DEL 移除监听
EPollPoller::update函数
void EPollPoller::update(int operation, Channel* channel)
{
struct epoll_event event;
memZero(&event, sizeof event);
event.events = channel->events();
event.data.ptr = channel;
int fd = channel->fd();
LOG_TRACE << "epoll_ctl op = " << operationToString(operation)
<< " fd = " << fd << " event = { " << channel->eventsToString() << " }";
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
{
if (operation == EPOLL_CTL_DEL)
{
LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
}
else
{
LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
}
}
}
作为最低层的更新状态的函数,当Channel的事件状态变化时(enableReading()/disableWriting()),最终会触发这里的epoll事件更新
支持三种操作类型:EPOLL_CTL_ADD(增加) ,EPOLL_CTL_MOD(修改) ,EPOLL_CTL_DEL(删除)
会去创建epoll_event结构体并填充事件数据
struct epoll_event event;
event.events = channel->events(); // 从Channel获取关注的事件类型
event.data.ptr = channel; // 将Channel对象指针关联到事件
✨3.4 Poller/EpollPoller/PollPoller总结
Poller (抽象基类) 定义了IO多路复用的统一接口,采用抽象基类设计,强制子类实现关键方法,维护ChannelMap(fd到Channel的映射)。
而子类EpollPoller和PollPoller则分别基于不同的系统调用实现了这些接口功能,它们与EventLoop和Channel形成了紧密的协作关系。这两种实现都通过EventLoop进行统一调度,当Channel需要更新监控状态时,会通过EventLoop::updateChannel方法转发给具体的Poller实现。这种设计使得muduo能够根据运行环境自动选择最优的IO多路复用实现,同时保持了上层接口的统一性。
✨3.5 个人思考
Poller、EpollPoller、PollPoller实现了抽象与实现的分离,这种设计使得上层业务逻辑只需关注统一的接口,而不用关心底层是使用epoll还是poll实现。例如下面的过程:
Channel::enableReading()
→ Channel::update()
→ EventLoop::updateChannel()
→ Poller::updateChannel()
→ EPollPoller::updateChannel()或PollPoller::updateChannel()
这种设计完美体现了面向对象的设计原则,特别是”依赖倒置”原则,即高层模块不依赖低层模块,二者都依赖抽象,(EventLoop作为高层模块,只依赖低层的Poller抽象接口去调用具体的Poller->EPollPoller/PollPoller)。
在实际开发中,这种分层思想可以应用于任何需要支持多平台的模块设计,可以轻松支持Linux系统和其他系统。
四、EventLoop
我们回看前面的Poller模块可以发现,Poller是封装了和事件监听有关的方法和成员,调用一次Poller::poll
方法它就能你返回事件监听器的监听结果(发生事件的fd 及其 发生的事件)。
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
但是作为一个网络服务器,需要有持续监听、持续获取监听结果、持续处理监听结果对应的事件的能力,也就是我们需要循环的去 【调用Poller:poll
方法获取实际发生事件的Channel集合,然后调用这些Channel里面保管的不同类型事件的处理函数(调用Channel::HandlerEvent
方法)。】
EventLoop就是负责实现“循环”,负责驱动“循环”的重要模块!!Channel和Poller其实相当于EventLoop的手下,EventLoop整合封装了二者并向上提供了更方便的接口来使用。
✨4.1 EventLoop重要成员变量
I/O核心组件:
std::unique_ptr<Poller> poller_; // 多路复用器(自动管理EPollPoller/PollPoller)
ChannelList activeChannels_; // 当前活跃Channel列表(来自Poller::poll())
Channel* currentActiveChannel_; // 正在处理的事件Channel(调试用)
线程绑定:
const pid_t threadId_; // 所属线程ID(通过CurrentThread::tid()初始化)
定时器管理:
std::unique_ptr<TimerQueue> timerQueue_; // 定时器队列(管理定时任务)
跨线程唤醒机制:
int wakeupFd_; // eventfd文件描述符
std::unique_ptr<Channel> wakeupChannel_; // 唤醒事件专用Channel
任务队列:
mutable MutexLock mutex_; // 互斥锁(保护pendingFunctors_)
std::vector<Functor> pendingFunctors_; // 待执行回调队列(跨线程安全)
这些成员组成了Reactor模式的核心实现,其中
- poller_ 与 activeChannels_ 负责IO事件检测
- wakeupFd_ + wakeupChannel_ 实现跨线程唤醒
- mutex_ + pendingFunctors_ 保障线程安全的任务队列
✨4.2 EventLoop重要成员函数
void EventLoop::loop() 事件循环核心
void EventLoop::loop()
{
while (!quit_) {
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
// 处理IO事件
for (Channel* channel : activeChannels_) {
currentActiveChannel_ = channel;
channel->handleEvent(pollReturnTime_);
}
// 执行pending任务
doPendingFunctors();
}
}
这个函数是EventLoop里面最重要的一个函数,是Reactor模式的核心实现。每个EventLoop对象都唯一绑定了一个线程,这个线程其实就在一直执行这个函数里面的while循环,这个while循环的大致逻辑比较简单。就是调用Poller::poll
方法获取事件监听器上的监听结果。接下来在loop里面就会调用监听结果中每一个Channel的处理函数HandlerEvent( )
。每一个Channel的处理函数会根据Channel类中封装的实际发生的事件,执行Channel类中封装的各事件处理函数。(比如一个Channel发生了可读事件,可写事件,则这个Channel的HandlerEvent( )
就会调用提前注册在这个Channel的可读事件和可写事件处理函数,又比如另一个Channel只发生了可读事件,那么HandlerEvent( )
就只会调用提前注册在这个Channel中的可读事件处理函数)
EventLoop 线程启动
↓
while(true) {
↓
Poller::poll() 获取事件列表
↓
遍历列表中的每个 Channel
↓
Channel::HandlerEvent()
↓
根据实际事件 → 调用对应回调
↓
} 回到循环开始
✨4.3 EventLoop处理异步任务的核心方法
runAfter() 和 runInLoop() 是 EventLoop 中处理异步任务的核心方法
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread()) { // 当前线程是事件循环线程
cb(); // 立即执行回调
} else {
queueInLoop(std::move(cb)); // 跨线程时需要加入队列
}
}
EventLoop::runInLoop函数去执行任务的时候会先去判断当前线程是否是事件循环线程,是则立即执行回调,如果是跨线程则需要将回调函数加入线程安全的任务队列,传入queueInLoop函数后面加入一个队列去异步执行,执行的时候还会设计一个唤醒机制wake,后面会提及。这样的设计确保特定操作在 IO 线程执行。
void EventLoop::queueInLoop(Functor cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb)); //将回调函数存入EventLoop的待执行任务队列
}
if (!isInLoopThread() || callingPendingFunctors_) //不在I/O线程里 需要去wake
{
wakeup();
}
}
EventLoop::queueInLoop函数在muduo网络库中承担跨线程任务调度的核心职责,当调用 loop_->queueInLoop() 时,会将回调函数存入EventLoop的待执行任务队列(pendingFunctors_),如果当前线程不是EventLoop所属线程,或EventLoop正在处理回调,会进行调用wakeup函数,内部会通过eventfd唤醒处于poll状态的EventLoop线程。
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, std::move(cb));
}
// runAt函数
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
return timerQueue_->addTimer(std::move(cb), time, 0.0);
}
runAfter() -> runAt() -> TimerQueue::addTimer()
EventLoop::runAfter函数通过 addTime 函数将当前时间 Timestamp::now() 与延迟时间delay相加,得到未来时间点,然后托给 runAt 函数处理绝对时间的定时任务。
定时器到期 -> timerfd 产生读事件
-> Poller 返回活跃通道
-> Channel::handleEvent 调用 TimerQueue::handleRead
-> 执行用户注册的定时器回调
✨4.4 跨线程唤醒机制
在muduo网络库中,唤醒机制的核心是通过eventfd实现的线程间事件通知机制。主要作用是在跨线程操作时唤醒事件循环线程,确保事件循环及时处理新的任务或I/O事件。让 EventLoop 从 poll/epoll_wait
的等待中立即返回,处理“其他线程”投递的任务。
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
这里的wakeupFd_在EventLoop的构造函数会去进行初始化
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 后将evtfd初始化给wakeupFd_
也就是说wakeupFd_是一个eventfd文件描述符。
接着会去将wakeupFd_封装成Channel,监听可读事件
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading(); // 注册到poller监听可读事件
当wakeup函数进行对wakeupFd_描述符进行write写入字节的时候,就会去触发wakeupChannel_的读回调!!!读回调会去调用什么呢?会调用EventLoop::handleRead函数 此时就会去解除 poll/epoll_wait 的阻塞状态!!
void EventLoop::handleRead() {
uint64_t one;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof one); // 必须读取数据,否则会重复触发
// 继续处理pendingFunctors_中的任务...
}
具体流程:
其他线程调用 wakeup()
↓
向 wakeupFd_ 写入数据(触发可读事件)
↓
poll/epoll_wait 立即返回(解除阻塞)
↓
EventLoop 处理活跃通道 → wakeupChannel_ 触发 handleRead()
↓
读取 wakeupFd_ 数据(清空事件,避免重复触发)
↓
执行 doPendingFunctors() 处理任务队列 (这些任务在queueInLoop函数里会去加入)
处理异步任务的核心机制EventLoop::doPendingFunctors
void EventLoop::doPendingFunctors()
{ // typedef std::function<void()> Functor;
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_); // 原子交换减少锁持有时间
}
for (const Functor& functor : functors) // 遍历functors执行回调函数
{
functor();
}
callingPendingFunctors_ = false;
}
doPendingFunctors函数会去通过互斥锁保证线程安全,以及使用原子交换减少锁持有时间,将队列内容转移到局部变量,减少锁竞争,然后去遍历functors执行回调函数,这些回调函数在queueInLoop的时候加入的。
EventLoop::updateChannel函数
他将传来的Channel进行再一层传递给Poller,然后进行多态调用具体的Poller例如EpollPoller的UpdateChannel函数,进行epoll_ctl()进行最终的事件注册。
void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}
线程绑定、IO线程安全机制
void EventLoop::assertInLoopThread()
{
if (!isInLoopThread()) {
abortNotInLoopThread(); // 确保线程安全
}
}
在muduo中,每个EventLoop通过 threadId_ 成员与创建线程绑定,实现”one loop per thread”的设计模式。像EventLoop的成员函数updateChannel和removeChannel,它们在执行的时候都会去进行检查是否是在当前IO线程里面执行的,如果不是则会调用abortNotInLoopThread函数报错。
✨4.5 梳理EventLoop、Channel、Poller在整个Multi-Reactor通信架构中的角色
组件角色梳理:
EventLoop (事件循环中枢)
- 每个IO线程运行一个EventLoop(一个EventLoop对应一个Reactor线程)
- 负责驱动Poller进行事件监听
- 管理Channel生命周期和回调调度
- 线程绑定:通过 assertInLoopThread() 确保线程安全
Channel (事件处理器)
- 封装文件描述符和事件回调
- 事件状态管理(EPOLLIN/EPOLLOUT等)
- 通过 update() 方法触发Poller更新
Poller (IO复用抽象层)
- 实现具体IO复用机制(EPollPoller)
- 维护fd到Channel的映射表( channels_ )
- 通过 epoll_ctl 系统调用管理事件注册
我们可以模拟一次完整的通信流程,梳理下各个模块协同工作的流程:
// 初始化阶段
EventLoop mainLoop; // 主Reactor
// Acceptor是运行在主Reactor上面的连接器 连接到达后dispatch给子Reactor
Acceptor acceptor(&mainLoop, listenAddr); // 创建监听套接字
// 1. 注册监听事件
acceptor.listen() // 绑定在Acceptor监听的套接字的Channel他会去设置读事件回调
→ Channel::enableReading() // 从Channel层开始到EventLoop再到最后的EPollPoller层
→ Channel::update()
→ EventLoop::updateChannel()
→ EPollPoller::updateChannel(EPOLL_CTL_ADD)
// 2. 新连接到达
epoll_wait返回监听socket的EPOLLIN事件
→ EventLoop处理活跃Channel
→ Acceptor::handleRead() 创建TcpConnection // 当有新连接到了就会去触发读回调设置好的handleRead函数 会去创建一个TcpConnection对象 表示一个连接
// 3. TcpConnection建立
TcpConnection conn(subLoop, sockfd); // 子Reactor
conn.connectEstablished()
→ Channel::enableReading() // TcpConnection对象会去进行绑定到一个子Reactor中去,然后设置Channel对象的读回调,也就是当数据到来的时候就会去触发读回调函数
→ subLoop的Poller执行EPOLL_CTL_ADD
// 4. 数据到达处理
epoll_wait返回客户端socket的EPOLLIN事件
→ subLoop的EventLoop调用Channel::handleEvent()
→ TcpConnection::handleRead() 读取数据
→ 触发用户设置的消息回调
// 5. 数据发送流程
conn.send(data) → 若输出缓冲区满
→ Channel::enableWriting()
→ EPollPoller::updateChannel(EPOLL_CTL_MOD)
→ 当可写时触发EPOLLOUT事件
→ TcpConnection::handleWrite() 发送数据
可以看到这个事件注册链是一层接一层的:
Channel状态变化 → EventLoop::updateChannel() → Poller::updateChannel() → epoll_ctl系统调用
多Reactor协作:
Main Reactor(accept线程)
│
├── 通过epoll监听listen_fd
│
└── 新连接到达后dispatch给Sub Reactor(IO线程)
Sub Reactor(处理IO)
│
├── 管理多个TcpConnection的Channel
│
└── 独立运行epoll_wait事件循环
跨线程调度:
// 非IO线程操作示例
otherThread.postTask([]{
mainLoop.queueInLoop([]{
acceptor.listen(); // 确保在IO线程执行
});
});
总结:
Channel 是“事件-回调”的载体,EventLoop 是“线程-驱动”的心脏,Poller 是“fd-监控”的引擎;三者通过 update 链式调用把状态变化从 Channel 经 EventLoop 同步到 Poller,最终由 epoll 完成事件注册与分发,构成 Muduo 的 Reactor 核心。
✨4.6 个人思考
EventLoop模块作为事件循环中枢,在Multi-Reactor中是一个很重要的角色,一个EventLoop对应一个Reactor线程,通过EventLoop实现线程-驱动模式。
EventLoop这个模块中有两个是挺值得学习的。一个是处理异步任务的核心方法queueInLoop,一个是跨线程唤醒的机制wakeup。
Thread A → queueInLoop() → wakeup() → eventfd写入 → 中断epoll_wait
↓
Thread B的EventLoop → 处理wakeup事件 → 执行pendingFunctors_
机制拆解:
queueInLoop():把 lambda/std::function 塞进 pendingFunctors_,保证所有对 EventLoop 状态的修改都发生在其所属线程。
wakeup():向 eventfd 写入 8 字节,立即打断阻塞在 epoll_wait 上的 EventLoop 线程,让它在下一轮循环里把任务取出来执行。
这两步把“线程驱动”变成了“消息驱动”,线程外:只管投递,不碰内部状态,线程内:拿到消息后再统一修改状态,天然无锁。
可以看看缺失这两种机制的典型弊端:
场景 1:没有 queueInLoop(),直接在别的线程调用 Channel::update()
假设 Thread A 直接执行 conn->channel()->enableWriting(),而 Thread B 的 EventLoop 此时正在遍历 channels_ 并准备 epoll_ctl(EPOLL_CTL_DEL)。
结果:两个线程并发操作同一 fd 的 epoll 注册,极易触发 “Bad file descriptor” 或 “Operation now in progress”,甚至导致 epoll 内部红黑树被破坏,进程 core dump。
场景 2:有 queue 但忘了 wakeup()
Thread A 向 pendingFunctors_ 推了一个“立刻关闭连接”的任务,但由于没有唤醒,Thread B 的 EventLoop 仍阻塞在 epoll_wait。
任务延迟到下一次“自然”事件到来时才执行,客户端已超时重传多次,用户体验极差;极端情况下,大量 CLOSE_WAIT 堆积,耗尽文件描述符。
queueInLoop() 和 wakeup() 像一对“邮差 + 门铃” ,邮差(queueInLoop)把信件放到信箱(pendingFunctors_),门铃(wakeup)叫醒屋里的人(EventLoop)去取信。