Python: Programming DEAP with SCOOP

October 1, 2024 10lyj

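I am using the DEAP library in Python to solve a multi-objective optimization problem. I would like to use multiple processors for this task; however, I am running into some trouble.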

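To give some context, I am using networkx together with DEAP. I have also defined the fitness, crossover, and mutation functions (which, for certain reasons, I will not show here).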

It says here that all I need to do is install SCOOP and add the lines:

from scoop import futures 
 
toolbox.register("map", futures.map) 

However, I get an error:

scoop._comm.scoopexceptions.ReferenceBroken: 'module' object has no attribute 'Chromosome' 

After some digging, I found that I need to move the calls to creator.create into the main module, as specified here.

After doing that, I get another error:

scoop._comm.scoopexceptions.ReferenceBroken: This element could not be pickled: FutureId(worker='127.0.0.1:49663', rank=1):partial(<Chromosome representation of a solution here>)=None 

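I am not very familiar with parallel computing, and I do not quite understand what "could not be pickled" means. The full code, with some edits, can be seen here: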

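import random

import numpy
from deap import algorithms, base, tools
from scoop import futures

def genetic(network, creator, no_sensors, sfpd, lambda1, lambda2, lambda3, k):
    # list() so that random.sample receives a sequence
    locations = list(network.graph.nodes())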
    #move creator.create calls to the main module 
    ######################################## 
    creator.create("FitnessMax", base.Fitness, weights=(lambda1, -lambda2, lambda3))  
    creator.create("Chromosome", list, fitness=creator.FitnessMax)  
    ######################################## 
 
    toolbox = base.Toolbox() 
    toolbox.register("attr_item", random.sample, locations, no_sensors) 
    toolbox.register("chromosome", tools.initRepeat, creator.Chromosome, toolbox.attr_item, n=1) 
    toolbox.register("population", tools.initRepeat, list, toolbox.chromosome) 
 
    toolbox.register("map", futures.map) #######<-- this line ############## 
 
    def evaluate(chromosome):
        # fitness function defined here (omitted)
        ...

    # Crossover
    def crossover(chromosome1, chromosome2):  # Uniform Crossover
        # crossover is defined here (omitted)
        ...

    # Mutation
    def mutation(chromosome):
        # mutation is defined here (omitted)
        ...
 
    toolbox.register("evaluate", evaluate) 
    toolbox.register("mate", crossover) 
    toolbox.register("mutate", mutation) 
    toolbox.register("select", tools.selNSGA2) 
 
    random.seed(64) 
    pop = toolbox.population(n=MU) 
    hof = tools.ParetoFront() 
    stats = tools.Statistics(lambda ind: ind.fitness.values) 
    stats.register("avg", numpy.mean, axis=0) 
    stats.register("min", numpy.min, axis=0) 
    stats.register("max", numpy.max, axis=0) 
 
    algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats, halloffame=hof) 
 
    return list(hof) 

Thanks, any insight would be much appreciated.

Answer:

Here is a workaround using joblib and dill.
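As background on the "could not be pickled" message: Python's standard `pickle` module serializes a function by reference to its importable name, so functions defined inside another function (like `evaluate`, `crossover`, and `mutation` inside `genetic` in the question) cannot be serialized and sent to worker processes. dill can serialize such objects, which is why it is used here. The following is a minimal sketch using only the standard library; `is_picklable` and `make_evaluate` are hypothetical names introduced for illustration.

```python
import math
import pickle


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_evaluate():
    # Mirrors the question's layout: the fitness function is defined
    # inside another function, so it has no importable name.
    def evaluate(chromosome):
        return (sum(chromosome),)
    return evaluate


nested_evaluate = make_evaluate()

# A function importable by name is pickled by reference.
print(is_picklable(math.sqrt))
# A nested function cannot be looked up by name, so pickling fails.
print(is_picklable(nested_evaluate))
```

Moving the operators to module level is the other common way around this; the dill approach below avoids restructuring the code.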

First: monkeypatch joblib so that it pickles with dill

import dill 
from dill import Pickler 
import joblib 
joblib.parallel.pickle = dill 
joblib.pool.dumps = dill.dumps 
joblib.pool.Pickler = Pickler 
 
from joblib.pool import CustomizablePicklingQueue 
from io import BytesIO 
from pickle import HIGHEST_PROTOCOL 
 
 
class CustomizablePickler(Pickler): 
    """Pickler that accepts custom reducers. 
    HIGHEST_PROTOCOL is selected by default as this pickler is used 
    to pickle ephemeral datastructures for interprocess communication 
    hence no backward compatibility is required. 
    `reducers` is expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that,
    given an instance of `type`, will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled 
    `tuple_of_objects` as would return a `__reduce__` method. See the 
    standard library documentation on pickling for more details. 
    """ 
 
    # We override the pure Python pickler as it's the only way to be able to
    # customize the dispatch table without side effects in Python 2.6 
    # to 3.2. For Python 3.3+ leverage the new dispatch_table 
    # feature from http://bugs.python.org/issue14166 that makes it possible 
    # to use the C implementation of the Pickler which is faster. 
 
    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL): 
        Pickler.__init__(self, writer, protocol=protocol) 
        if reducers is None: 
            reducers = {} 
        # Make the dispatch registry an instance level attribute instead of 
        # a reference to the class dictionary under Python 2 
        self.dispatch = Pickler.dispatch.copy() 
        for type, reduce_func in reducers.items(): 
            self.register(type, reduce_func) 
 
    def register(self, type, reduce_func): 
        if hasattr(Pickler, 'dispatch'): 
            # Python 2 pickler dispatching is not explicitly customizable. 
            # Let us use a closure to workaround this limitation. 
            def dispatcher(self, obj): 
                reduced = reduce_func(obj) 
                self.save_reduce(obj=obj, *reduced) 
            self.dispatch[type] = dispatcher 
        else: 
            self.dispatch_table[type] = reduce_func 
 
joblib.pool.CustomizablePickler = CustomizablePickler 
 
 
def _make_methods(self): 
    self._recv = recv = self._reader.recv 
    racquire, rrelease = self._rlock.acquire, self._rlock.release 
 
    def get(): 
        racquire() 
        try: 
            return recv() 
        finally: 
            rrelease() 
 
    self.get = get 
 
    def send(obj): 
        buffer = BytesIO() 
        CustomizablePickler(buffer, self._reducers).dump(obj) 
        self._writer.send_bytes(buffer.getvalue()) 
 
    self._send = send 
 
    if self._wlock is None: 
        # writes to a message oriented win32 pipe are atomic 
        self.put = send 
    else: 
        wlock_acquire, wlock_release = ( 
            self._wlock.acquire, self._wlock.release) 
 
        def put(obj): 
            wlock_acquire() 
            try: 
                return send(obj) 
            finally: 
                wlock_release() 
 
        self.put = put 
 
CustomizablePicklingQueue._make_methods = _make_methods 
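The core of the patched `send` is "pickle the object into an in-memory buffer with a pickler that knows about custom reducers, then ship the bytes". A rough sketch of that idea using only the standard library's `pickle.Pickler` and its `dispatch_table` attribute (the Python 3 branch of `register` above) might look like this. `reduce_fraction` and `calls` are hypothetical names for illustration, and `Fraction` is just a convenient stand-in type.

```python
import pickle
from fractions import Fraction
from io import BytesIO

calls = []  # records which objects went through the custom reducer


def reduce_fraction(value):
    # A reducer returns (constructor, args), like a __reduce__ method.
    calls.append(value)
    return (Fraction, (value.numerator, value.denominator))


buffer = BytesIO()
pickler = pickle.Pickler(buffer, protocol=pickle.HIGHEST_PROTOCOL)
# Instance-level dispatch table: affects this pickler only.
pickler.dispatch_table = {Fraction: reduce_fraction}
pickler.dump(Fraction(3, 4))

# These bytes are what would be written to the pipe with send_bytes().
payload = buffer.getvalue()
restored = pickle.loads(payload)
print(restored, len(calls))
```

The patch above does the same thing, except that the pickler is dill's `Pickler`, so objects the standard pickler rejects can also cross the process boundary.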

Second: define a map function on top of joblib

from joblib import Parallel, delayed 
 
def mymap(f, *iters): 
    return Parallel(n_jobs=-1)(delayed(f)(*args) for args in zip(*iters)) 
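What matters for DEAP is that the replacement behaves like the built-in `map`: it takes a function plus one or more iterables and returns the results in input order. To illustrate just that contract without joblib, here is a sketch using `concurrent.futures.ThreadPoolExecutor` from the standard library. This is a swapped-in technique, not the approach of this answer; `thread_map` and `toy_fitness` are hypothetical names, and threads do not give CPU parallelism for pure-Python fitness functions.

```python
from concurrent.futures import ThreadPoolExecutor


def thread_map(f, *iters):
    """Hypothetical stand-in with the same signature as mymap."""
    with ThreadPoolExecutor() as executor:
        # Executor.map accepts several iterables, like the built-in map,
        # and yields results in the order of the inputs.
        return list(executor.map(f, *iters))


def toy_fitness(chromosome):
    # Toy fitness for illustration; DEAP expects fitness values as a tuple.
    return (sum(chromosome),)


population = [[1, 2, 3], [4, 5], [6]]
fitnesses = thread_map(toy_fitness, population)  # one iterable
pairs = thread_map(pow, [2, 3], [3, 2])          # several iterables
print(fitnesses, pairs)
```

Any function that honors this signature and ordering can be registered as the toolbox's `map`.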

Finally, just register the map:

toolbox.register("map", mymap) 

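It works perfectly with the example you linked. You can integrate dask and joblib to scale this solution out to a cluster. Use dask-drmaa and you have almost the same functionality as SCOOP.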

The example code can be found here.