专业编程基础技术教程

网站首页 > 基础教程 正文

用 C++ 写线程池是怎样一种体验?(线程池c语言)

ccvgpt 2024-07-26 00:48:13 基础教程 10 ℃



用 C++ 写线程池是怎样一种体验?(线程池c语言)

在过去的几年里,我用 C++ 写过各种各样的线程池,包括最简单固定线程数的、运行时动态调整线程数的、有存活时间的、有任务队列长度限制的、底层用纤程的、支持多个线程池合并以复用资源的、脱离管程完全从底层实现的(仅以 POSIX 中 Futex 作为同步原语)、支持定时任务的、支持批量提交的、支持 Future 的等等。所以我认为我有资格回答这个问题。

起初我写线程池的时候完全不知道什么是管程(Monitor,事实上这也是大部分线程池实现的基础,不管是什么语言),只是想封装一个足够便携、扩展性强大的线程池自己用(毕竟当时也不知道什么是“开源”)。

于是乎,我便开始了艰难的尝试,先后用过 Windows 下(没错,那时候我还没接触过 Linux 系统,不了解 C++11 的并发库,也不知道有什么第三方库,主要参考的是 MoreWindows 同学的 CSDN 博客)的各种 API,包括 Event、Mutex、Semaphore 等各种组件,唯独没试过用 CRITICAL_SECTION + ConditionVariable 的黄金组合(这才是“真 · 管程”!),因为 ConditionVariable 是 Windows Vista 以后才支持的,而我当时用的是 Windows XP(微软说,这锅我不背)。

当然,不用管程写线程池的结局当然是很悲惨的(哭)。即使之前搞过算法竞赛,我写出来的线程池似乎也永远都有 de 不完的 bug(我总能构造出让我的线程池罢工的 case,但事实上我到现在也不会用 IDE debug);直到写过几个版本后,我终于用 Windows 的 Event 实现了一个线程池,并用洪荒之力从数学的角度证明了它是正确的——这也是我第一个“可以用”的线程池(隔着屏幕可以感受到我的兴奋吗!)。

正如很多孩子得到一个新玩具后爱不释手那样,我拿这个线程池在我的低配电脑上跑了各种并行算法,根本停不下来!我可怜的 CPU 也因为我的“创作”而日常飙到 100%。后来嘛,也正如小孩子玩一个玩具久了之后就不想玩了,我也开始想搞点更大的事情出来。对了,既然并发程序这么难写,为什么不写一套完整的 C++ 并发库呢!

然后,我就开始了我的“并发之旅”。这期间,我买了各种书,问了各种专家,读了各种 paper,也写过各种不靠谱的“并发库”。经过了 89 次的推翻和自我否定,我终于写成了一套看起来还不错的库,我希望它可以被加入到 C++ 标准库,而且我也正在向这个方向努力(这套库目前已经是一篇 C++ 提案了, http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0642r1.pdf )。

在我做并发库的同时,我也发现了 C++ 对于多态支持的严重不足(如果提交的任务类型不统一的话,线程池的实现通常也依赖于多态),尤其是一个类与其多态性质的耦合大大降低了程序的扩展性;程序规模越大,这个限制越明显。于是我又做了一套 C++ 多态的解决方案(PFA),目前也是一篇 C++ 提案了(http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0957r0.pdf)。当然,任何一个库的设计都可能存在缺陷,也欢迎大家的 feedback!

回归正题,其实用 C++ 写线程池的感觉和用其他语言(例如 Java)很不一样。

姑且不说是实现一个“线程池”,即使是任意一个库,由于 C++ 本身的模板机制和强大的编译优化很容易使得你程序的性能瓶颈暴露无遗,而作为一个基础的“库”,性能通常是一个硬指标,所以要“做好”肯定是有难度的;而 Java 等有 VM 或类 VM 的语言本身就有性能瓶颈,且有运行时的语义抽象开销,所以在你的“库”不会比“语言本身”更低效的情况下,通常不需要考虑性能问题。

而对于“线程池”这类“并发组件”而言,如果我们的目标是“尽善尽美”的话,C++ 的实现难度无疑更大;为了更好的性能,我们也有可能针对不同的平台选择不同的原语。但如果不需要追求这种“理论上的最优”,用 C++ 写一个高性能线程池也并没有大家想象中的那么复杂;只要你可以熟练使用管程(即 std::mutex 和 std::condition_variable)就足够了。例如,下面这就是我写的一个最基本的“固定线程数的线程池”实现:

#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <thread>

class fixed_thread_pool {
 public:
  explicit fixed_thread_pool(size_t thread_count)
      : data_(std::make_shared<data>()) {
    for (size_t i = 0; i < thread_count; ++i) {
      std::thread([data = data_] {
        std::unique_lock<std::mutex> lk(data->mtx_);
        for (;;) {
          if (!data->tasks_.empty()) {
            auto current = std::move(data->tasks_.front());
            data->tasks_.pop();
            lk.unlock();
            current();
            lk.lock();
          } else if (data->is_shutdown_) {
            break;
          } else {
            data->cond_.wait(lk);
          }
        }
      }).detach();
    }
  }

  fixed_thread_pool() = default;
  fixed_thread_pool(fixed_thread_pool&&) = default;

  ~fixed_thread_pool() {
    if ((bool) data_) {
      {
        std::lock_guard<std::mutex> lk(data_->mtx_);
        data_->is_shutdown_ = true;
      }
      data_->cond_.notify_all();
    }
  }

  template <class F>
  void execute(F&& task) {
    {
      std::lock_guard<std::mutex> lk(data_->mtx_);
      data_->tasks_.emplace(std::forward<F>(task));
    }
    data_->cond_.notify_one();
  }

 private:
  struct data {
    std::mutex mtx_;
    std::condition_variable cond_;
    bool is_shutdown_ = false;
    std::queue<std::function<void()>> tasks_;
  };
  std::shared_ptr<data> data_;
};

是的!就这么几行代码足够了!不需要保存 std::thread 对象、不需要构造各种复杂的消息模型、不需要构造各种复杂的自动机!核心工作线程的代码仅仅是一个 for 嵌套了一个 if-else 而已! 而且最重要的是,它是正确、可靠的!放在头文件里就能用!

唯一美中不足的就是这个实现不支持提交“不能拷贝构造的对象”,这受限于 std::function 的性质;当然如果把 std::function 换成我关于多态的 C++ 提案中的 proxy 就不存在这个问题了!

另外,关于“管程”的靠谱的中文资料较少。如果题主感兴趣可以去看看 ISO C++ 标准或 IEEE POSIX 标准对其的阐述,搞明白它到底是什么、为什么 wait 需要“原子地”解锁、为什么设计者允许它“假唤醒”。


同学给出的 C++11 实现,有以下看法(很多同学在实现线程池时也会遇到这些问题):


1. 没有必要把停止标志设计为 atomic,更没有必要用 acquire-release 同步。

2. 线程池销毁时等待所有任务执行完成,通常是没有必要的。

3. 工作线程内部存在冗余逻辑;在尚有任务未完成时没有必要检查线程池是否停止。

4. 添加任务时没有必要将任务封装为 std::packaged_task,因为线程池的基本职能是管理 Execution Agents; 如果一定要设计这样一个方法,那也应该保留一个直接提交任务不返回 Future 的方法;事实上,在 C++ 提案 P0443 (http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0443r5.html)中,也有类似的设计,不返回 Future 的叫做 "one-way",返回 Future 的叫做 "two-way",很多时候需要从外部控制同步时,"one-way" 比 "two-way" 更实用。

5. 一个 bug:"add" 方法实现中使用了 std::bind,而 std::bind 看起来并不适用于这里。我猜你设计这个方法的语义是执行 add(std::forward<F>(fcn), std::forward<Args>(args)...),从你的返回值类型上也可以看出这一点;但不幸的是,std::bind 的返回值被调用时会讲所有参数转为左值引用! 也就是说,在你的实现中,所有参数在 fnc 执行时都会被拷贝构造一份,对于不能拷贝构造的参数会直接编译出错!

6. 另一个 bug:在线程池的析构函数中,没有对 stop_ 的赋值加锁。为什么需要对 stop_ 的赋值加锁呢?因为这个操作 必须 与工作线程对于 std::condition_variable 的 wait 检查操作互斥!具体原因如下:

对于 std::condition_variable 的 wait(带 Predicate 的版本)展开的代码与下面的代码等价:

while (!pred()) {
  cond_var.wait();
}

如果 pred() 不与对于状态的修改互斥的话,工作线程可能会陷入无线等待,也就导致了线程泄漏。

  • 另外还有一些关于c++ Linux后台服务器开发的一些知识点分享:Linux,Nginx,MySQL,Redis,P2P,K8S,Docker,TCP/IP,协程,DPDK,webrtc,音视频等等视频。
  • 喜欢的朋友可以后台私信【1】获取学习视频

    附上一份c++ Linux后台服务器开发 学习课程大纲给大家


    Tags:

    最近发表
    标签列表