专业编程基础技术教程

网站首页 > 基础教程 正文

Python中的线程

ccvgpt 2024-12-12 11:11:21 基础教程 1 ℃

简介

这篇文章介绍了Python中的多线程,线程模块和来自concurrent.futures模块的ThreadPoolExecutor类。

末尾的资源部分有一些精彩材料的链接,你可以用它来深入了解这个主题

Python中的线程

相关帖子

  • 并发和并行性简介
  • Python中的多处理
  • Python中的ProcessPoolExecutor

此帖子中的代码可以在此存储库中找到。

什么是线程

线程是进程中执行的基本单位。这是一个独立的执行流,与同一进程中的其他独立执行流共享相同的地址空间。一个进程可以有一个或多个线程,其中一个是主线程,这是Python进程的默认线程。

过程和线程

在Python中,我们可以使用线程模块编写使用多个线程的程序,以及concurrent.futures模块中的ThreadPoolExecutor类。

如果我们编写程序,使其使用多个线程,那么程序将能够在一个核心中同时运行。也可以使用协程同时执行一个线程程序。

与Java、C/C++和Go等其他编程语言中的线程不同,即使由于Python的全局解释器锁(GIL)而存在多个内核,每个Python(CPython实现)进程中的线程也无法并行运行。如果您有CPU绑定操作,并且需要在Python中并行实现,则应使用多处理模块或ProcessPoolExecutor类(请参阅Python中的多处理)。

想象一下,我们编写一个程序,当它开始执行时,它将成为一个单一的进程。此外,该过程将有两个线程。当有两个线程时,可以开始并发游戏。

在单核CPU中,程序可以同时执行。使用一个核心和两个线程,线程可以在同一核心中切换到另一个线程。这被称为上下文切换

上下文切换期间,一个线程从CPU中切换出,以便另一个线程可以运行。为此,存储进程或线程的状态,以便稍后恢复并恢复执行,然后恢复另一个之前保存的状态。

上下文切换通常计算成本高昂,从一个进程或线程切换到另一个进程或线程需要一定的时间来保存和加载寄存器和其他操作。在线程之间切换上下文通常比在进程之间切换更快。

线程用例

多线程最适合使用的任务是I/O绑定操作。例如,如果线程执行必须向数据库发出请求的指令,那么用等待响应的线程阻止CPU核心是没有意义的。相反,在第一个线程等待时,允许另一个线程使用核心将更好地利用资源。

在下图中,空圆圈表示I/O操作,其中线程一直在等待事情发生。当第一个I/O操作开始(空的绿色圆圈)时,操作系统会快速切换红色线程的等待线程,以便更好地分配计算资源。这是操作系统做出的决定,开发人员无法决定何时在线程之间切换。

与I/O绑定操作同时执行任务

如果程序不使用同时运行的多个线程,而是按顺序运行单个线程中的任务,则需要等待绿色任务完成才能开始执行红色任务,因此它将花费更多时间来完成这两个任务。

带有I/O绑定操作的任务的顺序执行

因此,在处理I/O操作时,多线程是实现更好资源分配的好选择。

现在让我们看看多线程程序的一些实现!

Pythonthreading的第一步

首先,让我们定义I/O绑定和CPU绑定任务。Theioio_bound_operation只按指定的秒数睡觉。cpu_bound_operation添加了指定的数字范围。两个函数都将结果附加到shared_list中。请记住,同一进程中的线程可以共享数据。

导入日志记录
从线程导入线程
从时间导入perf_counter,睡眠

从concurrency.utils导入flaten_list_of_lists,get_saving_path,postprocess_times
从并发.visualize import barh


格式 = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")


shared_list = [] # 来自同一进程的线程共享数据


def io_bound_operation(secs: float | int) -> None:
    “运行1个每秒的I/O绑定任务,并将结果附加到shared_list。”
开始 = perf_counter()
睡眠(秒)
完成 = perf_counter()

shared_list.append([(开始,完成)])


def cpu_bound_operation(n: int) -> 无:
    “CPU绑定任务。”
开始 = perf_counter()
计数 = 0
    对于i在范围内(n):
计数 += i
完成 = perf_counter()

shared_list.append([(开始,完成)])

现在,我们将创建两个新线程:t1t2。在实例化Thread对象时,我们需要添加target,这是我们希望在线程中运行的任务/函数。参数可以通过args参数传递,该参数接受可以的Iterable对象。

在这个例子中,我们将使I/O绑定操作持续1秒,而我的处理器大约需要3.5秒来添加这些1亿个数字

def threading_two_threads():
    # 创建两个线程对象
t1 = Thread(target=io_bound_operation, args=(1,))
t2 = 线程(target=cpu_bound_operation, args=(100000000,))

    # 开始活动 -> 调用 run() 方法
t1.启动()
睡眠(0.1)
t2.启动()

    #阻止调用线程->避免在没有线程的情况下继续运行
    #正在完成
t1.join()
t2.join()

logging.info(f"shared_list {shared_list}")

然后我们需要开始线程的活动。这是通过调用thestartstart()方法完成的。它安排在单独的控制线程中调用对象的run()方法。

此外,还有一个sleep(0.1)函数,可以使第二个线程稍后启动。这将有助于我们更好地想象。

def threading_two_threads():
    # 创建两个线程对象
t1 = Thread(target=io_bound_operation, args=(1,))
t2 = 线程(target=cpu_bound_operation, args=(100000000,))

    # 开始活动 -> 调用 run() 方法
t1.启动()
睡眠(0.1)
t2.启动()

    #阻止调用线程->避免在没有线程的情况下继续运行
    #正在完成
t1.join()
t2.join()

logging.info(f"shared_list {shared_list}")

最后,如果我们想等到线程终止,我们必须调用Thread对象的join()方法。

在两个线程完成之前,主线程不会退出。

通过加入线程,我们将阻止调用线程主线程),直到调用join()方法的线程终止——通常或通过未处理的异常,或直到出现可选超时。

您可以玩这个示例,如果您注释两个join()方法,程序将引发异常,因为thesharedshared_list中将没有任何内容,并且postprocess_times函数将尝试索引空列表。

def threading_two_threads():
    # 创建两个线程对象
t1 = Thread(target=io_bound_operation, args=(1,))
t2 = 线程(target=cpu_bound_operation, args=(100000000,))

    # 开始活动 -> 调用 run() 方法
t1.启动()
睡眠(0.1)
t2.启动()

    #阻止调用线程->避免在没有线程的情况下继续运行
    #正在完成
t1.join()
t2.join()

logging.info(f"shared_list {shared_list}")

为了可视化线程花费的时间,threading_two_threads函数有几个额外的功能来后处理,并在水平条形图上绘制它们。代码可以在回购协议中找到。

def threading_two_threads():
    # 创建两个线程对象
t1 = Thread(target=io_bound_operation, args=(1,))
t2 = 线程(target=cpu_bound_operation, args=(100000000,))

    # 开始活动 -> 调用 run() 方法
t1.启动()
睡眠(0.1)
t2.启动()

    #阻止调用线程->避免在没有线程的情况下继续运行
    #正在完成
t1.join()
t2.join()

logging.info(f"shared_list {shared_list}")

    #只是对图表进行一些处理
start_points, end_points = postprocess_times(flaten_list_of_lists(shared_list))
    # start_points, end_points = postprocess_times(shared_list)

barh(
title="并发执行,2个线程,1个I/O绑定任务1s + 1 \
CPU任务约3.5s",
start_points=start_points,
end_points=end_points,
path=get_saving_path("threading/images/first_multithreaded_program.png"),
n=2,
)


if __name__ == "__main__":
logging.info(f“Init并发任务”)
threading_two_threads()
logging.info(f“完成并发任务”)

下图显示了每个线程完成的时间。sleep函数使第二个线程(cpu_bound_operation)稍后启动。第一个线程(图中为0)开始,0.1秒后第二个线程开始。

由于I/O绑定任务仅持续1秒,而functionioio_bound_operation只需执行该操作,在I/O绑定任务等待(整个秒)期间,CPU绑定任务可以执行。这就是为什么CPU绑定任务(第二个线程)大约只持续3.5秒,并且不会因I/O绑定任务而延迟。

多线程程序花费的时间。2个线程,1个1s的I/O绑定任务+1个3.5s的CPU任务

线程对象是一种天真的创建线程的方式,有更方便的方法。然而,在深入研究之前,让我们看看一些更简单的例子。

使用threading模块可视化多线程时间

示例1-2个线程

  • 线程1-1 1s的I/O绑定操作和1个1s的CPU绑定操作。
  • 线程2-1 CPU绑定任务约3.5秒

现在让我们考虑一下,第一个线程运行的任务包括大约1秒的CPU绑定操作和1秒的I/O绑定操作,而不是只执行I/O绑定操作的任务。

因此,现在我们有一个程序,可以创建两个新线程,一个运行一个I/O绑定操作和一个CPU绑定操作,另一个运行大约3.5秒的CPU绑定操作。

def cpu_io_bound_operations(secs: float | int, n: int) -> None:
    “运行1个函数,执行1个秒的I/O绑定任务和1个CPU绑定任务。
将结果附加到shared_list中。"""
开始 = perf_counter()
计数 = 0
    对于i在范围(n):# CPU绑定
计数 += i
睡眠(秒)# I/O-bound
完成 = perf_counter()

shared_list.append([(开始,完成)])

线程2大约需要3.5秒的处理器,而线程1只需要1秒。

由于CPU绑定操作,线程1只需要1秒,因为线程2可以使用I/O绑定任务的等待时间。

如果我们添加3.5s(线程2)+ 1s(线程1),我们有4.5s的CPU工作。

下图显示,两项任务一起持续4.5秒。每个CPU密集型任务所需的时间可能略有不同。

多线程程序花费的时间。2个线程,(1个I/O绑定操作1s和1个CPU绑定操作1s)+1个CPU任务大约3.5s

然而,线程1需要3秒钟才能终止。这是因为我们不控制上下文开关何时发生,因此线程1可能已经等待了一段时间才能使用处理器,即使I/O绑定被终止了。上下文切换是开发人员无法控制的,因此它们发生的时间可能比我们实际希望的要多,而且有时我们不会将一个线程切换到另一个线程。

现在让我们快速看看更多的例子!如果你已经得到了它,你可以跳过这部分,直接进入下一节:ThreadPoolExecutor

示例2-1线程

  • 按顺序进行10个1s的IO绑定操作

在这里,我们代表的是顺序执行,我们不需要创建更多线程,主线程就足够了。

def sequential(n: int = 10, secs: float | int = 1) -> None:
    """在一个线程中按顺序执行n次秒的I/O绑定操作
并绘制一个水平条形图。
“”
    #执行n个I/O绑定操作,为每个任务保存一个元组
times = [io_bound_operation(secs) for _ in range(n)]
start_points,end_points = postprocess_times(时间)

barh(
title="顺序执行,1个线程,10个1s的I/O绑定任务",
start_points=start_points,
end_points=end_points,
path=get_saving_path("threading/images/ex_1_one_thread.png"),
)

在上面的图中,线程被表示为条,因为每个线程都执行了一项任务(即使当我们加入I/O绑定CPU绑定操作时,我们也认为它们属于同一任务)。

现在,我们考虑在同一线程中执行的10个不同的I/O绑定任务,因此我们可以在一个栏中更好地可视化每个任务。所以这十条属于同一条线。

单线程程序花费的时间。1个线程,10个I/O绑定任务1s

示例3-1线程

  • 2个CPU绑定任务

如果我们在同一线程中按顺序执行两个大约3.5秒的CPU绑定任务,我们会发现它们大约需要7秒才能完成。

在第二项任务开始之前,第一项任务必须完成。

def sequential(计数:int,n:int = 10)->无:
    #执行n个CPU绑定操作,为每个任务保存一个元组
times = [cpu_bound_operation(counts) for _ in range(n)]
start_points,end_points = postprocess_times(时间)

单线程程序花费的时间。1个线程,2个CPU绑定任务,大约3.5s

示例4-2线程

  • 线程1-1 CPU绑定任务大约3.5秒
  • 线程2-1 CPU绑定任务约3.5秒

当我们同时执行它们时,上面的两个相同的CPU绑定任务显示了一个非常不同的图表。这两项任务似乎都需要7秒,但实际上需要3.5秒。他们只是来回切换,直到他们都完成。

这不是对多线程的正确使用,它仅用于教学目的。使用多线程,只有CPU绑定的任务,没有时间改进。

def thread_cpu_bound_operations(counts: int) -> None:
    “运行一个CPU绑定的任务,并将结果附加到shared_list。”
shared_list.append([cpu_bound_operation(counts)])


def threading_two_threads() -> 无:
    #创建两个线程对象,每个线程将执行五个I/O绑定任务
t1 = Thread(target=thread_cpu_bound_operations, args=(100000000,))
t2 = Thread(target=thread_cpu_bound_operations, args=(100000000,))

    # 开始活动 -> 调用 run() 方法
t1.启动()
t2.启动()

    #阻止调用线程->避免在未完成线程的情况下继续运行
t1.join()
t2.join()

多线程程序花费的时间。2个线程,1个CPU绑定任务,每个大约3.5s

示例5-2个线程

  • 线程1-5个I/O绑定任务,每个1s
  • 线程2-5个I/O绑定任务,每个1s

1秒和2个线程的十个I/O绑定任务。每个线程负责按顺序执行五个I/O绑定任务,五组任务同时执行。

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    “运行n n 秒的I/O绑定任务,并将结果附加到shared_list。”
shared_list.append([io_bound_operation(secs) for _ in range(n)])


def threading_two_threads() -> 无:
    #创建两个线程对象,每个线程将执行五个I/O绑定任务
t1 = Thread(target=thread_io_bound_operations,args=(5,1))
t2 = Thread(target=thread_io_bound_operations, args=(5, 1))

    # 开始活动 -> 调用 run() 方法
t1.启动()
t2.启动()

    #阻止调用线程->避免在未完成线程的情况下继续运行
t1.join()
t2.join()

多线程程序花费的时间。2个线程,5个I/O绑定任务,每个1s

示例6-10个线程

  • 每个线程——1个1个I/O绑定任务

与上一个示例类似,但现在我们有十个线程,而不是两个线程,每个线程只负责执行一个1秒的I/O绑定任务。

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    “运行n n 秒的I/O绑定任务,并将结果附加到shared_list。”
shared_list.append([io_bound_operation(secs) for _ in range(n)])


def threading_two_threads() -> 无:
线程 = []
    #创建十个线程对象,每个线程将执行一个I/O绑定任务
    对于_在范围内(10):
t = Thread(target=thread_io_bound_operations, args=(1, 1))
t.start()
threads.append(t)

    #阻止调用线程->避免在未完成线程的情况下继续运行
[线程中的线程线程的thread.join()]

多线程程序花费的时间。10个线程,1个I/O绑定任务,每个1s

示例7-2线程

  • 线程1-1 CPU绑定任务约3.5s
  • 线程2-5个I/O绑定任务,每个1s

现在我们有两条线。线程1执行一个大约3.5秒的CPU绑定操作,线程2执行五个1秒的I/O绑定任务。

当I/O任务等待CPU密集型任务正在执行时,因为每次I/O任务开始时,操作系统都会非常快速地从一个线程切换到另一个线程。

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    “运行n n 秒的I/O绑定任务,并将结果附加到shared_list。”
shared_list.append([io_bound_operation(secs) for _ in range(n)])


def thread_cpu_bound_operations(counts: int) -> None:
    “运行一个CPU绑定的任务,并将结果附加到shared_list。”
shared_list.append([cpu_bound_operation(counts)])


def threading_two_threads() -> 无:
    #创建两个线程对象,每个线程将执行五个I/O绑定任务
t1 = Thread(target=thread_cpu_bound_operations, args=(100000000,))
t2 = Thread(target=thread_io_bound_operations, args=(5, 1))

    # 开始活动 -> 调用 run() 方法
t1.启动()
t2.启动()

    #阻止调用线程->避免在未完成线程的情况下继续运行
t1.join()
t2.join()

多线程程序花费的时间。线程1:1个大约3.5秒的CPU绑定任务,线程2:5个1秒的I/O绑定任务

示例8-6线程

  • 线程1-1 CPU绑定任务,约3.5s(第5栏)
  • 线程2-1 CPU绑定任务(第4栏)
  • 线程3-1 CPU绑定任务(第3栏)
  • 线程4-1 I/O绑定任务1s
  • 线程5-1 I/O绑定任务1s
  • 线程6-1 I/O绑定任务1s

在这里,我们有三个线程负责执行一个I/O任务,三个线程负责执行一个CPU密集型任务。这三个CPU密集型任务需要不同的时间来完成。

最长的任务,持续3.5秒,是第一个开始的任务(5)。由于另外两项CPU密集型任务,完成需要近6秒。

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    “运行n n 秒的I/O绑定任务,并将结果附加到shared_list。”
shared_list.append([io_bound_operation(secs) for _ in range(n)])


def thread_cpu_bound_operations(counts: int) -> None:
    “运行一个CPU绑定的任务,并将结果附加到shared_list。”
shared_list.append([cpu_bound_operation(counts)])


def threading_six_threads() -> 无:
    #创建两个线程对象,每个线程将执行五个I/O绑定任务
t1 = Thread(target=thread_cpu_bound_operations, args=(100000000,))
t2 = Thread(target=thread_cpu_bound_operations, args=(50000000,))
t3 = Thread(target=thread_cpu_bound_operations, args=(20000000,))
t4 = Thread(target=thread_io_bound_operations, args=(1, 1))
t5 = Thread(target=thread_io_bound_operations, args=(1, 1))
t6 = Thread(target=thread_io_bound_operations, args=(1, 1))

    # 开始活动 -> 调用 run() 方法
t1.启动()
t2.启动()
t3.启动()
t4.start()
t5.start()
t6.启动()

    #阻止调用线程->避免在未完成线程的情况下继续运行
t1.join()
t2.join()
t3.join()
t4.join()
t5.加入()
t6.join()

多线程程序花费的时间。6个线程,3个1s的I/O绑定任务和3个CPU绑定任务

示例9-4个线程

线程1按顺序执行两个CPU绑定操作,每次3.5秒(第6和7号)。线程2按顺序执行两个几乎1秒的CPU绑定操作(第4条和第5条)。

其他条代表I/O绑定任务,每个线程为1中的两个。

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    “运行n n 秒的I/O绑定任务,并将结果附加到shared_list。”
shared_list.append([io_bound_operation(secs) for _ in range(n)])


def thread_cpu_bound_operations(counts: int, n: int) -> None:
    “运行一个CPU绑定的任务,并将结果附加到shared_list。”
shared_list.append([cpu_bound_operation(counts) for _ in range(n)])


def threading_four_threads() -> 无:
    #创建两个线程对象,每个线程将执行五个I/O绑定任务
t1 = Thread(target=thread_cpu_bound_operations,args=(100000000,2))
t2 = Thread(target=thread_cpu_bound_operations, args=(20000000, 2))
t3 = Thread(target=thread_io_bound_operations, args=(2, 1))
t4 = Thread(target=thread_io_bound_operations,args=(2,1))

    # 开始活动 -> 调用 run() 方法
t1.启动()
t2.启动()
t3.启动()
t4.start()

    #阻止调用线程->避免在未完成线程的情况下继续运行
t1.join()
t2.join()
t3.join()
t4.join()

多线程程序花费的时间。4个线程,4个1s的I/O绑定任务和4个CPU绑定任务

当线程1(第6栏和第7栏)执行第一个3.5s CPU绑定任务(第6栏)时,它与其他线程交替进行,因此最终需要5s才能完成执行。所有I/O绑定操作大约需要1s才能完成,但在等待时,CPU密集型任务可以执行。因此,线程1的第一次操作需要5s,主要是因为线程2的两个CPU绑定操作(第4条和第5条)。

当线程1(第6和第7栏)执行第二个3.5秒CPU绑定任务(第7栏)时,它拥有处理器的所有力量。这是因为只需要大约3.5s就能完成。

这些是一些澄清概念的例子,现在让我们看看其他更方便的方法!

ThreadPoolExecutor

concurrent.futures模块提供了一个ThreadPoolExecutor对象,我们可以用它来创建线程和一个用于多处理的ProcessPoolExecutor对象。

由于我们在本文中专注于线程,我们将只与ThreadPoolExecutor合作。

为什么我们需要它

ThreadPoolExecutor类是一个Executor子类,它使用线程池异步执行调用。

线程池是在计算机程序中实现并发执行的软件设计模式。它维护多个线程,等待任务被分配到监督程序并发执行。

因此,ThreadPoolExecutor创建和管理可以重用的线程或工作线程集合,避免了每次我们想要同时执行任务时创建和销毁线程,就像我们上面所做的那样。这将提高性能,因为这些操作很耗时。

工作方式

Executor类

除了ProcessPoolExecutor类外,ThreadPoolExecutor还扩展了Executor类,这是一个抽象基类,只定义了五种方法:

  • submit()
  • map()
  • shutdown()

Executor源代码Python 3.13

The other two methods are actually the __enter__() and __exit__() Python’s magic methods to implement the context management protocol. Thanks to them we can use the ThreadPoolExecutor in a with statement (recommended). The with statement will call the __enter__() method, and __exit__() will be called when the execution leaves the with code block.

类执行器(对象):
    """这是混凝土异步执行器的抽象基类。"""
...
    def __enter__(自我):
        回归自我

    def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(等待=真实)
        返回 False

Executor is only an abstract class and most of the logic is implemented in the ThreadPoolExecutor methods. The submit() and shutdown() methods are implemented in the ThreadPoolExecutor class while the logic for the map()method is implemented in the Executor class, since it uses submit()internally.

ThreadPoolExecutor类

ThreadPoolExecutor在Python的concurrent.futures模块中,内部使用队列来管理任务。队列是在ThreadPoolExecutor构造函数中创建的。

ThreadPoolExecutor源代码。Python 3.13

__init__()方法

__init__()方法初始化一个新的ThreadPoolExecutor实例,并创建队列和更多对象。

下面的SimpleQueue类是一个简单、无界的FIFO队列。如果遵循先进先出原则,该原则规定项目将按照其输入的顺序处理或从队列中删除。

我们可以通过将其作为参数传递给max_workers参数来设置可以使用的最大线程数。如果我们不这样做,这将是机器中的处理器数量加4个,限制为32个:

类ThreadPoolExecutor(_base.Executor):
...

    def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=()):
...
        如果max_workers为None:
            #我们使用process_cpu_count + 4来处理这两种类型的任务。
            #但我们将其限制在32,以避免消耗出乎意料的大量资源
            #在许多核心机器上。
max_workers = min(32, (os.process_cpu_count() 或 1) + 4)
...

self._max_workers = max_workers
self._work_queue = queue.SimpleQueue()
...

我们还可以传递一个可选的名称前缀来传递我们的线程,一个用于初始化工作线程的可调用项,以及一个带有其参数的元组。

submit()方法

submit()方法安排可调用的执行。它被提交到线程池。可调用函数是我们将其名称与其参数一起作为参数传递的函数。

以ThreadPoolExecutor(max_workers=1)作为执行者:
future = executor.submit(pow,323,1235)
    print(future.result()) # 块

任务包装在Future对象中,该对象表示可调用的可异步执行,并由submit()方法立即返回。

Future只是一个抽象,代表异步操作的最终结果,它是一个作为最初未知结果的占位符的对象,通常是因为结果的计算尚未完成。

concurrent.futures.Future文档。

您可以在Python 3.13的_base模块中,在Executor类的代码上方找到Future类的源代码。

future.result() returns the value returned by the call (the pow function). If the call hasn’t yet completed then this method will wait up to timeoutseconds (timeout is the only parameter of result(timeout=None)). If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

下面我们可以看到ThreadPoolExecutor类的一些源代码。当submit()方法被调用Future并创建_WorkItem对象时。然后将_WorkItem放入_work_queue中。

类ThreadPoolExecutor(_base.Executor):
...

    def submit(self, fn, /, *args, **kwargs):
        使用self._shutdown_lock,_global_shutdown_lock:
...

f = _base.Future()
w = _WorkItem(f,fn,args,kwargs)

self._work_queue.put(w)
self._adjust_thread_count()
            返回f
...

_WorkItem是一个对象,用于将任务(fn)、其参数(argskwargs)和未来对象(_base.Future())包装在一起。它实现了arunrun()方法,其中执行任务,并在Future对象中设置结果。

类_WorkItem:
    def __init__(自我,未来,fn,args,kwargs):
自我.未来=未来
self.fn = fn
self.args = args
self.kwargs = kwargs

    def run(self):
        如果不是self.future.set_running_or_notify_cancel():
            退货

        尝试:
结果 = self.fn(*self.args, **self.kwargs)
        除了BaseException作为exc:
self.future.set_exception(exc)
            #用例外'exc'打破参考周期
自我=无
        其他:
self.future.set_result(结果)

__class_getitem__ = classmethod(types.GenericAlias)

run()方法是从工作线程调用的。它在_worker模块函数中实现,该函数是作为目标传递给线程的函数。

def _worker(executor_reference,work_queue,初始化器,initargs):
...

            如果work_item不是None:
work_item.run()
                #删除对对象的引用。见GH-60488
                del work_item
                继续

...

线程是在ThreadPoolExecutor类的构造函数中调用的_adjust_thread_count()方法中创建的。

类ThreadPoolExecutor(_base.Executor):

    def _adjust_thread_count(self):
...
        如果num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix 或 self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs))
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
...

map()方法

map()直接在Executor类中实现,并在内部使用submit()方法。

类执行器(对象):
    """这是混凝土异步执行器的抽象基类。"""
...
    def map(self,fn,*iterables,timeout=None,chunksize=1):
...
fs = [self.submit(fn, *args) for args in zip(*iterables)]
...

map()是我们必须将任务提交到线程池的另一种方式。它类似于内置的map(fn, *iterables)函数,但我们作为fn传递给它的函数是异步执行的,并且可以同时对fn进行几次调用。

内置的map()函数提供惰性评估,这意味着它返回的可回性值仅在请求时计算并返回。

然而,当调用Executor.map(fn, *iterables)时,该函数会预先从提供的迭代中收集所有项目。这与懒惰的评估方法形成鲜明对比,在需要时(即按需)处理项目。

它返回一个遍主,我们可以遍离它,以从任务中获取值,因为它们以与我们提供的可重复的顺序相同。

如果未指定timeoutNone,则等待时间没有限制。因此,当我们开始进行更像时,在第一个元素可用之前,我们不会访问其中的第二个元素。如果设置为特定intfloat时,结果在超时秒后不可用,则返回的回数器会引发超时TimeoutError

如果fn调用引发异常,那么当从重发器中检索其值时,将引发该异常。

让我们看一个例子!这个例子超级美,我跑了大约二十次

基本上,我们使用带有5个工作线程的ThreadPoolExecutor从维基百科加载20种异国情调的澳大利亚动物

只有一个线程可以在特定时刻运行,但有5个线程可用。因此,当第一个线程开始执行时,会出现上下文开关,第二个线程可以启动,因为操作系统检测到它是I/O操作,并通过将资源分配给另一个线程来避免浪费时间。

下面我们看到,在任何时候都只有5个线程同时工作。当一个线程完成任务时,它被重复使用来启动另一个任务。完成时间不到4秒。

花在异步装载20只异国情调的澳大利亚动物上的时间

另一方面,如果我们同步加载20只澳大利亚动物,几乎需要15秒!

花在同步装载20只澳大利亚动物上的时间

你可以在代码中看到这二十种异国情调的澳大利亚动物。

导入并发.futures
从时间导入perf_counter,时间
导入urllib.request
导入日志记录

从concurrency.utils导入get_saving_path,postprocess_times
从并发.visualize import barh


格式 = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")


URL = [

]

动物 = {}


# I/O绑定操作
def load_url(url: str) -> tuple[float]:
    “检索单个页面并返回开始和结束时间。”
开始 = perf_counter()
    使用urllib.request.urlopen(url)作为conn:
animals[url] = conn.read()
完成 = perf_counter()
    返回开始,完成


def asynchronous_load_australian_animals() -> 无:
开始=时间()
    #使用ThreadPoolExecutor管理并发
    使用concurrent.futures.ThreadPoolExecutor(max_workers=5)作为执行器:
        #使用地图方法将load_url应用于每个URL
结果 = executor.map(load_url,URLS)

        #处理结果和时间
时间=[结果中的时间]
start_points,end_points = postprocess_times(时间)
结束=时间()

total_time = round(结束-开始)+1

barh(
title="异步执行,5个线程,I/O绑定任务,澳大利亚动物",
start_points=start_points,
end_points=end_points,
path=get_saving_path("thread-pool-executor/images/ThreadPoolExecutor_ex1.png"),
n=len(URLS),
秒=total_time,
)


if __name__ == "__main__":
logging.info(“Init异步任务”)
asynchronous_load_australian_animals()
logging.info(f"len(animals): {len(animals)}")
logging.info(“完成异步任务”)

使用submit()方法,它看起来像这样,尽管它因运行而异。

花在异步装载20只异国情调的澳大利亚动物上的时间

我们需要使用as_completed()函数来遍默Future实例。否则,我们的postprocess_times()函数将引发异常。

as_completed() returns an iterator over the Future instances given by results that yields futures as they complete (finished or cancelled futures).

def asynchronous_load_australian_animals() -> 无:
开始=时间()
    #使用ThreadPoolExecutor管理并发
    使用concurrent.futures.ThreadPoolExecutor(max_workers=5)作为执行器:
        #使用提交方法将load_url应用于每个URL
结果 = [执行器.submit(load_url, url) url 中的 URL]

        #处理结果和时间
times = [result.result() for result in concurrent.futures.as_completed(results)]
start_points,end_points = postprocess_times(时间)
结束=时间()

total_time = round(结束-开始)+1

shutdown()方法

如果您在with语句中调用ThreadPoolExecutor作为上下文管理器,那么您不需要调用shutdown()方法,因为它是在__exit__()魔术方法中调用的。否则,您需要调用它,向执行者发出信号,当当前待定期货完成执行时,它应该释放它正在使用的任何资源。

类执行器(对象):
    """这是混凝土异步执行器的抽象基类。"""
...
    def __enter__(自我):
        回归自我

    def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(等待=真实)
        返回 False

你可以用这些工具做很多事情。Future对象有几种方法,您可以使用这些方法自定义程序的行为(例如,cancel()running()done()等)

concurrent.futures模块中还提供了一个wait()函数,允许您等待它们完成。您可以通过return_when参数指定何时返回。

Tags:

最近发表
标签列表