圆月山庄资源网 Design By www.vgjia.com
一. 如何调用
def f1(arg1, arg2): print('f1', arg1, arg2) def f2(arg1): print('f2', arg1) def f3(): print('f3') def f4(): print('周期任务', int(time.time())) timer = TaskTimer() # 把任务加入任务队列 timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30执行 timer.join_task(f2, [3], timing=14) # 每天14:00执行 timer.join_task(f3, [], timing=15) # 每天15:00执行 timer.join_task(f4, [], interval=10) # 每10秒执行1次 # 开始执行(此时才会创建线程) timer.start()
f1~f4是我们需要定时执行的函数。
首先创建TaskTimer对象(TaskTimer的代码在下面)。调用join_task函数,把需要执行的函数加入到任务队列。最后调用start,任务就开始执行了。
join_task参数:
fun:需要执行的函数
arg:fun的参数,如果没有就传一个空列表
interval:如果有此参数,说明任务是周期任务,单位为秒(注意interval最少5秒)
timing:如果有此参数,说明任务是定时任务,单位为时
注意:interval和timing只能选填1个
二. 源码
import datetime import time from threading import Thread from time import sleep class TaskTimer: __instance = None def __new__(cls, *args, **kwargs): """ 单例模式 """ if not cls.__instance: cls.__instance = object.__new__(cls) return cls.__instance def __init__(self): if not hasattr(self, 'task_queue'): setattr(self, 'task_queue', []) if not hasattr(self, 'is_running'): setattr(self, 'is_running', False) def write_log(self, level, msg): cur_time = datetime.datetime.now() with open('./task.log', mode='a+', encoding='utf8') as file: s = "[" + str(cur_time) + "][" + level + "] " + msg print(s) file.write(s + "\n") def work(self): """ 处理任务队列 """ while True: for task in self.task_queue: if task['interval']: self.cycle_task(task) elif task['timing']: self.timing_task(task) sleep(5) def cycle_task(self, task): """ 周期任务 """ if task['next_sec'] <= int(time.time()): try: task['fun'](*task['arg']) self.write_log("正常", "周期任务:" + task['fun'].__name__ + " 已执行") except Exception as e: self.write_log("异常", "周期任务:" + task['fun'].__name__ + " 函数内部异常:" + str(e)) finally: task['next_sec'] = int(time.time()) + task['interval'] def timing_task(self, task): """ 定时任务 """ # 今天已过秒数 today_sec = self.get_today_until_now() # 到了第二天,就重置任务状态 if task['today'] != self.get_today(): task['today'] = self.get_today() task['today_done'] = False # 第一次执行 if task['first_work']: if today_sec >= task['task_sec']: task['today_done'] = True task['first_work'] = False else: task['first_work'] = False # 今天还没有执行 if not task['today_done']: if today_sec >= task['task_sec']: # 到点了,开始执行任务 try: task['fun'](*task['arg']) self.write_log("正常", "定时任务:" + task['fun'].__name__ + " 已执行") except Exception as e: self.write_log("异常", "定时任务:" + task['fun'].__name__ + " 函数内部异常:" + str(e)) finally: task['today_done'] = True if task['first_work']: task['first_work'] = False def get_today_until_now(self): """ 获取今天凌晨到现在的秒数 """ i = datetime.datetime.now() return i.hour * 3600 + i.minute * 60 + i.second def get_today(self): """ 获取今天的日期 """ i = datetime.datetime.now() return i.day def join_task(self, fun, arg, interval=None, timing=None): """ interval和timing只能存在1个 :param fun: 你要调用的任务 :param arg: fun的参数 :param interval: 周期任务,单位秒 :param timing: 定时任务,取值:[0,24) """ # 参数校验 if (interval != None and timing != None) or (interval == None and timing == None): raise Exception('interval和timing只能选填1个') if timing and not 0 <= timing < 24: raise Exception('timing的取值范围为[0,24)') if interval and interval < 5: raise Exception('interval最少为5') # 封装一个task task = { 'fun': fun, 'arg': arg, 'interval': interval, 'timing': timing, } # 封装周期或定时任务相应的参数 if timing: task['task_sec'] = timing * 3600 task['today_done'] = False task['first_work'] = True task['today'] = self.get_today() elif interval: task['next_sec'] = int(time.time()) + interval # 把task加入任务队列 self.task_queue.append(task) self.write_log("正常", "新增任务:" + fun.__name__) def start(self): """ 开始执行任务 返回线程标识符 """ if not self.is_running: thread = Thread(target=self.work) thread.start() self.is_running = True self.write_log("正常", "TaskTimer已开始运行!") return thread.ident self.write_log("警告", "TaskTimer已运行,请勿重复启动!")
以上这篇python异步实现定时任务和周期任务的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
圆月山庄资源网 Design By www.vgjia.com
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
圆月山庄资源网 Design By www.vgjia.com
暂无评论...
更新日志
2024年11月06日
2024年11月06日
- 雨林唱片《赏》新曲+精选集SACD版[ISO][2.3G]
- 罗大佑与OK男女合唱团.1995-再会吧!素兰【音乐工厂】【WAV+CUE】
- 草蜢.1993-宝贝对不起(国)【宝丽金】【WAV+CUE】
- 杨培安.2009-抒·情(EP)【擎天娱乐】【WAV+CUE】
- 周慧敏《EndlessDream》[WAV+CUE]
- 彭芳《纯色角3》2007[WAV+CUE]
- 江志丰2008-今生为你[豪记][WAV+CUE]
- 罗大佑1994《恋曲2000》音乐工厂[WAV+CUE][1G]
- 群星《一首歌一个故事》赵英俊某些作品重唱企划[FLAC分轨][1G]
- 群星《网易云英文歌曲播放量TOP100》[MP3][1G]
- 方大同.2024-梦想家TheDreamer【赋音乐】【FLAC分轨】
- 李慧珍.2007-爱死了【华谊兄弟】【WAV+CUE】
- 王大文.2019-国际太空站【环球】【FLAC分轨】
- 群星《2022超好听的十倍音质网络歌曲(163)》U盘音乐[WAV分轨][1.1G]
- 童丽《啼笑姻缘》头版限量编号24K金碟[低速原抓WAV+CUE][1.1G]