专业编程基础技术教程

网站首页 > 基础教程 正文

c++ 线程池 C++ 线程池代码

ccvgpt 2024-11-11 11:23:46 基础教程 7 ℃

线程池

线程池是一种常用的组件, 在使用时需要使用者保证外部资源有效,避免在调用时资源已经释放,从而导致崩溃。下面是一个简单实现

#include <set>
#include <queue>
#include <vector>  
#include <mutex>
#include <thread>  
#include <functional>
#include <condition_variable>

class ThreadPool {
public:
    ThreadPool(int maxThreads = 8) : m_maxThreads(maxThreads) {}
    ~ThreadPool() { Stop(); }

    //添加任务
    int AddTask(std::function<void()>&& func, int type = 0) {
        
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_maxTaskId > 0x3FFFFFFA) {
            m_maxTaskId = 1;
        }
        Task task(m_maxTaskId++, type, func);
        m_tasks.emplace(task);
        m_cond.notify_one();
        if (!m_threads.empty() && (m_maxThreads == m_threads.size() || m_tasks.size() < 2)) {
            return task.id;
        }

        m_threads.emplace_back([this]() {

            Task task(0, 0, nullptr);
            int freeCount = 0;
            do {
                if (GetTask(task)) {
                    freeCount = 0;
                    task.func();
                    continue;
                }

                task.Clear();
                if (++freeCount > 120) {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    std::thread::id tid = std::this_thread::get_id();
                    for (auto iter = m_threads.begin(); m_tasks.empty() && iter != m_threads.end(); ++iter) {
                        if (iter->get_id() != tid) {
                            continue;
                        }    
                        iter->detach();
                        m_threads.erase(iter);
                        return;
                    }
                }
            } while (!m_exit);
        });

        return task.id;
    }

    //删除特定ID的任务
    void RemoveFromId(int id) {

        std::lock_guard<std::mutex> lock(m_mutex);
        if (!m_tasks.empty() && m_tasks.front().id < id) {
            m_removeId.emplace(id);
        }
    }

    //删除特定类型的任务(一般用于外部资源清理之前)
    void RemoveFromType(int type) {

        std::lock_guard<std::mutex> lock(m_mutex);
        if (type != 0) {
            m_removeType.emplace(Type(type, m_maxTaskId));
        }
    }

    //获取全新未用任务类型
    int GetNewType() {

        std::lock_guard<std::mutex> lock(m_mutex);
        return m_maxTaskType++;
    }

    //关闭
    void Stop() {

        m_exit = true;
        while (!m_threads.empty()) {
            if (m_threads[0].joinable()) {
                m_threads[0].join();
            }
            m_threads.erase(m_threads.begin());
        }
    }
private:
    struct Task;
    bool GetTask(struct Task& task) {

        std::unique_lock<std::mutex> lock(m_mutex);
        if (m_tasks.empty()) {
            //|| std::cv_status::timeout == m_cond.wait_for(lock, std::chrono::seconds(1))
            m_cond.wait_for(lock, std::chrono::seconds(1), [this]() {return m_exit || !m_tasks.empty(); });
        }

        if (!m_exit && !m_tasks.empty()) {

            task = std::move(m_tasks.front());
            m_tasks.pop();

            //判断是否已经删除的任务
            if (!m_removeId.empty() && m_removeId.end() != m_removeId.find(task.id)) {
                m_removeId.erase(task.id);
                return false;
            }

            //判断是否已经删除的任务类型
            if (!m_removeType.empty() && m_removeType.end() != m_removeType.find(Type(task.type))) {
                if (task.id < m_removeType.find(Type(task.type))->maxId) {
                    return false;
                }
                m_removeType.erase(Type(task.type));
            }
            return true;
        }
        return false;
    }

private:
    struct Task {
        int id;
        int type;
        std::function<void()> func;
        
        Task(int _id, int _type, std::function<void()> _func) : id(_id), type(_type),func(_func){}
        void operator()()const { func(); }
        void Clear() { func = nullptr;}
    };

    struct Type {
        int type;
        int maxId;
        Type(int _type, int _maxId = 0) : type(_type), maxId(_maxId){}
        bool operator<(const Type& _type) const {
            return type < _type.type;
        }
    };

private:
    bool m_exit = false;
    int m_maxThreads;
    int m_maxTaskId = 0;
    int m_maxTaskType = 1;
    std::mutex m_mutex;
    std::vector<std::thread> m_threads;
    std::queue<Task> m_tasks;
    std::set<int> m_removeId;
    std::set<Type> m_removeType;
    std::condition_variable m_cond;
};

如下是文心一言的解析:

c++ 线程池 C++ 线程池代码

这段代码定义了一个线程池类ThreadPool。线程池是一种编程模型,它允许你同时执行多个任务,而不是等待一个任务完成后再执行下一个任务。线程池通常用于管理并发任务,以避免创建和销毁线程的开销。

以下是该代码中ThreadPool类的功能和方法的详细解释:

  • 构造函数:线程池的构造函数接受一个可选参数maxThreads,表示线程池中最大线程数。如果没有提供参数,默认值为8。
  • 析构函数:在ThreadPool对象被销毁时调用Stop()方法,以停止所有正在执行的任务并等待所有线程完成。
  • AddTask:此方法用于添加一个任务到线程池中。任务是一个无参数的函数对象,它在执行时不会返回任何值。方法接受两个参数:func表示要执行的任务,type表示任务的类型(在此代码中未使用)。任务通过std::function<void()>类型表示,这是一种通用、类型安全的函数包装器。
  1. RemoveFromId(int id):
* 这个方法接受一个整数参数`id`,并尝试删除具有该ID的任务。
* 它首先使用互斥锁来保护对`m_tasks`的访问,以避免同时发生多个线程的读写操作。
* 如果任务队列不为空,并且队列中的第一个任务的ID小于要删除的ID,那么它将该ID添加到`m_removeId`中。这可能意味着它只删除ID大于或等于给定ID的任务。
  1. RemoveFromType(int type):
* 这个方法接受一个整数参数`type`,并尝试删除具有该类型的任务。
* 它也使用互斥锁来保护对`m_tasks`的访问。
* 如果提供的类型不为0,它将创建一个新的`Type`对象(具体内容未在代码中给出),并将其与最大的任务ID一起添加到`m_removeType`中。
  1. GetNewType():
* 这个方法返回一个新的、未使用的任务类型。
* 它也使用互斥锁来保护对`m_maxTaskType`的访问。
* 它只是简单地递增`m_maxTaskType`并返回新值。
  1. Stop():
* 这个方法用于停止线程池中的所有活动。
* 它设置一个退出标志`m_exit`为true,这可能用于通知线程池中的线程停止执行。
* 然后,它循环等待所有线程完成执行。如果第一个线程(索引为0的线程)是可join的(即仍在运行),则等待它完成,然后从线程池中删除它。这个过程会一直重复,直到所有线程都已完成。

这段代码是一个C++代码片段,它定义了一个私有成员函数GetTask,该函数尝试从一个线程池中获取一个任务。以下是这段代码的详细解释:

  1. 前置声明:
struct Task;

这里声明了一个名为Task的结构体,但具体的定义没有给出。这通常是为了确保在使用结构体之前定义它,但在这里,由于结构体的定义不在当前作用域中,所以编译器会认为它已经定义过了。

  1. 函数定义:
bool GetTask(struct Task& task)

定义了一个名为GetTask的函数,它接受一个Task类型的引用参数task,并返回一个布尔值。这个函数的主要目的是从线程池中获取一个任务。

  1. 互斥锁:
std::unique_lock<std::mutex> lock(m_mutex);

使用互斥锁m_mutex来保护线程池中的数据结构,确保在多线程环境中对它们的访问是线程安全的。

  1. 等待任务: 如果线程池是空的,则线程会等待直到有任务可用或超时。代码中有一个注释掉的分支,表示如果任务超时,线程会继续等待。如果线程池中有任务可用,代码将继续执行。
  2. 检查退出标志和任务: 如果退出标志未设置且线程池不为空,则执行以下操作: a. 从线程池中获取一个任务并将其移动到提供的task参数中。 b. 从线程池中删除该任务。 c. 检查是否需要从已删除的任务ID列表或已删除的任务类型列表中删除该任务。如果需要,则返回false。否则,返回true表示成功获取了任务。
  3. 返回值: 如果成功获取了任务,函数返回true;否则,返回false

总的来说,这段代码定义了一个从线程池中安全获取任务的函数,并处理了与已删除任务相关的逻辑。

  1. Task结构体:
* 定义了一个任务的结构体,其中包含任务的ID、类型、函数和清除函数。
* 构造函数用于初始化任务的ID、类型和函数。
* 定义了一个运算符重载,使得该结构体可以像函数一样被调用。
* 定义了一个`Clear`函数,用于清除任务函数。
  1. Type结构体:
* 定义了一个任务类型的结构体,其中包含类型和最大ID。
* 构造函数接受两个参数,分别是类型和最大ID,并初始化它们。
* 定义了一个运算符重载,使得两个Type对象可以根据它们的类型进行比较。
  1. 私有成员变量:
* `m_exit`: 一个布尔标志,用于指示线程池是否应该退出。
* `m_maxThreads`: 一个整数,表示线程池中的最大线程数。
* `m_maxTaskId` 和 `m_maxTaskType`: 分别表示任务的最大ID和最大类型。
* `m_mutex`: 一个互斥锁,用于保护线程池中的数据结构。
* `m_threads`: 一个线程向量,存储线程池中的所有线程。
* `m_tasks`: 一个任务队列,存储等待执行的任务。
* `m_removeId` 和 `m_removeType`: 分别用于存储要删除的任务ID和任务类型。
* `m_cond`: 一个条件变量,用于等待任务或通知其他线程任务已可用。

示例:

class A {
public:
    A(ThreadPool& _pool) : pool(_pool), type(_pool.GetNewType()){}
    ~A() {
        pool.RemoveFromType(type);
        std::cout << "don't call : " << count << std::endl;
    }

    void Test() {
        for (int i = 0; i < 2000000; i++) {
            pool.AddTask(std::bind(&A::DoSomething, this), type);
        }
    }
    void DoSomething() {

        int xx = count.fetch_add(1);
        //std::cout << "A::DoSomething() : "<< xx << std::endl;
        //std::this_thread::sleep_for(std::chrono::milliseconds(rand()%100 + 1));
    }
private:
    int type = 0;
    std::atomic<int> count = 0;
    ThreadPool& pool;
};

int main() {
    {
        ThreadPool pool;
        {
            A a(pool);
            a.Test();
            std::this_thread::sleep_for(std::chrono::milliseconds(3000));

            for (int i = 0; i < 10; i++) {
                int taskId = pool.AddTask([i]() {
                    std::cout << "this is " << i << std::endl;
                    });
                if (i % 2 == 0) {
                    pool.RemoveFromId(taskId);
                }
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(3000));
        std::cout << "nonono" << std::endl;
    }
    return 0;
}



最近发表
标签列表