这篇文章主要介绍了Python concurrent.futures模块使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
concurrent.futures的作用:
管理并发任务池。concurrent.futures模块提供了使用工作线程或进程池运行任务的接口。线程和进程池API都是一样,所以应用只做最小的修改就可以在线程和进程之间地切换
1、基于线程池使用map()
futures_thread_pool_map.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import threading import time def task(n): print('{}: 睡眠 {}'.format(threading.current_thread().name,n)) time.sleep(n / 10) print('{}: 执行完成 {}'.format(threading.current_thread().name,n)) return n / 10 ex = futures.ThreadPoolExecutor(max_workers=2) print('main: 开始运行') results = ex.map(task, range(5, 0, -1)) #返回值是generator 生成器 print('main: 未处理的结果 {}'.format(results)) print('main: 等待真实结果') real_results = list(results) print('main: 最终结果: {}'.format(real_results))
运行效果
[root@ mnt]# python3 futures_thread_pool_map.py main: 开始运行 ThreadPoolExecutor-0_0: 睡眠 5 ThreadPoolExecutor-0_1: 睡眠 4 main: 未处理的结果 <generator object Executor.map.<locals>.result_iterator at 0x7f1c97484678> main: 等待真实结果 ThreadPoolExecutor-0_1: 执行完成 4 ThreadPoolExecutor-0_1: 睡眠 3 ThreadPoolExecutor-0_0: 执行完成 5 ThreadPoolExecutor-0_0: 睡眠 2 ThreadPoolExecutor-0_0: 执行完成 2 ThreadPoolExecutor-0_0: 睡眠 1 ThreadPoolExecutor-0_1: 执行完成 3 ThreadPoolExecutor-0_0: 执行完成 1 main: 最终结果: [0.5, 0.4, 0.3, 0.2, 0.1]
2、futures执行单个任务
futures_thread_pool_submit.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import threading import time def task(n): print('{}: 睡眠 {}'.format(threading.current_thread().name, n)) time.sleep(n / 10) print('{}: 执行完成 {}'.format(threading.current_thread().name, n)) return n / 10 ex = futures.ThreadPoolExecutor(max_workers=2) print('main :开始') f = ex.submit(task, 5) print('main: future: {}'.format(f)) print('等待运行结果') results = f.result() print('main: result:{}'.format(results)) print('main: future 之后的结果:{}'.format(f))
运行效果
[root@ mnt]# python3 futures_thread_pool_submit.py main :开始 ThreadPoolExecutor-0_0: 睡眠 5 main: future: <Future at 0x7f40c0a6a400 state=running> 等待运行结果 ThreadPoolExecutor-0_0: 执行完成 5 main: result:0.5 main: future 之后的结果:<Future at 0x7f40c0a6a400 state=finished returned float>
3、futures.as_completed()按任意顺序运行结果
futures_as_completed.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import random import time from concurrent import futures def task(n): time.sleep(random.random()) return (n, n / 10) ex = futures.ThreadPoolExecutor(max_workers=2) print('main: 开始') wait_for = [ ex.submit(task, i) for i in range(5, 0, -1) ] for f in futures.as_completed(wait_for): print('main: result:{}'.format(f.result()))
运行效果
[root@ mnt]# python3 futures_as_completed.py main: 开始 main: result:(5, 0.5) main: result:(4, 0.4) main: result:(3, 0.3) main: result:(1, 0.1) main: result:(2, 0.2)
4、Future回调之futures.add_done_callback()
futures_future_callback.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import time def task(n): print('task {} : 睡眠'.format(n)) time.sleep(0.5) print('task {} : 完成'.format(n)) return n / 10 def done(fn): if fn.cancelled(): print('done {}:取消'.format(fn.arg)) elif fn.done(): error = fn.exception() if error: print('done {} : 错误返回 : {}'.format(fn.arg, error)) else: result = fn.result() print('done {} : 正常返回 : {}'.format(fn.arg, result)) if __name__ == '__main__': ex = futures.ThreadPoolExecutor(max_workers=2) print('main : 开始') f = ex.submit(task, 5) f.arg = 5 f.add_done_callback(done) result = f.result()
运行效果
[root@ mnt]# python3 futures_future_callback.py main : 开始 task 5 : 睡眠 task 5 : 完成 done 5 : 正常返回 : 0.5
5、Future任务取消之futures.cancel()
futures_future_callback_cancel.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import time def task(n): print('task {} : 睡眠'.format(n)) time.sleep(0.5) print('task {} : 完成'.format(n)) return n / 10 def done(fn): if fn.cancelled(): print('done {}:取消'.format(fn.arg)) elif fn.done(): error = fn.exception() if error: print('done {} : 错误返回 : {}'.format(fn.arg, error)) else: result = fn.result() print('done {} : 正常返回 : {}'.format(fn.arg, result)) if __name__ == '__main__': ex = futures.ThreadPoolExecutor(max_workers=2) print('main : 开始') tasks = [] for i in range(10, 0, -1): print('main: submitting {}'.format(i)) f = ex.submit(task, i) f.arg = i f.add_done_callback(done) tasks.append((i, f)) for i, task_obj in reversed(tasks): if not task_obj.cancel(): print('main: 不能取消{}'.format(i)) ex.shutdown()
运行效果
[root@mnt]# python3 futures_future_callback_cancel.py main : 开始 main: submitting 10 task 10 : 睡眠 main: submitting 9 task 9 : 睡眠 main: submitting 8 main: submitting 7 main: submitting 6 main: submitting 5 main: submitting 4 main: submitting 3 main: submitting 2 main: submitting 1 done 1:取消 done 2:取消 done 3:取消 done 4:取消 done 5:取消 done 6:取消 done 7:取消 done 8:取消 main: 不能取消9 main: 不能取消10 task 10 : 完成 done 10 : 正常返回 : 1.0 task 9 : 完成 done 9 : 正常返回 : 0.9
6、Future异常的处理
futures_future_exception
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures def task(n): print('{} : 开始'.format(n)) raise ValueError('这个值不太好 {}'.format(n)) ex = futures.ThreadPoolExecutor(max_workers=2) print('main: 开始...') f = ex.submit(task, 5) error = f.exception() print('main: error:{}'.format(error)) try: result = f.result() except ValueError as e: print('访问结果值的异常 {}'.format(e))
运行效果
[root@mnt]# python3 futures_future_exception.py main: 开始... 5 : 开始 main: error:这个值不太好 5 访问结果值的异常 这个值不太好 5
7、Future上下文管理即利用with打开futures.ThreadPoolExecutor()
futures_context_manager.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures def task(n): print(n) with futures.ThreadPoolExecutor(max_workers=2) as ex: print('main: 开始') ex.submit(task, 1) ex.submit(task, 2) ex.submit(task, 3) ex.submit(task, 4) print('main: 结束')
运行效果
[root@ mnt]# python3 futures_context_manager.py main: 开始 2 4 main: 结束
8、基于进程池使用map()
futures_process_pool_map.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import os def task(n): return (n, os.getpid()) if __name__ == '__main__': ex = futures.ProcessPoolExecutor(max_workers=2) results = ex.map(task, range(50, 0, -1)) for n, pid in results: print('task {} in 进程id {}'.format(n, pid))
运行效果
[root@ mnt]# python3 futures_process_pool_map.py task 5 in 进程id 9192 task 4 in 进程id 8668 task 3 in 进程id 9192 task 2 in 进程id 8668 task 1 in 进程id 9192
9、基于进程池异常处理
futures_process_pool_broken.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import os import signal def task(n): return (n, os.getpid()) if __name__ == '__main__': with futures.ProcessPoolExecutor(max_workers=2) as ex: print('获取工作进程的id') f1 = ex.submit(os.getpid) pid1 = f1.result() print('结束进程 {}'.format(pid1)) os.kill(pid1, signal.SIGHUP) print('提交其它进程') f2 = ex.submit(os.getpid) try: pid2 = f2.result() except futures.process.BrokenProcessPool as e: print('不能开始新的任务:{}'.format(e))
运行效果
[root@ mnt]# python3 futures_process_pool_broken.py 获取工作进程的id 结束进程 104623 提交其它进程 不能开始新的任务:A process in the process pool was terminated abruptly while the future was running or pending.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 明达年度发烧碟MasterSuperiorAudiophile2021[DSF]
- 英文DJ 《致命的温柔》24K德国HD金碟DTS 2CD[WAV+分轨][1.7G]
- 张学友1997《不老的传说》宝丽金首版 [WAV+CUE][971M]
- 张韶涵2024 《不负韶华》开盘母带[低速原抓WAV+CUE][1.1G]
- lol全球总决赛lcs三号种子是谁 S14全球总决赛lcs三号种子队伍介绍
- lol全球总决赛lck三号种子是谁 S14全球总决赛lck三号种子队伍
- 群星.2005-三里屯音乐之男孩女孩的情人节【太合麦田】【WAV+CUE】
- 崔健.2005-给你一点颜色【东西音乐】【WAV+CUE】
- 南台湾小姑娘.1998-心爱,等一下【大旗】【WAV+CUE】
- 【新世纪】群星-美丽人生(CestLaVie)(6CD)[WAV+CUE]
- ProteanQuartet-Tempusomniavincit(2024)[24-WAV]
- SirEdwardElgarconductsElgar[FLAC+CUE]
- 田震《20世纪中华歌坛名人百集珍藏版》[WAV+CUE][1G]
- BEYOND《大地》24K金蝶限量编号[低速原抓WAV+CUE][986M]
- 陈奕迅《准备中 SACD》[日本限量版] [WAV+CUE][1.2G]