Skip to main content
 首页 » 编程设计

python之Python 线程定时器如何在内部工作

2025年05月04日60tintown

我想知道 python threading.Timer 是如何工作的。

更详细地说,当我运行几个 threading.Timer 时,它是否运行单独的线程来计算时间和运行处理程序?

或者一个线程一起管理和计数几个定时器?

我问是因为我的应用程序需要安排很多事件,但是

如果 threading.Timer 单独运行每个线程来计算一个计时器,并且我运行许多计时器,它可能会影响性能。

所以我担心如果我必须实现一个只运行一个线程的调度程序,如果它对性能有很大影响的话。

请您参考如下方法:

threading.Timer 类是 threading.Thread 的子类,基本上它只是运行一个单独的线程,在该线程中休眠指定的时间并运行相应的功能。

这绝对不是安排事件的有效方式。更好的方法是使用 Queue.PriorityQueue 在单个线程中进行调度,您可以在其中将事件放在“优先级”实际上意味着“下一个触发日期”的地方。类似于 cron 的工作方式。

或者甚至更好:使用已经存在的东西,不要重新发明轮子:Cron、Celery 等等......

通过 Queue.PriorityQueue 制作调度器的一个非常简单的例子:

import time 
from Queue import PriorityQueue 
 
class Task(object): 
    def __init__(self, fn, crontab): 
        # TODO: it should be possible to pass args, kwargs 
        # so that fn can be called with fn(*args, **kwargs) 
        self.fn = fn 
        self.crontab = crontab 
 
    def get_next_fire_date(self): 
        # TODO: evaluate next fire date based on self.crontab 
        pass 
 
class Scheduler(object): 
    def __init__(self): 
        self.event_queue = PriorityQueue() 
        self.new_task = False 
 
    def schedule_task(self, fn, crontab): 
        # TODO: add scheduling language, crontab or something 
        task = Task(fn, crontab) 
        next_fire = task.get_next_fire_date() 
        if next_fire: 
            self.new_task = True 
            self.event_queue.put((next_fire, task)) 
 
    def run(self): 
        self.new_task = False 
 
        # TODO: do we really want an infinite loop? 
        while True: 
            # TODO: actually we want .get() with timeout and to handle 
            # the case when the queue is empty 
            next_fire, task = self.event_queue.get() 
 
            # incremental sleep so that we can check 
            # if new tasks arrived in the meantime 
            sleep_for = int(next_fire - time.time()) 
            for _ in xrange(sleep_for): 
                time.sleep(1) 
                if self.new_task: 
                    self.new_task = False 
                    self.event_queue.put((next_fire, task)) 
                    continue 
 
            # TODO: run in separate thread? 
            task.fn() 
 
            time.sleep(1) 
            next_fire = task.get_next_fire_date() 
 
            if next_fire: 
                event_queue.put((next_fire, task)) 
 
def test(): 
    return 'hello world' 
 
sch = Scheduler() 
sch.schedule_task(test, '5 * * * *') 
sch.schedule_task(test, '0 22 * * 1-5') 
sch.schedule_task(test, '1 1 * * *') 
sch.run() 

这只是一个想法。您必须正确实现 TaskScheduler 类,即 get_next_fire_date 方法加上某种调度语言(crontab?)和错误处理。我仍然强烈建议使用现有的库之一。