专业编程基础技术教程

网站首页 > 基础教程 正文

图解Redis-Redis架构演进之路

ccvgpt 2024-11-27 12:07:17 基础教程 1 ℃

Redis的架构随着版本的迭代也在发生着变化。我们常说Redis是单线程模型,主要是说其网络IO操作及命令处理是串行执行的。在Redis 4.0版本做了架构升级,加入了后台线程,其目的主要是为了处理慢命令,防止慢命令降低服务器的处理效率。在Redis 6.0引入了多线程模型,Redis的高耗时主要发送在网络IO这一块,多线程模型的引入就是为了解决网络IO处理慢的问题。

架构演进

单线程模型

  • fired events就绪的事件,通过多路复用程序api获取到的就绪事件,然后根据事件对应的事件处理器rfileProc和wfileProc驱动业务流程。
  • aeApiPollI/O多路复用API,是基于 epoll_wait/select/kevent 等系统调用的封装,监听等待就绪的事件,它是EventLoop的核心函数,是事件驱动得以运行的基础。
  • beforeSleep:事件循环到来前执行的函数,比如其会做client命令回复缓冲区回写客户端操作,AOF缓冲区刷盘操作等。
  • acceptTcpHandler:链接应答处理器,其底层是调用操作系统的accept函数来接受客户端连接,同时创建client对象,然后创建AE_READABLE事件到事件队列。
  • readQueryFromClient:命令应答处理器,读取客户端发送的数据并执行命令,然后创建AE_WRITABLE事件到事件队列。
  • processCommand:命令执行器,是命令的抽象。
  • addReplay:当命令执行完成后调用该函数,该函数将回写给客户端的数据写入client对象的命令回复缓冲区中。
  • sendReplyToClient&writeToClient:将client 命令回复缓冲区中的数据写入到client 的socket中。
  • 事件的详细执行过程参见:《图解Redis-单机数据库实现》中的文件事件执行过程部分

单线程&后台线程模型

Redis的单线程模型也存在一些弊端,比如长耗时的命令可能会阻塞事件循环,导致服务器QPS严重降低。例如:删除一个或多个超大的键值对命令。要解决这个问题,Redis作者也经过的多方案的思考,例如渐进式删除方案、多线程方案等,最终敲定多线程方案,利用多线程来实现这一类非阻塞的命令(详见作者博客:Lazy Redis is better Redis[http://antirez.com/news/93]),在4.0版本中新增了一些ASYNC命令:UNLINK、FLUSHALL ASYNC、FLUSHDB ASYNC。UNLINK命令的实现是将key先从键空间中移除,然后将空间释放的任务加入lazyfree jobs queue中,让Lazyfree Thread逐步去删除。

图解Redis-Redis架构演进之路

多线程&后台线程模型

Redis最初的架构设计理念来自于思想:“CPU不会成为性能瓶颈,瓶颈往往来自于内存和网络,因此单线程足够处理”。但随着互联网的飞速发展,Redis所承载的流量越来越大,单线程模式在网络I/O的开销越来越大,导致吞吐量的急剧下降。要想提升Redis的性能,有两个方向可以解决:提升内存的读写速度、增强网络I/O的读写能力,前者更多的只能依赖硬件的发展,而网络I/O的优化可以结合CPU多核优势及零拷贝技术等快速的解决。

Redis在6.0 开始将单Reactor模式升级为了“Multi-Reactors 模式”,其实与真正的Multi-Reactors 模式有一些不同,Redis的实现是将读取客户端请求数据和回复客户端数据这两块的I/O操作使用多线程处理(详见上图),真正执行客户端命令逻辑的还是主线程。

  • clients_pending_read:等待读取socket缓冲区数据的client队列,该队列的客户端将会被平均分配给主线程和IO线程。
  • clients_pending_write:等待写入客户端socket的client队列,同样,该队列的客户端会被平均分配给I/O线程以及主线程本身。

多线程模型原理

多线程初始化

  • Redis进程启动时会在最后一个环节初始化I/O线程池(类似java线程池)。
  • 会为每一个I/O线程创建一个本地任务队列、剩余任务计数器及状态锁。
  • I/O线程在没有获取任务时是忙轮询状态或者是休眠等待唤醒状态。
  • 主线程将任务加入线程本地队列,设置任务数量后,唤起休眠线程开始执行任务。
  • 在I/O线程执行任务期间主线程不会访问其正在处理的任务。
  • 主线程是通过累计所有线程的剩余任务数来判断是否所有I/O线程执行完成了。
int main(int argc, char **argv) {
    ...
        
    InitServerLast();
    
    ...
}

/* Some steps in server initialization need to be done last (after modules
 * are loaded).
 * Specifically, creation of threads due to a race bug in ld.so, in which
 * Thread Local Storage initialization collides with dlopen call.
 * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast() {
    // Initialize the background system, spawning the thread
    bioInit();
    // 启动I/O线程
    initThreadedIO();
    
    ...
}

// 类似java中线程池
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    // 就配置了一个I/O线程的话就主线程来处理好了
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* 按照io_threads_num,设置生成并初始化 IO 线程 */
    for (int i = 0; i < server.io_threads_num; i++) {
        // 初始化 I/O 线程的本地任务队列。
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* 0为主线程,不需要初始化下面的信息 */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        // 每个 I/O 线程会分配一个本地锁,用来休眠和唤醒线程
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        // 每个 I/O 线程分配一个原子计数器,用来记录当前待执行的任务数量。
        io_threads_pending[i] = 0;
        // 主线程在启动 I/O 线程的时候会默认先锁住它,直到有 I/O 任务才唤醒它。
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        // 这里开始启动线程IOThreadMain
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

// 这个类似java中的Runable实现
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        // 忙轮询,100w 次循环,等待主线程分配 I/O 任务。
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        // 忙轮询结束仍没有等到任务,则通过尝试加锁进入休眠,
        // 等待主线程分配任务之后调用 startThreadedIO 解锁,唤醒 I/O 线程去执行
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        // Process:主线程分配任务给 I/O 线程之时,会把任务均匀分配到每个线程的本地任务队列 io_threads_list[id],
        // 但是当 I/O 线程开始执行任务之后,主线程就不会再去访问这些任务队列,避免数据竞争。
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            // 写I/O,将client reply缓冲区内容写入客户端socket
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            }
            // 读I/O,读取客户端socket中的数据到client querybuf中
            else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        // 所有任务执行完之后把自己的计数器置 0,主线程通过累加所有 I/O 线程的计数器判断是否所有 I/O 线程都已经完成工作。
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

多线程程读

  • clients_pending_read 队列的任务是由主线程获取到就绪的读事件后加入到该队列的。
  • 在执行beforeSleep时触发队列任务的分配,通过RR轮询将所有任务均匀分配给所有I/O线程及主线程本身。
  • 分配完成后唤醒所有I/O线程开始执行,主线程本身也开始处理任务。
  • 主线程处理完任务后会进入盲轮询,知道所有I/O线程的剩余任务数为0时结束忙轮询。
  • 主线程在所有任务处理完成后将开始循环处理clients_pending_read队列,处理队列中每一个client读取解析到的命令。
struct redisServer {
    ...
    
    list *clients_pending_read;  /* Client has pending read socket buffers. */
        
    ...
}

// 读I/O,读取客户端socket中的数据到client querybuf中
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    
    ...
}

// 任务入队
int postponeClientRead(client *c) 
    // 当多线程 I/O 模式开启、主线程没有在处理阻塞任务时,将 client 加入异步队列。
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !clientsArePaused() &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        // 将client加入到clients_pending_read队列,等待读
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

// 分配
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    // 遍历待读取clients_pending_read 队列, 通过 RR 轮询均匀地分配给 I/O 线程及主线程自身。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 操作为读取操作,给每个 I/O 线程的计数器设置分配的任务数量,
    // 让 I/O 线程可以开始工作:只读取和解析命令,不执行命令。
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程参与任务的处理(0号任务队列代表主线程任务队列)
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 忙轮询,当所有I/O线程的剩余任务数累计为0时结束阻塞
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    // 循环执行clients_pending_read队列中所有已读取解析的命令
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        /* Clients can become paused while executing the queued commands,
         * so we need to check in between each command. If a pause was
         * executed, we still remove the command and it will get picked up
         * later when clients are unpaused and we re-queue all clients. */
        if (clientsArePaused()) continue;

        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }
        processInputBuffer(c);

        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not install a write handler (it can't).
         */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}

多线程写

  • clients_pending_write队列的任务是由主线程在执行完成命令后通过addReply加入的。
  • 在执行beforeSleep时触发队列任务的分配,通过RR轮询将所有任务均匀分配给所有I/O线程及主线程本身。
  • 分配完成后唤醒所有I/O线程开始执行,主线程本身也开始处理任务。
  • 主线程处理完任务后会进入盲轮询,知道所有I/O线程的剩余任务数为0时结束忙轮询。
  • 最终确认所有客户端输出缓冲区是否有残留数据,如果有则为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
struct redisServer {
    ...
    
    list *clients_pending_write; /* There is to write or install handler. */
        
    ...
}

// 入队
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    ...
}

int prepareClientToWrite(client *c) {
    ...
        
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
            clientInstallWriteHandler(c);

    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}

void clientInstallWriteHandler(client *c) {
    /* Schedule the client to write the output buffers to the socket only
     * if not already done and, for slaves, if the slave can actually receive
     * writes at this stage. */
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        // 加入写队列
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
}

// 分配
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    // 如果用户设置的 I/O 线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client
    // 数量不足 I/O 线程数的两倍,则不用多线程的逻辑,让所有 I/O 线程进入休眠,
    // 直接在主线程把所有 client 的相应数据回写到客户端。
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    // 唤醒正在休眠的 I/O 线程(如果有的话)
    if (!server.io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    // 遍历待写出的 client 队列 clients_pending_write,通过 RR 轮询均匀地分配给 I/O 线程和主线程本身。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 操作为写出操作,给每个 I/O 线程的计数器设置分配的任务数量,
    // 让 I/O 线程可以开始工作,把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端。    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // 忙轮询,当所有I/O线程的剩余任务数累计为0时结束阻塞
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    // 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的写出缓冲区中有残留数据,
    // 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}


以上就是Redis架构演进的介绍,如果各位还想了解更多,欢迎评论+关注,Redis图解系列专栏持续更新中。

最近发表
标签列表