上一课大家跟福哥学会了在我们的TFLinux系统上面安装Kafka软件,今天福哥要带着大家学习使用Python去操作Kafka消息系统的方法。
Python操作Kafka可以使用pykafka这个库来实现,看这个命名是不是很眼熟?对了,类似的库名还有一个就是用来操作MySQL数据库的pymysql。pykafka库的语法和pymysql很像,不过福哥觉得Kafka和MySQL之间没有什么可比性,所以福哥选择了另外一个库。
福哥是用kafka-python这个库来操作Kafka的,这个库的设计和我们之前学习的PHP语言的rdkafka以及Java语言的KafkaConsumer/KafkaProducer对象很相似,学习起来更加舒服。
直接通过pip安装kafka-python库就可以。
pip install kafka-python
低级消费者模式就是一个人接了一个项目,所有事情都要自己一点点做。如果当天没有做完,我会记录做到第几件了,然后第二天来了继续做下面的工作。
低级消费者示例代码,Python作为控制台程序运行消费者真是太合适了。
from kafka import KafkaConsumer
topic = "test"
group_id = "lowLevel"
bootstrap_servers = "192.168.2.168:9092"
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=group_id, auto_offset_reset="latest")
consumer.subscribe([topic])
while True:
msg = consumer.poll(timeout_ms=12000)
for records in msg.values():
for i in range(0, len(records)):
record = records[i]
print(record.value.decode("utf-8"))
生产者示例代码。
from kafka import KafkaProducer
import time
topic = "test"
bootstrap_servers = "192.168.2.168:9092"
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
msg = "福哥说:现在是" + time.strftime("%H:%M:%S", time.localtime())
print(msg)
producer.send(topic, msg)编写消费者代码的时候,福哥特别不建议使用如下两种方式,因为它会阻塞进程,按Ctrl+C都退不出来。
for方式
for msg in consumer:
for records in msg.values():
for i in range(0, len(records)):
record = records[i]
print(record.value.decode("utf-8"))next方式
while True:
msg = next(consumer)
for records in msg.values():
for i in range(0, len(records)):
record = records[i]
print(record.value.decode("utf-8"))这是官方的消费者文档
KafkaConsumer — kafka-python 2.0.2-dev documentation
这是官方的生产者文档
KafkaProducer — kafka-python 2.0.2-dev documentation
今天福哥带着童鞋们学习了使用Python语言连接操作Kafka消息系统的方法,Python操作Kafka有pykafka和kafka-python两种方式,大家有空可以自行学习一下pykafka库的使用方法,然后对比一下,看看是不是和福哥有着一样的观点。