网站首页 > 基础教程 正文
线程池
线程池是一种常用的组件, 在使用时需要使用者保证外部资源有效,避免在调用时资源已经释放,从而导致崩溃。下面是一个简单实现
#include <set>
#include <queue>
#include <vector>
#include <mutex>
#include <thread>
#include <functional>
#include <condition_variable>
class ThreadPool {
public:
ThreadPool(int maxThreads = 8) : m_maxThreads(maxThreads) {}
~ThreadPool() { Stop(); }
//添加任务
int AddTask(std::function<void()>&& func, int type = 0) {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_maxTaskId > 0x3FFFFFFA) {
m_maxTaskId = 1;
}
Task task(m_maxTaskId++, type, func);
m_tasks.emplace(task);
m_cond.notify_one();
if (!m_threads.empty() && (m_maxThreads == m_threads.size() || m_tasks.size() < 2)) {
return task.id;
}
m_threads.emplace_back([this]() {
Task task(0, 0, nullptr);
int freeCount = 0;
do {
if (GetTask(task)) {
freeCount = 0;
task.func();
continue;
}
task.Clear();
if (++freeCount > 120) {
std::lock_guard<std::mutex> lock(m_mutex);
std::thread::id tid = std::this_thread::get_id();
for (auto iter = m_threads.begin(); m_tasks.empty() && iter != m_threads.end(); ++iter) {
if (iter->get_id() != tid) {
continue;
}
iter->detach();
m_threads.erase(iter);
return;
}
}
} while (!m_exit);
});
return task.id;
}
//删除特定ID的任务
void RemoveFromId(int id) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_tasks.empty() && m_tasks.front().id < id) {
m_removeId.emplace(id);
}
}
//删除特定类型的任务(一般用于外部资源清理之前)
void RemoveFromType(int type) {
std::lock_guard<std::mutex> lock(m_mutex);
if (type != 0) {
m_removeType.emplace(Type(type, m_maxTaskId));
}
}
//获取全新未用任务类型
int GetNewType() {
std::lock_guard<std::mutex> lock(m_mutex);
return m_maxTaskType++;
}
//关闭
void Stop() {
m_exit = true;
while (!m_threads.empty()) {
if (m_threads[0].joinable()) {
m_threads[0].join();
}
m_threads.erase(m_threads.begin());
}
}
private:
struct Task;
bool GetTask(struct Task& task) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_tasks.empty()) {
//|| std::cv_status::timeout == m_cond.wait_for(lock, std::chrono::seconds(1))
m_cond.wait_for(lock, std::chrono::seconds(1), [this]() {return m_exit || !m_tasks.empty(); });
}
if (!m_exit && !m_tasks.empty()) {
task = std::move(m_tasks.front());
m_tasks.pop();
//判断是否已经删除的任务
if (!m_removeId.empty() && m_removeId.end() != m_removeId.find(task.id)) {
m_removeId.erase(task.id);
return false;
}
//判断是否已经删除的任务类型
if (!m_removeType.empty() && m_removeType.end() != m_removeType.find(Type(task.type))) {
if (task.id < m_removeType.find(Type(task.type))->maxId) {
return false;
}
m_removeType.erase(Type(task.type));
}
return true;
}
return false;
}
private:
struct Task {
int id;
int type;
std::function<void()> func;
Task(int _id, int _type, std::function<void()> _func) : id(_id), type(_type),func(_func){}
void operator()()const { func(); }
void Clear() { func = nullptr;}
};
struct Type {
int type;
int maxId;
Type(int _type, int _maxId = 0) : type(_type), maxId(_maxId){}
bool operator<(const Type& _type) const {
return type < _type.type;
}
};
private:
bool m_exit = false;
int m_maxThreads;
int m_maxTaskId = 0;
int m_maxTaskType = 1;
std::mutex m_mutex;
std::vector<std::thread> m_threads;
std::queue<Task> m_tasks;
std::set<int> m_removeId;
std::set<Type> m_removeType;
std::condition_variable m_cond;
};
如下是文心一言的解析:
这段代码定义了一个线程池类ThreadPool。线程池是一种编程模型,它允许你同时执行多个任务,而不是等待一个任务完成后再执行下一个任务。线程池通常用于管理并发任务,以避免创建和销毁线程的开销。
以下是该代码中ThreadPool类的功能和方法的详细解释:
- 构造函数:线程池的构造函数接受一个可选参数maxThreads,表示线程池中最大线程数。如果没有提供参数,默认值为8。
- 析构函数:在ThreadPool对象被销毁时调用Stop()方法,以停止所有正在执行的任务并等待所有线程完成。
- AddTask:此方法用于添加一个任务到线程池中。任务是一个无参数的函数对象,它在执行时不会返回任何值。方法接受两个参数:func表示要执行的任务,type表示任务的类型(在此代码中未使用)。任务通过std::function<void()>类型表示,这是一种通用、类型安全的函数包装器。
- RemoveFromId(int id):
* 这个方法接受一个整数参数`id`,并尝试删除具有该ID的任务。
* 它首先使用互斥锁来保护对`m_tasks`的访问,以避免同时发生多个线程的读写操作。
* 如果任务队列不为空,并且队列中的第一个任务的ID小于要删除的ID,那么它将该ID添加到`m_removeId`中。这可能意味着它只删除ID大于或等于给定ID的任务。
- RemoveFromType(int type):
* 这个方法接受一个整数参数`type`,并尝试删除具有该类型的任务。
* 它也使用互斥锁来保护对`m_tasks`的访问。
* 如果提供的类型不为0,它将创建一个新的`Type`对象(具体内容未在代码中给出),并将其与最大的任务ID一起添加到`m_removeType`中。
- GetNewType():
* 这个方法返回一个新的、未使用的任务类型。
* 它也使用互斥锁来保护对`m_maxTaskType`的访问。
* 它只是简单地递增`m_maxTaskType`并返回新值。
- Stop():
* 这个方法用于停止线程池中的所有活动。
* 它设置一个退出标志`m_exit`为true,这可能用于通知线程池中的线程停止执行。
* 然后,它循环等待所有线程完成执行。如果第一个线程(索引为0的线程)是可join的(即仍在运行),则等待它完成,然后从线程池中删除它。这个过程会一直重复,直到所有线程都已完成。
这段代码是一个C++代码片段,它定义了一个私有成员函数GetTask,该函数尝试从一个线程池中获取一个任务。以下是这段代码的详细解释:
- 前置声明:
struct Task;
这里声明了一个名为Task的结构体,但具体的定义没有给出。这通常是为了确保在使用结构体之前定义它,但在这里,由于结构体的定义不在当前作用域中,所以编译器会认为它已经定义过了。
- 函数定义:
bool GetTask(struct Task& task)
定义了一个名为GetTask的函数,它接受一个Task类型的引用参数task,并返回一个布尔值。这个函数的主要目的是从线程池中获取一个任务。
- 互斥锁:
std::unique_lock<std::mutex> lock(m_mutex);
使用互斥锁m_mutex来保护线程池中的数据结构,确保在多线程环境中对它们的访问是线程安全的。
- 等待任务: 如果线程池是空的,则线程会等待直到有任务可用或超时。代码中有一个注释掉的分支,表示如果任务超时,线程会继续等待。如果线程池中有任务可用,代码将继续执行。
- 检查退出标志和任务: 如果退出标志未设置且线程池不为空,则执行以下操作: a. 从线程池中获取一个任务并将其移动到提供的task参数中。 b. 从线程池中删除该任务。 c. 检查是否需要从已删除的任务ID列表或已删除的任务类型列表中删除该任务。如果需要,则返回false。否则,返回true表示成功获取了任务。
- 返回值: 如果成功获取了任务,函数返回true;否则,返回false。
总的来说,这段代码定义了一个从线程池中安全获取任务的函数,并处理了与已删除任务相关的逻辑。
- Task结构体:
* 定义了一个任务的结构体,其中包含任务的ID、类型、函数和清除函数。
* 构造函数用于初始化任务的ID、类型和函数。
* 定义了一个运算符重载,使得该结构体可以像函数一样被调用。
* 定义了一个`Clear`函数,用于清除任务函数。
- Type结构体:
* 定义了一个任务类型的结构体,其中包含类型和最大ID。
* 构造函数接受两个参数,分别是类型和最大ID,并初始化它们。
* 定义了一个运算符重载,使得两个Type对象可以根据它们的类型进行比较。
- 私有成员变量:
* `m_exit`: 一个布尔标志,用于指示线程池是否应该退出。
* `m_maxThreads`: 一个整数,表示线程池中的最大线程数。
* `m_maxTaskId` 和 `m_maxTaskType`: 分别表示任务的最大ID和最大类型。
* `m_mutex`: 一个互斥锁,用于保护线程池中的数据结构。
* `m_threads`: 一个线程向量,存储线程池中的所有线程。
* `m_tasks`: 一个任务队列,存储等待执行的任务。
* `m_removeId` 和 `m_removeType`: 分别用于存储要删除的任务ID和任务类型。
* `m_cond`: 一个条件变量,用于等待任务或通知其他线程任务已可用。
示例:
class A {
public:
A(ThreadPool& _pool) : pool(_pool), type(_pool.GetNewType()){}
~A() {
pool.RemoveFromType(type);
std::cout << "don't call : " << count << std::endl;
}
void Test() {
for (int i = 0; i < 2000000; i++) {
pool.AddTask(std::bind(&A::DoSomething, this), type);
}
}
void DoSomething() {
int xx = count.fetch_add(1);
//std::cout << "A::DoSomething() : "<< xx << std::endl;
//std::this_thread::sleep_for(std::chrono::milliseconds(rand()%100 + 1));
}
private:
int type = 0;
std::atomic<int> count = 0;
ThreadPool& pool;
};
int main() {
{
ThreadPool pool;
{
A a(pool);
a.Test();
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
for (int i = 0; i < 10; i++) {
int taskId = pool.AddTask([i]() {
std::cout << "this is " << i << std::endl;
});
if (i % 2 == 0) {
pool.RemoveFromId(taskId);
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
std::cout << "nonono" << std::endl;
}
return 0;
}
猜你喜欢
- 2024-11-11 Linux下的C++ socket编程实例 linux c++ tcp
- 2024-11-11 C++11原子变量:线程安全、无锁操作的实例解析
- 2024-11-11 C++11的thread_local原理和应用范例
- 2024-11-11 知识重构-c++ : Lambda 知识重构拼音
- 2024-11-11 c++ 疑难杂症(4) std:vector c++ vector subscript out of range
- 2024-11-11 深入探索C++异步编程的奥秘 c++11异步编程
- 2024-11-11 C++ 开发中使用协程需要注意的问题
- 2024-11-11 golang极速嵌入式Linux应用开发(四)-协程与并发
- 2024-11-11 在计算机编程中,线程是指一个程序内部的执行流程
- 2024-11-11 C++ std:decay、std:bind、std:packaged_task 在模版编程的实践
- 最近发表
- 标签列表
-
- gitpush (61)
- pythonif (68)
- location.href (57)
- tail-f (57)
- pythonifelse (59)
- deletesql (62)
- c++模板 (62)
- css3动画 (57)
- c#event (59)
- linuxgzip (68)
- 字符串连接 (73)
- nginx配置文件详解 (61)
- html标签 (69)
- c++初始化列表 (64)
- exec命令 (59)
- canvasfilltext (58)
- mysqlinnodbmyisam区别 (63)
- arraylistadd (66)
- node教程 (59)
- console.table (62)
- c++time_t (58)
- phpcookie (58)
- mysqldatesub函数 (63)
- window10java环境变量设置 (66)
- c++虚函数和纯虚函数的区别 (66)