上一课大家跟福哥学会了在我们的TFLinux系统上面安装Kafka软件,今天福哥要带着大家学习使用PHP去操作Kafka消息系统的方法。
PHP操作Kafka需要借助rdkafka库,我们可以在github上面下载到源代码进行安装。因为rdkafka是作为PHP扩展部署的,所以我们不需要重新编译PHP环境。
下载librdkafka
wget https://github.com/edenhill/librdkafka/archive/v0.11.0.tar.gz
下载php-rdkafka
wget https://github.com/arnaud-lb/php-rdkafka/archive/3.0.4.tar.gz
安装librdkafka
tar -xzvf v0.11.0.tar.gz cd librdkafka-0.11.0/ ./configure --prefix=/tongfu.net/env/librdkafka/ make make install cd ..
安装php-rdkafka
tar -xzvf 3.0.4.tar.gz cd php-rdkafka-3.0.4/ /tongfu.net/env/php-7.4.6/bin/phpize ./configure \ --with-php-config=/tongfu.net/env/php-7.4.6/bin/php-config \ --with-rdkafka=/tongfu.net/env/librdkafka/ make make install cd ..
配置php.ini
extension=rdkafka
重新启动apache
systemctl restart httpd
低级消费者模式就是一个人接了一个项目,所有事情都要自己一点点做。如果当天没有做完,我会记录做到第几件了,然后第二天来了继续做下面的工作。
低级消费者示例代码
当 $message 为 null 或者 $message->err 为到底了,都表示没有新消息
强烈建议本地保存 offset(偏移),我做了多少我自己记着,放心!
$groupID = "lowLevel";
$topicName = "test";
$partitionID = 0;
$conf = new RdKafka\Conf();
$conf->setErrorCb(function($kafka, $err, $reason){
var_dump($err);
var_dump($reason);
});
$conf->set('group.id', $groupID);
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers("server_kafka:9092");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', __DIR__);
$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.interval.ms', 10);
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $consumer->newTopic($topicName, $topicConf);
$topic->consumeStart($partitionID, RD_KAFKA_OFFSET_STORED);
var_dump("启动中...");
while (1) {
// try consumer record
$message = $topic->consume($partitionID, 120*1000);
if($message == null){
continue;
}
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message->payload);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
var_dump("没有更多消息了");
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
var_dump("太长时间未收到消息了");
break;
default:
throw new Exception($message->errstr(), $message->err);
break;
}
}生产一个消息

消费一个消息

高级消费者模式就是团队做项目,要做的事情列出来了,你接一个,我接一个,各自做完手头的工作后再去接下一个。分配工作的是 Kafka 的 Rebalance 来控制,逻辑也非常简单。
它先把项目需要做的事情分开多个袋子里,然后每个袋子上写一个编号。当员工来接活的时候,会先看看有没有没人负责的袋子,如果有就告诉员工你的编号是这个,这个袋子里的事情都是你的。如果全部袋子都有人负责了,那就告诉员工暂时没事干,你先等等吧。
注意:消费者能不能拿到消息,完全看是不是可以得到分配到分区,而往往是这个地方会需要等很久~~
高级消费者示例代码
$groupID = "highLevel";
$topicName = "test";
$conf = new RdKafka\Conf();
$conf->setErrorCb(function($kafka, $err, $reason){
var_dump($err);
var_dump($reason);
});
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
foreach($partitions as $partition){
var_dump("指定分区:". $partition->getPartition());
}
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
foreach($partitions as $partition){
var_dump("删除分区:". $partition->getPartition());
}
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
$conf->set('group.id', $groupID);
$conf->set('metadata.broker.list', "127.0.0.1:9092");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', __DIR__);
$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.interval.ms', 10);
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(array($topicName));
var_dump("启动中...");
while (1) {
// try consumer record
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message->payload);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
var_dump("没有更多消息了");
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
var_dump("太长时间未收到消息了");
break;
default:
throw new Exception($message->errstr(), $message->err);
break;
}
}生产一个消息

消费一个消息

生产者示例代码
使用 setErrorCb 回调检查程序是否出错
使用 setDrMsgCb 回调检查消息是否成功推送
设置 message.timeout 避免 produce 阻塞
$conf = new RdKafka\Conf();
$conf->setErrorCb(function($kafka, $err, $reason){
var_dump($err);
var_dump($reason);
});
$conf->setDrMsgCb(function($kafka, $message){
if($message->err) var_dump("发送失败:");
else var_dump("发送成功:");
var_dump($message);
});
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("server_kafka:9092");
$topicConf = new RdKafka\TopicConf();
$topicConf->set("message.timeout.ms", 3000);
$topic = $rk->newTopic("test", $topicConf);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "福哥说:现在". date("H:i:s"). "了");Rdkafka扩展提供了在线文档,可以帮助我们编写代码的时候查阅
http://arnaud.le-blanc.net/php-rdkafka/phpdoc/book.rdkafka.html
今天福哥带着童鞋们学习了使用PHP语言通过rdkafka扩展连接操作Kafka消息系统的方法。PHP本身是弱类型语言,又是解析型语言,在处理复杂的业务逻辑时候难免会有些力不从心。借助Kafka消息系统可以将一些同步要求不高的处理放到消息队列里面,可以大大提高主线业务的处理效率。
好了,下一课福哥会给大家讲解如何使用Java语言来操作Kafka消息系统,敬请期待~~