1、先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码。
#!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "database": "test_1", "host": "xxxx", "user": "root", "password": "root" }, "table": "msg", "msg": "Hello World" } msg = json.dumps(msg_dict) producer.send('test_rhj', msg, partition=0) producer.close()
下面是消费者的简单代码:
from kafka import KafkaConsumer consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
下面是结果:
2、如果想要完成负载均衡,就需要知道kafka的分区机制,同一个主题,可以为其分区,在生产者不指定分区的情况,kafka会将多个消息分发到不同的分区,消费者订阅时候如果不指定服务组,会收到所有分区的消息,如果指定了服务组,则同一服务组的消费者会消费不同的分区,如果2个分区两个消费者的消费者组消费,则,每个消费者消费一个分区,如果有三个消费者的服务组,则会出现一个消费者消费不到数据;如果想要消费同一分区,则需要用不同的服务组。以此为原理,我们对消费者做如下修改:
from kafka import KafkaConsumer consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
然后我们开两个消费者进行消费,生产者分别往0分区和1分区发消息结果如下,可以看到,一个消费者只能消费0分区,另一个只能消费1分区:
3、kafka提供了偏移量的概念,允许消费者根据偏移量消费之前遗漏的内容,这基于kafka名义上的全量存储,可以保留大量的历史数据,历史保存时间是可配置的,一般是7天,如果偏移量定位到了已删除的位置那也会有问题,但是这种情况可能很小;每个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,需要指定该消费者要消费的分区,否则代码会找不到分区而无法消费,代码如下:
from kafka import KafkaConsumer from kafka.structs import TopicPartition consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531']) consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)]) print consumer.partitions_for_topic("test_rhj") # 获取test主题的分区信息 print consumer.assignment() print consumer.beginning_offsets(consumer.assignment()) consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
因为指定的便宜量为0,所以从一开始插入的数据都可以查到,而且因为指定了分区,指定的分区结果都可以消费,结果如下:
4、有时候,我们并不需要实时获取数据,因为这样可能会造成性能瓶颈,我们只需要定时去获取队列里的数据然后批量处理就可以,这种情况,我们可以选择主动拉取数据
from kafka import KafkaConsumer import time consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531']) consumer.subscribe(topics=('test_rhj',)) index = 0 while True: msg = consumer.poll(timeout_ms=5) # 从kafka获取消息 print msg time.sleep(2) index += 1 print '--------poll index is %s----------' % index
结果如下,可以看到,每次拉取到的都是前面生产的数据,可能是多条的列表,也可能没有数据,如果没有数据,则拉取到的为空:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
python操作kafka
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]