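Connecting PHP to Kafka: Installation and Setup

https://gist.github.com/rambolee/71286ad4c31aeb62253f4eb5cb6cbd93 Installation

https://github.com/arnaud-lb/php-rdkafka Usage

https://kafka.apache.org/documentation/#configuration Kafka connection configuration

Background

Some related projects need to work with Kafka, so the current platform has to connect to Kafka and fetch the relevant data.

Installation and Configuration

System environment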

  • CentOS 6.7
  • Kafka 0.10
  • PHP 5.6.21

Installing the PHP Kafka extension

Following the pointers on the official site, we pick the corresponding PHP client.

Preparing the dev environment

$ sudo yum install zlib zlib-devel openssl openssl-devel cyrus-sasl2 cyrus-sasl-devel

Install librdkafka, the C library that php-rdkafka depends on

$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make 
$ sudo make install

Installing via PECL

/usr/local/php56-5.6.24/bin/pecl install rdkafka

Or build the php-rdkafka PHP extension from source

See the installation instructions on the official site.

$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka
$ # For PHP 7, checkout the php7 branch:
$ # git checkout php7
$ phpize
$ ./configure
$ make all -j 5
$ sudo make install
$ sh -c 'echo "extension=rdkafka.so" >> /usr/local/php/etc/php.ini'

Check whether the php-rdkafka extension loads

$ php -m | grep kafka
Warning: PHP Startup: Unable to load dynamic library '/usr/local/php/lib/php/extensions/no-debug-non-zts-20131226/kafka.so' - librdkafka.so.1: cannot open shared object file: No such file or directory in Unknown on line 0

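Fixing the .so loading problem

The warning means the dynamic linker cannot find librdkafka.so.1. The commands below add /usr/local/lib, where librdkafka's make install places the library by default, to the linker search path and rebuild the linker cache.

$ sudo touch /etc/ld.so.conf.d/librd.conf
$ sudo sh -c 'echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf'
$ sudo ldconfig
$ ldconfig -p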
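
The output of ldconfig -p is long, so it can be easier to search it for librdkafka directly. A minimal sketch: has_lib is a hypothetical helper name, and it only filters the text of the listing it is given.

```shell
# has_lib NAME
# Succeeds if NAME appears in the `ldconfig -p` listing read from stdin.
# (has_lib is an illustrative helper, not part of any tool.)
has_lib() {
    grep -F -- "$1" >/dev/null
}

# Guarded use: ldconfig may not be on the PATH of a non-root user.
if command -v ldconfig >/dev/null 2>&1; then
    if ldconfig -p | has_lib "librdkafka.so"; then
        echo "librdkafka is in the linker cache"
    else
        echo "librdkafka not found; check /etc/ld.so.conf.d/librd.conf"
    fi
fi
```

If the second message is printed, the library was probably installed under a prefix other than /usr/local/lib, and that directory is the one to list in librd.conf.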

Check again that PHP now loads the Kafka extension

$ php -m | grep kafka
rdkafka
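
This walkthrough touches more than one PHP installation (PECL is run from /usr/local/php56-5.6.24, while the extension line is appended to /usr/local/php/etc/php.ini), so it can help to confirm which php.ini the binary in use actually reads and what the loaded extension reports. A small sketch: PHP_BIN and the function names are illustrative, while --ini and --ri are standard PHP CLI options.

```shell
# Path to the PHP binary to inspect; override it to check another installation.
PHP_BIN="${PHP_BIN:-php}"

# Print the php.ini files this binary loads.
show_ini() {
    "$PHP_BIN" --ini
}

# Print the rdkafka extension's info section (fails if it is not loaded).
show_rdkafka() {
    "$PHP_BIN" --ri rdkafka
}
```

After sourcing the snippet, calling show_ini and then show_rdkafka should make it clear whether the extension=rdkafka.so line ended up in the php.ini that this binary reads.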

A first demo

We currently use the high-level consumer. The demo below follows the one on the official site.

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup_1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '10.110.92.95,10.110.92.101');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// the offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'NetBaseInfo'
$consumer->subscribe(['NetBaseInfo']);

echo "Waiting for partition assignment... (may take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    // Wait up to 120 seconds for the next message
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
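
To see the consumer above print something, a few test messages can be published with the console producer that ships with Kafka. A minimal sketch: the script path, the port 9092 (Kafka's default listener port, not stated in this article), and the function name send_lines are assumptions.

```shell
# Console producer shipped with Kafka; the path is relative to the Kafka
# installation directory (an assumption, adjust as needed).
KAFKA_PRODUCER="${KAFKA_PRODUCER:-bin/kafka-console-producer.sh}"

# send_lines BROKER TOPIC
# Publishes every line read from stdin as one message.
send_lines() {
    "$KAFKA_PRODUCER" --broker-list "$1" --topic "$2"
}

# Example, run from the Kafka directory (port 9092 is assumed):
#   printf 'hello\nworld\n' | send_lines 10.110.92.95:9092 NetBaseInfo
```

Each published line should then show up in the demo's var_dump output as the payload of a RdKafka\Message.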

Using multiple partitions for a Kafka topic
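
With the high-level consumer, consumers that share a group.id split a topic's partitions among themselves, so a topic needs more than one partition before a second consumer gets any work. A sketch of creating such a topic with the kafka-topics.sh tool used elsewhere in this article; the partition count, the replication factor of 1, and the function names are illustrative assumptions.

```shell
# Topic tool shipped with Kafka and the ZooKeeper address it talks to.
KAFKA_TOPICS="${KAFKA_TOPICS:-bin/kafka-topics.sh}"
ZOOKEEPER="${ZOOKEEPER:-127.0.0.1:2181}"

# create_topic TOPIC PARTITIONS
# Creates TOPIC with the given number of partitions (replication factor 1).
create_topic() {
    "$KAFKA_TOPICS" --zookeeper "$ZOOKEEPER" --create --topic "$1" \
        --partitions "$2" --replication-factor 1
}

# describe_topic TOPIC
# Prints the partition layout of TOPIC.
describe_topic() {
    "$KAFKA_TOPICS" --zookeeper "$ZOOKEEPER" --describe --topic "$1"
}

# Example, run from the Kafka directory:
#   create_topic someTopic 3
#   describe_topic someTopic
```

Starting two copies of the demo consumer with the same group.id should then show, in the rebalance callback output, each copy being assigned a subset of the partitions.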

Deleting a topic, and the marked-for-deletion flag

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic someTopic
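
On Kafka 0.10 the delete command only marks the topic for deletion. The brokers remove it only when delete.topic.enable=true is set in their configuration, and in this version the setting defaults to false. A sketch of checking a broker's properties file; the function name and the config/server.properties path in the example are assumptions.

```shell
# topic_delete_enabled FILE
# Succeeds if FILE contains an active (uncommented) delete.topic.enable=true line.
topic_delete_enabled() {
    grep -E '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$1" >/dev/null
}

# Example, run from the Kafka directory (path is an assumption):
#   if topic_delete_enabled config/server.properties; then
#       echo "topics will really be deleted"
#   else
#       echo "topics will only be marked for deletion"
#   fi
```

After changing the setting, the brokers need a restart before it takes effect.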

Deleting a consumer group

https://stackoverflow.com/questions/29243896/removing-a-kafka-consumer-group-in-zookeeper

./bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --delete --group <group-name>

There is no need to delete with the new consumer. Here is what the script outputs when attempting to delete:

Note that there's no need to delete group metadata for the new consumer as it is automatically deleted when the last member leaves

That's the short answer. More details: By "metadata", two things are meant. First, just the information about the consumers and consumer groups that is stored as part of the group membership coordinator. That is automatically removed if all consumers in the group are gone.

Second, the consumer group has stored the committed offsets in a Kafka topic (when the new consumer is used. Previously these used to be stored with Zookeeper). That topic is not immediately deleted when the consumer group disappears. If the consumer group reappears again, it will find the previous offsets automatically in this topic. It can choose to use them or ignore them. If the consumer group never reappears, these stored offsets are eventually garbage collected automatically.

https://stackoverflow.com/questions/37741936/how-to-delete-kafka-consumer-created-via-new-consumer-api
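
Groups created by php-rdkafka's KafkaConsumer are coordinated by the brokers rather than ZooKeeper, so they fall under the "new consumer" case described above. A sketch of inspecting such groups instead of deleting them: the function names and the port 9092 in the example are assumptions, and the --new-consumer flag is what the 0.10-era tool expects to our knowledge (later Kafka releases dropped it).

```shell
# Consumer-group tool shipped with Kafka.
KAFKA_GROUPS="${KAFKA_GROUPS:-bin/kafka-consumer-groups.sh}"

# list_groups BROKER
# Lists the broker-coordinated consumer groups known to the cluster.
list_groups() {
    "$KAFKA_GROUPS" --new-consumer --bootstrap-server "$1" --list
}

# describe_group BROKER GROUP
# Shows the partitions, offsets and lag of GROUP.
describe_group() {
    "$KAFKA_GROUPS" --new-consumer --bootstrap-server "$1" --describe --group "$2"
}

# Example, run from the Kafka directory (port 9092 is assumed):
#   list_groups 10.110.92.95:9092
#   describe_group 10.110.92.95:9092 myConsumerGroup_1
```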



When reposting, please credit the source: reposted from 全分享社区 (http://www.aweb.cc)

Article title: Connecting PHP to Kafka: Installation and Setup

Article URL: http://www.aweb.cc/article/detail/id/638.html
