Skip to main content
 首页 » 编程设计

python之多处理时避免在每个子进程中加载 spaCy 数据

2024年10月01日11kuangbin

我想在当前使用多处理实现的程序中使用 spaCy。具体来说,我正在使用 ProcessingPool 生成 4 个子进程,然后它们会运行并执行它们的快乐任务。

要使用 spaCy(专门用于 POS 标记),我需要调用 spacy.load('en'),这是一个昂贵的调用(大约需要 10 秒)。如果我要在每个子进程中加载​​此对象,则需要大约 40 秒,因为它们都从同一位置读取。这太长了。

但我想不出让他们共享正在加载的对象的方法。这个对象不能被 pickle ,这意味着(据我所知):

  1. 不能传入Pool.map调用
  2. 它不能被 Manager 实例存储和使用,然后在进程之间共享

我能做什么?

请您参考如下方法:

我不知道您是如何使用 Pool.map 的,但请注意 Pool.map 不适用于大量输入。在 Python 3.6 中,它在 Lib/multiprocessing/pool.py 中实现如您所见,它声明它采用 iterable 作为第一个参数,但实现确实 consume the whole iterable在运行多进程映射之前。所以我认为如果您需要处理大量数据,那不是您需要使用的Pool.map。也许 Pool.imapPool.imap_unordered 可以工作。

关于您的实际问题。我有一个不涉及 Pool.map 的解决方案,其工作方式类似于 multiprocess foreach

首先需要继承Pool并创建一个worker进程:

from multiprocessing import cpu_count 
from multiprocessing import Queue 
from multiprocessing import Process 
 
 
class Worker(Process): 
 
    english = spacy.load('en') 
 
    def __init__(self, queue): 
        super(Worker, self).__init__() 
        self.queue = queue 
 
    def run(self): 
        for args in iter(self.queue.get, None): 
            # process args here, you can use self. 

你像这样准备进程池:

queue = Queue() 
workers = list() 
for _ in range(cpu_count()):  # minus one if the main processus is CPU intensive 
    worker = Worker(queue) 
    workers.append(worker) 
    worker.start() 

然后你可以通过queue给池子喂食:

for args in iterable: 
    queue.put(args) 

iterable 是您传递给工作人员的参数列表。上面的代码将尽可能快地推送 iterable 的内容。基本上,如果 worker 足够慢,几乎所有的可迭代对象都会在 worker 完成工作之前被插入队列。这就是为什么可迭代对象的内容必须适合内存

如果 worker 参数(又名。iterable)不能放入内存中,您必须以某种方式同步主进程和 workers...

最后确保调用以下内容:

for worker in workers: 
    queue.put(None) 
 
for worker in workers: 
    worker.join()