介绍
介绍
上一课大家跟福哥学会了在我们的TFLinux系统上面安装Kafka软件,今天福哥要带着大家学习使用Python去操作Kafka消息系统的方法。
Python操作Kafka可以使用pykafka这个库来实现,看这个命名是不是很眼熟?对了,类似的库名还有一个就是用来操作MySQL数据库的pymysql。pykafka库的语法和pymysql很像,不过福哥觉得Kafka和MySQL之间没有什么可比性,所以福哥选择了另外一个库。
福哥是用kafka-python这个库来操作Kafka的,这个库的设计和我们之前学习的PHP语言的rdkafka以及Java语言的KafkaConsumer/KafkaProducer对象很相似,学习起来更加舒服。
安装
直接通过pip安装kafka-python库就可以。
pip install kafka-python
使用
消费者
概念
低级消费者模式就是一个人接了一个项目,所有事情都要自己一点点做。如果当天没有做完,我会记录做到第几件了,然后第二天来了继续做下面的工作。
代码
低级消费者示例代码,Python作为控制台程序运行消费者真是太合适了。
from kafka import KafkaConsumer topic = "test" group_id = "lowLevel" bootstrap_servers = "192.168.2.168:9092" consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=group_id, auto_offset_reset="latest") consumer.subscribe([topic]) while True: msg = consumer.poll(timeout_ms=12000) for records in msg.values(): for i in range(0, len(records)): record = records[i] print(record.value.decode("utf-8"))
效果
生产者
代码
生产者示例代码。
from kafka import KafkaProducer import time topic = "test" bootstrap_servers = "192.168.2.168:9092" producer = KafkaProducer(bootstrap_servers=bootstrap_servers) msg = "福哥说:现在是" + time.strftime("%H:%M:%S", time.localtime()) print(msg) producer.send(topic, msg)
踩坑
消费者
阻塞进程
编写消费者代码的时候,福哥特别不建议使用如下两种方式,因为它会阻塞进程,按Ctrl+C都退不出来。
for方式
for msg in consumer: for records in msg.values(): for i in range(0, len(records)): record = records[i] print(record.value.decode("utf-8"))
next方式
while True: msg = next(consumer) for records in msg.values(): for i in range(0, len(records)): record = records[i] print(record.value.decode("utf-8"))
官方文档
消费者
这是官方的消费者文档
KafkaConsumer — kafka-python 2.0.2-dev documentation
生产者
这是官方的生产者文档
KafkaProducer — kafka-python 2.0.2-dev documentation
总结
今天福哥带着童鞋们学习了使用Python语言连接操作Kafka消息系统的方法,Python操作Kafka有pykafka和kafka-python两种方式,大家有空可以自行学习一下pykafka库的使用方法,然后对比一下,看看是不是和福哥有着一样的观点。