在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:kafka-php开源软件地址:https://gitee.com/weiboad/kafka-php开源软件介绍:Kafka-phpKafka-php 使用纯粹的PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document, 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和kafka broker 交互,较 v0.1.x 更加稳定高效, 由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本 安装环境要求
Installation使用 Composer 安装添加 composer 依赖 { "require": { "nmred/kafka-php": "0.2.*" }} 配置配置参数见 配置 Produce异步回调方式调用<?phprequire '../vendor/autoload.php';date_default_timezone_set('PRC');use Monolog\Logger;use Monolog\Handler\StdoutHandler;// Create the logger$logger = new Logger('my_logger');// Now add some handlers$logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('10.13.4.159:9192');$config->setBrokerVersion('0.9.0.1');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ), );});$producer->setLogger($logger);$producer->success(function($result) { var_dump($result);});$producer->error(function($errorCode) { var_dump($errorCode);});$producer->send(true); 同步方式调用生产者<?phprequire '../vendor/autoload.php';date_default_timezone_set('PRC');use Monolog\Logger;use Monolog\Handler\StdoutHandler;// Create the logger$logger = new Logger('my_logger');// Now add some handlers$logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9192');$config->setBrokerVersion('0.9.0.1');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();$producer->setLogger($logger);for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test1', 'value' => 'test1....message.', 'key' => '', ), )); var_dump($result);} Consumer<?phprequire '../vendor/autoload.php';date_default_timezone_set('PRC');use Monolog\Logger;use Monolog\Handler\StdoutHandler;// Create the logger$logger = new Logger('my_logger');// Now add some handlers$logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('10.13.4.159:9192');$config->setGroupId('test');$config->setBrokerVersion('0.9.0.1');$config->setTopics(array('test'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) { var_dump($message);}); Basic Protocol基础协议 API 调用方式见 Example QQ 群号531522091 |
请发表评论