PHP操作Kafka消息系统的方法【20210421】

发表于 2021-04-19 17:20:40
阅读 159

介绍

介绍

上一课大家跟福哥学会了在我们的TFLinux系统上面安装Kafka软件,今天福哥要带着大家学习使用PHP去操作Kafka消息系统的方法。

PHP操作Kafka需要借助rdkafka库,我们可以在github上面下载到源代码进行安装。因为rdkafka是作为PHP扩展部署的,所以我们不需要重新编译PHP环境。

安装

下载

下载librdkafka

wget https://github.com/edenhill/librdkafka/archive/v0.11.0.tar.gz

下载php-rdkafka

wget https://github.com/arnaud-lb/php-rdkafka/archive/3.0.4.tar.gz

安装

安装librdkafka

tar -xzvf v0.11.0.tar.gz
cd librdkafka-0.11.0/
./configure --prefix=/tongfu.net/env/librdkafka/
make
make install
cd ..

安装php-rdkafka

tar -xzvf 3.0.4.tar.gz
cd php-rdkafka-3.0.4/
/tongfu.net/env/php-7.4.6/bin/phpize
./configure \
--with-php-config=/tongfu.net/env/php-7.4.6/bin/php-config \
--with-rdkafka=/tongfu.net/env/librdkafka/
make
make install
cd ..

配置

配置

配置php.ini

extension=rdkafka

重启

重新启动apache

systemctl restart httpd

使用

低级消费者

概念

低级消费者模式就是一个人接了一个项目,所有事情都要自己一点点做。如果当天没有做完,我会记录做到第几件了,然后第二天来了继续做下面的工作。

代码

低级消费者示例代码 

  • 当 $message 为 null 或者 $message->err 为到底了,都表示没有新消息

  • 强烈建议本地保存 offset(偏移),我做了多少我自己记着,放心!

$groupID = "lowLevel";
$topicName = "test";
$partitionID = 0;

$conf = new RdKafka\Conf();
$conf->setErrorCb(function($kafka, $err, $reason){
    var_dump($err);
    var_dump($reason);
});
$conf->set('group.id', $groupID);

$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers("server_kafka:9092");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', __DIR__);
$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.interval.ms', 10);
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $consumer->newTopic($topicName, $topicConf);
$topic->consumeStart($partitionID, RD_KAFKA_OFFSET_STORED);

var_dump("启动中...");
while (1) {
    // try consumer record
    $message = $topic->consume($partitionID, 120*1000);
    if($message == null){
        continue;
    }
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message->payload);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            var_dump("没有更多消息了");
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            var_dump("太长时间未收到消息了");
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}

效果

生产一个消息

home/topic/2021/0421/22/c7ba7ede2e172b98bd9523626185a11d.jpg

消费一个消息

home/topic/2021/0421/22/95dfd4524b1178a0bee9b4c58ae8f984.jpg

高级消费者

概念

高级消费者模式就是团队做项目,要做的事情列出来了,你接一个,我接一个,各自做完手头的工作后再去接下一个。分配工作的是 Kafka 的 Rebalance 来控制,逻辑也非常简单。

它先把项目需要做的事情分开多个袋子里,然后每个袋子上写一个编号。当员工来接活的时候,会先看看有没有没人负责的袋子,如果有就告诉员工你的编号是这个,这个袋子里的事情都是你的。如果全部袋子都有人负责了,那就告诉员工暂时没事干,你先等等吧。

注意:消费者能不能拿到消息,完全看是不是可以得到分配到分区,而往往是这个地方会需要等很久~~

代码

高级消费者示例代码

$groupID = "highLevel";
$topicName = "test";

$conf = new RdKafka\Conf();
$conf->setErrorCb(function($kafka, $err, $reason){
    var_dump($err);
    var_dump($reason);
});
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            foreach($partitions as $partition){
                var_dump("指定分区:". $partition->getPartition());
            }
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            foreach($partitions as $partition){
                var_dump("删除分区:". $partition->getPartition());
            }
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});
$conf->set('group.id', $groupID);
$conf->set('metadata.broker.list', "127.0.0.1:9092");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', __DIR__);
$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.interval.ms', 10);
$topicConf->set('auto.offset.reset', 'smallest');

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(array($topicName));

var_dump("启动中...");
while (1) {
    // try consumer record
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message->payload);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            var_dump("没有更多消息了");
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            var_dump("太长时间未收到消息了");
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}

效果

生产一个消息

home/topic/2021/0421/22/2351e48d72e701696cf05d4a589fedc7.jpg

消费一个消息

home/topic/2021/0421/22/4c8297cd71cfe8363e5175dda92a0727.jpg

生产者

代码

生产者示例代码

  • 使用 setErrorCb 回调检查程序是否出错

  • 使用 setDrMsgCb 回调检查消息是否成功推送

  • 设置 message.timeout 避免 produce 阻塞

$conf = new RdKafka\Conf();
$conf->setErrorCb(function($kafka, $err, $reason){
    var_dump($err);
    var_dump($reason);
});
$conf->setDrMsgCb(function($kafka, $message){
    if($message->err) var_dump("发送失败:");
    else var_dump("发送成功:");
    var_dump($message);
});

$rk = new RdKafka\Producer($conf);
$rk->addBrokers("server_kafka:9092");

$topicConf = new RdKafka\TopicConf();
$topicConf->set("message.timeout.ms", 3000);

$topic = $rk->newTopic("test", $topicConf);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "福哥说:现在". date("H:i:s"). "了");

官方在线文档

Rdkafka扩展提供了在线文档,可以帮助我们编写代码的时候查阅

http://arnaud.le-blanc.net/php-rdkafka/phpdoc/book.rdkafka.html

总结

今天福哥带着童鞋们学习了使用PHP语言通过rdkafka扩展连接操作Kafka消息系统的方法。PHP本身是弱类型语言,又是解析型语言,在处理复杂的业务逻辑时候难免会有些力不从心。借助Kafka消息系统可以将一些同步要求不高的处理放到消息队列里面,可以大大提高主线业务的处理效率。

好了,下一课福哥会给大家讲解如何使用Java语言来操作Kafka消息系统,敬请期待~~