
Python concurrent.futures question: why only 1 worker?

2024-10-01 4telwanggs

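I am experimenting with concurrent.futures.ProcessPoolExecutor to parallelize a serial task. The serial task counts how many numbers in a range contain a given digit. My code is shown below.
During execution, I can see in Task Manager / System Monitor / top that only one CPU/thread is ever busy, even though max_workers for ProcessPoolExecutor is set to a value greater than 1. Why is that? How can I parallelize my code with concurrent.futures? The code was run with Python 3.5.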

import concurrent.futures as cf 
from time import time 
 
def _findmatch(nmax, number):     
    print('def _findmatch(nmax, number):') 
    start = time() 
    match=[] 
    nlist = range(nmax) 
    for n in nlist: 
        if number in str(n):match.append(n) 
    end = time() - start 
    print("found {} in {}sec".format(len(match),end)) 
    return match 
 
def _concurrent(nmax, number, workers): 
    with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
        start = time() 
        future = executor.submit(_findmatch, nmax, number) 
        futures = future.result() 
        found = len(futures) 
        end = time() - start 
        print('with statement of def _concurrent(nmax, number):') 
        print("found {} in {}sec".format(found, end)) 
    return futures 
 
if __name__ == '__main__': 
    match=[] 
    nmax = int(1E8) 
    number = str(5) # Find this number 
    workers = 3 
    start = time() 
    a = _concurrent(nmax, number, workers) 
    end = time() - start 
    print('main') 
    print("found {} in {}sec".format(len(a),end)) 

Please refer to the following approach:

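The problem with your code is that it submits only one task. That task is executed by one of the workers while the remaining workers sit idle. You need to submit multiple tasks that the workers can execute in parallel.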

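The example below splits the search range into three separate tasks, each executed by a different worker. The futures returned by submit are collected in a list, and once all of them have been submitted, wait is used to wait for them all to complete. If you instead call result right after submitting a task, the call blocks until that future completes, so the next task is not submitted until the previous one has finished.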

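Note that instead of building a list of matching numbers, the code below only counts the numbers that contain the digit 5, in order to reduce memory usage: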

import concurrent.futures as cf 
from time import time 
 
def _findmatch(nmin, nmax, number): 
    print('def _findmatch', nmin, nmax, number) 
    start = time() 
    count = 0 
    for n in range(nmin, nmax): 
        if number in str(n): 
            count += 1 
    end = time() - start 
    print("found {} in {}sec".format(count,end)) 
    return count 
 
def _concurrent(nmax, number, workers): 
    with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
        start = time() 
        chunk = nmax // workers 
        futures = [] 
 
        for i in range(workers): 
            cstart = chunk * i 
            cstop = chunk * (i + 1) if i != workers - 1 else nmax 
 
            futures.append(executor.submit(_findmatch, cstart, cstop, number)) 
 
        cf.wait(futures) 
        res = sum(f.result() for f in futures) 
        end = time() - start 
        print('with statement of def _concurrent(nmax, number):') 
        print("found {} in {}sec".format(res, end)) 
    return res 
 
if __name__ == '__main__': 
    nmax = int(1E8) 
    number = str(5) # Find this number 
    workers = 3 
    start = time() 
    a = _concurrent(nmax, number, workers) 
    end = time() - start 
    print('main') 
    print("found {} in {}sec".format(a,end)) 

Output:

def _findmatch 0 33333333 5 
def _findmatch 33333333 66666666 5 
def _findmatch 66666666 100000000 5 
found 17190813 in 20.09431290626526sec 
found 17190813 in 20.443560361862183sec 
found 22571653 in 20.47660517692566sec 
with statement of def _concurrent(nmax, number): 
found 56953279 in 20.6196870803833sec 
main 
found 56953279 in 20.648695707321167sec
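The same chunking idea can also be written with executor.map, which submits one task per chunk and yields the results in order. The following is only a minimal sketch, not part of the original answer: the helper names split_range, count_in_range and count_parallel are hypothetical, and the small nmax is chosen just to keep the run short.

```python
import concurrent.futures as cf
from itertools import repeat


def split_range(nmax, parts):
    """Split range(nmax) into `parts` contiguous (start, stop) pairs.

    The last pair absorbs the remainder so that the whole range is covered.
    """
    chunk = nmax // parts
    return [(chunk * i, chunk * (i + 1) if i != parts - 1 else nmax)
            for i in range(parts)]


def count_in_range(start, stop, number):
    """Count the integers in [start, stop) whose decimal form contains `number`."""
    return sum(1 for n in range(start, stop) if number in str(n))


def count_parallel(nmax, number, workers):
    """Count matches in range(nmax) using one task per worker (hypothetical helper)."""
    bounds = split_range(nmax, workers)
    starts = [b[0] for b in bounds]
    stops = [b[1] for b in bounds]
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # map() submits every chunk up front, then yields results in order.
        return sum(executor.map(count_in_range, starts, stops, repeat(number)))


if __name__ == '__main__':
    print(count_parallel(10**6, '5', 3))
```

As with the submit/wait version, the speedup comes from having at least as many independent tasks as workers. Whether map or submit reads better is mostly a matter of taste; submit gives access to the individual futures, while map only returns the results.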