博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python对kafka的基本操作
阅读量:6617 次
发布时间:2019-06-25

本文共 2331 字,大约阅读时间需要 7 分钟。

-- coding:utf-8 --

from kafka import KafkaProducer

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

bootstrap_servers = []

class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

"""生产者"""def produce(self):    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)    for i in range(4):        msg = "msg%d" %i        producer.send(self.topic,key=str(i),value=msg)    producer.close()"""一个消费者消费一个topic"""def consume(self):    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)    print consumer.partitions_for_topic(self.topic)  #获取test主题的分区信息print consumer.topics()  #获取主题列表print consumer.subscription()  #获取当前消费者订阅的主题print consumer.assignment()  #获取当前消费者topic、分区信息print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,从第1个偏移量消费    for message in consumer:        print ("%s:%d:%d: key=%s value=%s"         % (message.topic,message.partition,message.offset, message.key,message.value))"""一个消费者订阅多个topic """def consume2(self):    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])consumer.subscribe(topics=('TEST','TEST2'))  #订阅要消费的主题print consumer.topics()print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量for message in consumer:        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,                                      message.offset, message.key,                                      message.value))"""消费者(手动拉取消息)"""def consume3(self):    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])consumer.subscribe(topics=('TEST','TEST2'))while True:        message = consumer.poll(timeout_ms=5)   #从kafka获取消息        if message:        print message        time.sleep(1)

def main():

bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()

转载于:https://blog.51cto.com/xujingbo/2154987

你可能感兴趣的文章
虚拟现实技术或会产生副作用
查看>>
【云图】如何设置微信里的全国实体店地图?
查看>>
db file async I/O submit 等待事件优化
查看>>
前端需要了解的 SSO 与 CAS 知识
查看>>
李开复谈未来工作:虽然会被AI取代,但谁说人类非得工作不可?
查看>>
PostgreSQL 空间切割(st_split)功能扩展 - 空间对象网格化
查看>>
Intercom的持续部署实践:一天部署100次,1次10分钟
查看>>
SpringBoot权限控制
查看>>
阿里云中间件技术 促进互联网高速发展
查看>>
智能时代悄然到来 物联网称王将引爆传感器产业
查看>>
物理隔离计算机被USB蜜蜂刺破 数据通过无线信号泄露
查看>>
利用一点机器学习来加速你的网站
查看>>
中国域名现状:应用水平较低,安全仍存隐患
查看>>
Java中HashMap的原理分析
查看>>
React Native入门项目与解析
查看>>
云计算:大势所趋 你准备好了么?
查看>>
数据资产的运营商--天市大数据交易平台
查看>>
中小企业如何成功转型跨境电商
查看>>
java中文乱码解决之道(二)—–字符编码详解:基础知识 + ASCII + GB**
查看>>
《ANTLR 4权威指南》——2.5 语法分析树监听器和访问器
查看>>