圆月山庄资源网 Design By www.vgjia.com
由于requests是http类接口的核心,因此封装前考虑问题比较多:
1. 对多种接口类型的支持;
2. 连接异常时能够重连;
3. 并发处理的选择;
4. 使用方便,容易维护;
当前并未全部实现,后期会不断完善。重点提一下并发处理的选择:python的并发处理机制由于存在GIL的原因,实现起来并不是很理想,综合考虑多进程、多线程、协程,在不考虑大并发性能测试的前提下使用了多线程-线程池的形式实现。使用的是
concurrent.futures模块。当前仅方便支持webservice接口。
# -*- coding:utf-8 -*- import requests from concurrent.futures import ThreadPoolExecutor from Tools.Config import Config # 配置文件读取 from Tools.Log import Log # 日志管理 from Tools.tools import decoLOG # 日志装饰 ''' 功能: Requests类 使用方法: 作者: 郭可昌 作成时间: 20180224 更新内容: 更新时间: ''' class Requests(object): def __init__(self): self.session = requests.session() self.header = {} # URL默认来源于配置文件,方便不同测试环境的切换,也可以动态设定 self.URL = Config().getURL() # 默认60s,可以动态设定 self.timeout = 60 #http连接异常的场合,重新连接的次数,默认为3,可以动态设定 self.iRetryNum = 3 self.errorMsg = "" # 内容 = {用例编号:响应数据} self.responses = {} # 内容 = {用例编号:异常信息} self.resErr={} # 原始post使用保留 # bodyData: request's data @decoLOG def post(self, bodyData): response = None self.errorMsg = "" try: response = self.session.post(self.URL, data=bodyData.encode('utf-8'), headers=self.header, timeout=self.timeout) response.raise_for_status() except Exception as e: self.errorMsg = str(e) Log().logger.error("HTTP请求异常,异常信息:%s" % self.errorMsg) return response # 复数请求并发处理,采用线程池的形式,用例数>线程池的容量:线程池的容量为并发数,否则,用例数为并发数 # dicDatas: {用例编号:用例数据} @decoLOG def req_all(self, dicDatas, iThreadNum=5): if len(dict(dicDatas)) < 1: Log().logger.error("没有测试对象,请确认后再尝试。。。") return self.responses.clear() # 请求用例集合转换(用例编号,用例数据) seed = [i for i in dicDatas.items()] self.responses.clear() # 线程池并发执行,iThreadNum为并发数 with ThreadPoolExecutor(iThreadNum) as executor: executor.map(self.req_single,seed) # 返回所有请求的响应信息({用例编号:响应数据}),http连接异常:对应None return self.responses # 用于单用例提交,http连接失败可以重新连接,最大重新连接数可以动态设定 def req_single(self, listData, reqType="post", iLoop=1): response = None # 如果达到最大重连次数,连接后提交结束 if iLoop == self.iRetryNum: if reqType == "post": try: response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header, timeout=self.timeout) response.raise_for_status() except Exception as e: # 异常信息保存只在最大连接次数时进行,未达到最大连接次数,异常信息为空 self.resErr[listData[0]] = str(e) Log().logger.error("HTTP请求异常,异常信息:%s【%d】" % (str(e), iLoop)) self.responses[listData[0]] = response else: # for future: other request method expand pass # 未达到最大连接数,如果出现异常,则重新连接尝试 else: if reqType == "post": try: response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header, timeout=self.timeout) response.raise_for_status() except Exception as e: Log().logger.error("HTTP请求异常,异常信息:%s【%d】" % (str(e), iLoop)) # 重连次数递增 iLoop += 1 # 进行重新连接 self.req_single(listData, reqType, iLoop) # 当前连接终止 return None self.responses[listData[0]] = response else: # for future: other request method expand pass # 设定SoapAction, 快捷完成webservice接口header设定 def setSoapAction(self, soapAction): self.header["SOAPAction"] = soapAction self.header["Content-Type"] = "text/xml;charset=UTF-8" self.header["Connection"] = "Keep-Alive" self.header["User-Agent"] = "InterfaceAutoTest-run"
以上这篇python对于requests的封装方法详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
圆月山庄资源网 Design By www.vgjia.com
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
圆月山庄资源网 Design By www.vgjia.com
暂无评论...
更新日志
2024年11月06日
2024年11月06日
- 雨林唱片《赏》新曲+精选集SACD版[ISO][2.3G]
- 罗大佑与OK男女合唱团.1995-再会吧!素兰【音乐工厂】【WAV+CUE】
- 草蜢.1993-宝贝对不起(国)【宝丽金】【WAV+CUE】
- 杨培安.2009-抒·情(EP)【擎天娱乐】【WAV+CUE】
- 周慧敏《EndlessDream》[WAV+CUE]
- 彭芳《纯色角3》2007[WAV+CUE]
- 江志丰2008-今生为你[豪记][WAV+CUE]
- 罗大佑1994《恋曲2000》音乐工厂[WAV+CUE][1G]
- 群星《一首歌一个故事》赵英俊某些作品重唱企划[FLAC分轨][1G]
- 群星《网易云英文歌曲播放量TOP100》[MP3][1G]
- 方大同.2024-梦想家TheDreamer【赋音乐】【FLAC分轨】
- 李慧珍.2007-爱死了【华谊兄弟】【WAV+CUE】
- 王大文.2019-国际太空站【环球】【FLAC分轨】
- 群星《2022超好听的十倍音质网络歌曲(163)》U盘音乐[WAV分轨][1.1G]
- 童丽《啼笑姻缘》头版限量编号24K金碟[低速原抓WAV+CUE][1.1G]