网站首页 > 基础教程 正文
Redis的架构随着版本的迭代也在发生着变化。我们常说Redis是单线程模型,主要是说其网络IO操作及命令处理是串行执行的。在Redis 4.0版本做了架构升级,加入了后台线程,其目的主要是为了处理慢命令,防止慢命令降低服务器的处理效率。在Redis 6.0引入了多线程模型,Redis的高耗时主要发送在网络IO这一块,多线程模型的引入就是为了解决网络IO处理慢的问题。
架构演进
单线程模型
- fired events:就绪的事件,通过多路复用程序api获取到的就绪事件,然后根据事件对应的事件处理器rfileProc和wfileProc驱动业务流程。
- aeApiPoll:I/O多路复用API,是基于 epoll_wait/select/kevent 等系统调用的封装,监听等待就绪的事件,它是EventLoop的核心函数,是事件驱动得以运行的基础。
- beforeSleep:事件循环到来前执行的函数,比如其会做client命令回复缓冲区回写客户端操作,AOF缓冲区刷盘操作等。
- acceptTcpHandler:链接应答处理器,其底层是调用操作系统的accept函数来接受客户端连接,同时创建client对象,然后创建AE_READABLE事件到事件队列。
- readQueryFromClient:命令应答处理器,读取客户端发送的数据并执行命令,然后创建AE_WRITABLE事件到事件队列。
- processCommand:命令执行器,是命令的抽象。
- addReplay:当命令执行完成后调用该函数,该函数将回写给客户端的数据写入client对象的命令回复缓冲区中。
- sendReplyToClient&writeToClient:将client 命令回复缓冲区中的数据写入到client 的socket中。
- 事件的详细执行过程参见:《图解Redis-单机数据库实现》中的文件事件执行过程部分
单线程&后台线程模型
Redis的单线程模型也存在一些弊端,比如长耗时的命令可能会阻塞事件循环,导致服务器QPS严重降低。例如:删除一个或多个超大的键值对命令。要解决这个问题,Redis作者也经过的多方案的思考,例如渐进式删除方案、多线程方案等,最终敲定多线程方案,利用多线程来实现这一类非阻塞的命令(详见作者博客:Lazy Redis is better Redis[http://antirez.com/news/93]),在4.0版本中新增了一些ASYNC命令:UNLINK、FLUSHALL ASYNC、FLUSHDB ASYNC。UNLINK命令的实现是将key先从键空间中移除,然后将空间释放的任务加入lazyfree jobs queue中,让Lazyfree Thread逐步去删除。
多线程&后台线程模型
Redis最初的架构设计理念来自于思想:“CPU不会成为性能瓶颈,瓶颈往往来自于内存和网络,因此单线程足够处理”。但随着互联网的飞速发展,Redis所承载的流量越来越大,单线程模式在网络I/O的开销越来越大,导致吞吐量的急剧下降。要想提升Redis的性能,有两个方向可以解决:提升内存的读写速度、增强网络I/O的读写能力,前者更多的只能依赖硬件的发展,而网络I/O的优化可以结合CPU多核优势及零拷贝技术等快速的解决。
Redis在6.0 开始将单Reactor模式升级为了“Multi-Reactors 模式”,其实与真正的Multi-Reactors 模式有一些不同,Redis的实现是将读取客户端请求数据和回复客户端数据这两块的I/O操作使用多线程处理(详见上图),真正执行客户端命令逻辑的还是主线程。
- clients_pending_read:等待读取socket缓冲区数据的client队列,该队列的客户端将会被平均分配给主线程和IO线程。
- clients_pending_write:等待写入客户端socket的client队列,同样,该队列的客户端会被平均分配给I/O线程以及主线程本身。
多线程模型原理
多线程初始化
- Redis进程启动时会在最后一个环节初始化I/O线程池(类似java线程池)。
- 会为每一个I/O线程创建一个本地任务队列、剩余任务计数器及状态锁。
- I/O线程在没有获取任务时是忙轮询状态或者是休眠等待唤醒状态。
- 主线程将任务加入线程本地队列,设置任务数量后,唤起休眠线程开始执行任务。
- 在I/O线程执行任务期间主线程不会访问其正在处理的任务。
- 主线程是通过累计所有线程的剩余任务数来判断是否所有I/O线程执行完成了。
int main(int argc, char **argv) {
...
InitServerLast();
...
}
/* Some steps in server initialization need to be done last (after modules
* are loaded).
* Specifically, creation of threads due to a race bug in ld.so, in which
* Thread Local Storage initialization collides with dlopen call.
* see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast() {
// Initialize the background system, spawning the thread
bioInit();
// 启动I/O线程
initThreadedIO();
...
}
// 类似java中线程池
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
// 就配置了一个I/O线程的话就主线程来处理好了
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* 按照io_threads_num,设置生成并初始化 IO 线程 */
for (int i = 0; i < server.io_threads_num; i++) {
// 初始化 I/O 线程的本地任务队列。
io_threads_list[i] = listCreate();
if (i == 0) continue; /* 0为主线程,不需要初始化下面的信息 */
/* Things we do only for the additional threads. */
pthread_t tid;
// 每个 I/O 线程会分配一个本地锁,用来休眠和唤醒线程
pthread_mutex_init(&io_threads_mutex[i],NULL);
// 每个 I/O 线程分配一个原子计数器,用来记录当前待执行的任务数量。
io_threads_pending[i] = 0;
// 主线程在启动 I/O 线程的时候会默认先锁住它,直到有 I/O 任务才唤醒它。
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
// 这里开始启动线程IOThreadMain
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
// 这个类似java中的Runable实现
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
while(1) {
// 忙轮询,100w 次循环,等待主线程分配 I/O 任务。
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
// 忙轮询结束仍没有等到任务,则通过尝试加锁进入休眠,
// 等待主线程分配任务之后调用 startThreadedIO 解锁,唤醒 I/O 线程去执行
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
// Process:主线程分配任务给 I/O 线程之时,会把任务均匀分配到每个线程的本地任务队列 io_threads_list[id],
// 但是当 I/O 线程开始执行任务之后,主线程就不会再去访问这些任务队列,避免数据竞争。
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 写I/O,将client reply缓冲区内容写入客户端socket
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
}
// 读I/O,读取客户端socket中的数据到client querybuf中
else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
// 所有任务执行完之后把自己的计数器置 0,主线程通过累加所有 I/O 线程的计数器判断是否所有 I/O 线程都已经完成工作。
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
多线程程读
- clients_pending_read 队列的任务是由主线程获取到就绪的读事件后加入到该队列的。
- 在执行beforeSleep时触发队列任务的分配,通过RR轮询将所有任务均匀分配给所有I/O线程及主线程本身。
- 分配完成后唤醒所有I/O线程开始执行,主线程本身也开始处理任务。
- 主线程处理完任务后会进入盲轮询,知道所有I/O线程的剩余任务数为0时结束忙轮询。
- 主线程在所有任务处理完成后将开始循环处理clients_pending_read队列,处理队列中每一个client读取解析到的命令。
struct redisServer {
...
list *clients_pending_read; /* Client has pending read socket buffers. */
...
}
// 读I/O,读取客户端socket中的数据到client querybuf中
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
...
}
// 任务入队
int postponeClientRead(client *c)
// 当多线程 I/O 模式开启、主线程没有在处理阻塞任务时,将 client 加入异步队列。
if (server.io_threads_active &&
server.io_threads_do_reads &&
!clientsArePaused() &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
// 将client加入到clients_pending_read队列,等待读
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
// 分配
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
// 遍历待读取clients_pending_read 队列, 通过 RR 轮询均匀地分配给 I/O 线程及主线程自身。
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 设置当前 I/O 操作为读取操作,给每个 I/O 线程的计数器设置分配的任务数量,
// 让 I/O 线程可以开始工作:只读取和解析命令,不执行命令。
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主线程参与任务的处理(0号任务队列代表主线程任务队列)
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// 忙轮询,当所有I/O线程的剩余任务数累计为0时结束阻塞
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
// 循环执行clients_pending_read队列中所有已读取解析的命令
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
/* Clients can become paused while executing the queued commands,
* so we need to check in between each command. If a pause was
* executed, we still remove the command and it will get picked up
* later when clients are unpaused and we re-queue all clients. */
if (clientsArePaused()) continue;
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
processInputBuffer(c);
/* We may have pending replies if a thread readQueryFromClient() produced
* replies and did not install a write handler (it can't).
*/
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
clientInstallWriteHandler(c);
}
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed;
}
多线程写
- clients_pending_write队列的任务是由主线程在执行完成命令后通过addReply加入的。
- 在执行beforeSleep时触发队列任务的分配,通过RR轮询将所有任务均匀分配给所有I/O线程及主线程本身。
- 分配完成后唤醒所有I/O线程开始执行,主线程本身也开始处理任务。
- 主线程处理完任务后会进入盲轮询,知道所有I/O线程的剩余任务数为0时结束忙轮询。
- 最终确认所有客户端输出缓冲区是否有残留数据,如果有则为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
struct redisServer {
...
list *clients_pending_write; /* There is to write or install handler. */
...
}
// 入队
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
...
}
int prepareClientToWrite(client *c) {
...
if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
clientInstallWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
}
void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
// 加入写队列
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}
// 分配
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
// 如果用户设置的 I/O 线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client
// 数量不足 I/O 线程数的两倍,则不用多线程的逻辑,让所有 I/O 线程进入休眠,
// 直接在主线程把所有 client 的相应数据回写到客户端。
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
// 唤醒正在休眠的 I/O 线程(如果有的话)
if (!server.io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
// 遍历待写出的 client 队列 clients_pending_write,通过 RR 轮询均匀地分配给 I/O 线程和主线程本身。
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 设置当前 I/O 操作为写出操作,给每个 I/O 线程的计数器设置分配的任务数量,
// 让 I/O 线程可以开始工作,把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端。 io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
// 忙轮询,当所有I/O线程的剩余任务数累计为0时结束阻塞
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
// 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的写出缓冲区中有残留数据,
// 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
/* Update processed count on server */
server.stat_io_writes_processed += processed;
return processed;
}
以上就是Redis架构演进的介绍,如果各位还想了解更多,欢迎评论+关注,Redis图解系列专栏持续更新中。
猜你喜欢
- 2024-11-27 【Redis源码分析】Server启动过程
- 2024-11-27 Redis单线程不行了,快来割VM/ BIO/ IO多线程的韭菜!(附源码)
- 2024-11-27 php操作redis大全
- 2024-11-27 GO语言中Redis的相关知识记录
- 2024-11-27 Zookeeper 简介
- 2024-11-27 Redis实战(5)-数据结构Set实战之过滤用户注册重复提交的信息
- 2024-11-27 只需5分钟,完成Redis所有命令操作~
- 最近发表
- 标签列表
-
- gitpush (61)
- pythonif (68)
- location.href (57)
- tail-f (57)
- pythonifelse (59)
- deletesql (62)
- c++模板 (62)
- css3动画 (57)
- c#event (59)
- linuxgzip (68)
- 字符串连接 (73)
- nginx配置文件详解 (61)
- html标签 (69)
- c++初始化列表 (64)
- exec命令 (59)
- canvasfilltext (58)
- mysqlinnodbmyisam区别 (63)
- arraylistadd (66)
- node教程 (59)
- console.table (62)
- c++time_t (58)
- phpcookie (58)
- mysqldatesub函数 (63)
- window10java环境变量设置 (66)
- c++虚函数和纯虚函数的区别 (66)