一、多线程化选择
并行化一个代码有两大选择:multithread 和 multiprocess。
Multithread,多线程,同一个进程(process)可以开启多个线程执行计算。每个线程代表了一个 CPU 核心,这么多线程可以访问同样的内存地址(所谓共享内存),实现了线程之间的通讯,算是最简单的并行模型。
Multiprocess,多进程,则相当于同时开启多个 Python 解释器,每个解释器有自己独有的数据,自然不会有数据冲突。
二、并行化思想
并行化的基本思路是把 dataframe 用 np.array_split 方法切割成多个子 dataframe。再调用 Pool.map 函数并行地执行。注意到顺序执行的 pandas.DataFrame.apply 是如何转化成 Pool.map 然后并行执行的。
Pool 对象是一组并行的进程,开源Pool类
开源Pool类定义
def Pool(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None): '''Returns a process pool object''' from .pool import Pool return Pool(processes, initializer, initargs, maxtasksperchild, context=self.get_context())
设置进程初始化函数
def init_process(global_vars): global a a = global_vars
设置进程初始化函数
Pool(processes=8,initializer=init_process,initargs=(a,))
其中,指定产生 8 个进程,每个进程的初始化需运行 init_process函数,其参数为一个 singleton tuple a. 利用 init_process 和 initargs,我们可以方便的设定需要在进程间共享的全局变量(这里是 a)。
with 关键词是 context manager,避免写很繁琐的处理开关进程的逻辑。
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool: result_parts = pool.map(apply_f,df_parts)
三、多线程化应用
多线程时间比较和多线程的几种apply应用
import numpy as np import pandas as pd import time from multiprocessing import Pool def f(row): #直接对某列进行操作 return sum(row)+a def f1_1(row): #对某一列进行操作,我这里的columns=range(0,2),此处是对第0列进行操作 return row[0]**2 def f1_2(row1): #对某一列进行操作,我这里的columns=range(0,2),此处是对第0列进行操作 return row1**2 def f2_1(row): #对某两列进行操作,我这里的columns=range(0,2),此处是对第0,2列进行操作 return pd.Series([row[0]**2,row[1]**2],index=['1_1','1_2']) def f2_2(row1,row2): #对某两列进行操作,我这里的columns=range(0,2),此处是对第0,2列进行操作 return pd.Series([row1**2,row2**2],index=['2_1','2_2']) def apply_f(df): return df.apply(f,axis=1) def apply_f1_1(df): return df.apply(f1_1,axis=1) def apply_f1_2(df): return df[0].apply(f1_2) def apply_f2_1(df): return df.apply(f2_1,axis=1) def apply_f2_2(df): return df.apply(lambda row :f2_2(row[0],row[1]),axis=1) def init_process(global_vars): global a a = global_vars def time_compare(): '''直接调用和多线程调用时间对比''' a = 2 np.random.seed(0) df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2)) print(df.columns) t1= time.time() result_serial = df.apply(f,axis=1) t2 = time.time() print("Serial time =",t2-t1) print(result_serial.head()) df_parts=np.array_split(df,20) print(len(df_parts),type(df_parts[0])) with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool: #with Pool(processes=8) as pool: result_parts = pool.map(apply_f,df_parts) result_parallel= pd.concat(result_parts) t3 = time.time() print("Parallel time =",t3-t2) print(result_parallel.head()) def apply_fun(): '''多种apply函数的调用''' a = 2 np.random.seed(0) df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2)) print(df.columns) df_parts=np.array_split(df,20) print(len(df_parts),type(df_parts[0])) with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool: #with Pool(processes=8) as pool: res_part0 = pool.map(apply_f,df_parts) res_part1 = pool.map(apply_f1_1,df_parts) res_part2 = pool.map(apply_f1_2,df_parts) res_part3 = pool.map(apply_f2_1,df_parts) res_part4 = pool.map(apply_f2_2,df_parts) res_parallel0 = pd.concat(res_part0) res_parallel1 = pd.concat(res_part1) res_parallel2 = pd.concat(res_part2) res_parallel3 = pd.concat(res_part3) res_parallel4 = pd.concat(res_part4) print("f:\n",res_parallel0.head()) print("f1:\n",res_parallel1.head()) print("f2:\n",res_parallel2.head()) print("f3:\n",res_parallel3.head()) print("f4:\n",res_parallel4.head()) df=pd.concat([df,res_parallel0],axis=1) df=pd.concat([df,res_parallel1],axis=1) df=pd.concat([df,res_parallel2],axis=1) df=pd.concat([df,res_parallel3],axis=1) df=pd.concat([df,res_parallel4],axis=1) print(df.head()) if __name__ == '__main__': time_compare() apply_fun()
参考网址
https://blog.fangzhou.me/posts/20170702-python-parallelism/
https://docs.python.org/3.7/library/multiprocessing.html
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
RTX 5090要首发 性能要翻倍!三星展示GDDR7显存
三星在GTC上展示了专为下一代游戏GPU设计的GDDR7内存。
首次推出的GDDR7内存模块密度为16GB,每个模块容量为2GB。其速度预设为32 Gbps(PAM3),但也可以降至28 Gbps,以提高产量和初始阶段的整体性能和成本效益。
据三星表示,GDDR7内存的能效将提高20%,同时工作电压仅为1.1V,低于标准的1.2V。通过采用更新的封装材料和优化的电路设计,使得在高速运行时的发热量降低,GDDR7的热阻比GDDR6降低了70%。
更新日志
- 《暗喻幻想》顺风耳作用介绍
- 崔健1985-梦中的倾诉[再版][WAV+CUE]
- 黄子馨《追星Xin的恋人们2》HQ头版限量编号[WAV+CUE]
- 孟庭苇《情人的眼泪》开盘母带[低速原抓WAV+CUE]
- 孙露《谁为我停留HQCD》[低速原抓WAV+CUE][1.1G]
- 孙悦《时光音乐会》纯银CD[低速原抓WAV+CUE][1.1G]
- 任然《渐晚》[FLAC/分轨][72.32MB]
- 英雄联盟新英雄安蓓萨上线了吗 新英雄安蓓萨技能介绍
- 魔兽世界奥杜尔竞速赛什么时候开启 奥杜尔竞速赛开启时间介绍
- 无畏契约CGRS准星代码多少 CGRS准星代码分享一览
- 张靓颖.2012-倾听【少城时代】【WAV+CUE】
- 游鸿明.1999-五月的雪【大宇国际】【WAV+CUE】
- 曹方.2005-遇见我【钛友文化】【WAV+CUE】
- Unity6引擎上线:稳定性提升、CPU性能最高提升4倍
- 人皇Sky今日举行婚礼!电竞传奇步入新篇章