圆月山庄资源网 Design By www.vgjia.com
1、es的批量插入
这是为了方便后期配置的更改,把配置信息放在logging.conf中
用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2
from elasticsearch import Elasticsearch class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") def __init__(self,hosts,index,type): self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000) self.index = index self.type = type def set_date(self,data): # 批量处理 # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()}) self.es.index(index=self.index,doc_type=self.index,body=data)
2、使用pykafka消费kafka
1.因为kafka是0.8,pykafka不支持zk,只能用get_simple_consumer来实现
2.为了实现多个应用同时消费而且不重消费,所以一个应用消费一个partition
3. 为是确保消费数据量在不满足10000这个批量值,能在一个时间范围内插入到es中,这里设置consumer_timeout_ms一个超时等待时间,退出等待消费阻塞。
4.退出等待消费阻塞后导致无法再消费数据,因此在获取self.consumer 的外层加入了while True 一个死循环
#!/usr/bin/python # -*- coding: UTF-8 -*- from pykafka import KafkaClient import logging import logging.config from ConfigUtil import ConfigUtil import datetime class KafkaPython: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") logger_data = logging.getLogger("data") def __init__(self): self.server = ConfigUtil().get("kafka","kafka_server") self.topic = ConfigUtil().get("kafka","topic") self.group = ConfigUtil().get("kafka","group") self.partition_id = int(ConfigUtil().get("kafka","partition")) self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms")) self.consumer = None self.hosts = ConfigUtil().get("es","hosts") self.index_name = ConfigUtil().get("es","index_name") self.type_name = ConfigUtil().get("es","type_name") def getConnect(self): client = KafkaClient(self.server) topic = client.topics[self.topic] p = topic.partitions ps={p.get(self.partition_id)} self.consumer = topic.get_simple_consumer( consumer_group=self.group, auto_commit_enable=True, consumer_timeout_ms=self.consumer_timeout_ms, # num_consumer_fetchers=1, # consumer_id='test1', partitions=ps ) self.starttime = datetime.datetime.now() def beginConsumer(self): print("beginConsumer kafka-python") imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name) #创建ACTIONS count = 0 ACTIONS = [] while True: endtime = datetime.datetime.now() print (endtime - self.starttime).seconds for message in self.consumer: if message is not None: try: count = count + 1 # print(str(message.partition.id)+","+str(message.offset)+","+str(count)) # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count)) action = { "_index": self.index_name, "_type": self.type_name, "_source": message.value } ACTIONS.append(action) if len(ACTIONS) >= 10000: imprtEsData.set_date(ACTIONS) ACTIONS = [] self.consumer.commit_offsets() endtime = datetime.datetime.now() print (endtime - self.starttime).seconds #break except (Exception) as e: # self.consumer.commit_offsets() print(e) self.logger.error(e) self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n") # self.logger_data.error(message.value+"\n") # self.consumer.commit_offsets() if len(ACTIONS) > 0: self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es") imprtEsData.set_date(ACTIONS) ACTIONS = [] self.consumer.commit_offsets() def disConnect(self): self.consumer.close() from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") def __init__(self,hosts,index,type): self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000) self.index = index self.type = type def set_date(self,data): # 批量处理 success = bulk(self.es, data, index=self.index, raise_on_error=True) self.logger.info(success)
3、运行
if __name__ == '__main__': kp = KafkaPython() kp.getConnect() kp.beginConsumer() # kp.disConnect()
注:简单的写了一个从kafka中读取数据到一个list里,当数据达到一个阈值时,在批量插入到 es的插件
现在还在批量的压测中。。。
以上这篇python消费kafka数据批量插入到es的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
标签:
python,kafka,es
圆月山庄资源网 Design By www.vgjia.com
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
圆月山庄资源网 Design By www.vgjia.com
暂无评论...
更新日志
2024年11月06日
2024年11月06日
- 雨林唱片《赏》新曲+精选集SACD版[ISO][2.3G]
- 罗大佑与OK男女合唱团.1995-再会吧!素兰【音乐工厂】【WAV+CUE】
- 草蜢.1993-宝贝对不起(国)【宝丽金】【WAV+CUE】
- 杨培安.2009-抒·情(EP)【擎天娱乐】【WAV+CUE】
- 周慧敏《EndlessDream》[WAV+CUE]
- 彭芳《纯色角3》2007[WAV+CUE]
- 江志丰2008-今生为你[豪记][WAV+CUE]
- 罗大佑1994《恋曲2000》音乐工厂[WAV+CUE][1G]
- 群星《一首歌一个故事》赵英俊某些作品重唱企划[FLAC分轨][1G]
- 群星《网易云英文歌曲播放量TOP100》[MP3][1G]
- 方大同.2024-梦想家TheDreamer【赋音乐】【FLAC分轨】
- 李慧珍.2007-爱死了【华谊兄弟】【WAV+CUE】
- 王大文.2019-国际太空站【环球】【FLAC分轨】
- 群星《2022超好听的十倍音质网络歌曲(163)》U盘音乐[WAV分轨][1.1G]
- 童丽《啼笑姻缘》头版限量编号24K金碟[低速原抓WAV+CUE][1.1G]