在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
参考: https://blog.csdn.net/panchang199266/article/details/82113453 (kafka安装部署) https://www.cnblogs.com/winnerREN/p/13407396.html(kafka集群部署指南) https://www.cnblogs.com/sodawoods-blogs/p/8969774.html (kafka消费模型) 常识:1,kafak依赖zookeeper,zookerper基于javalinux一般默认安装了java,如果没有安装java就先安装 java -version #查看java版本 2,安装zookeeper(kafka已自带zookeeper)参考:https://www.runoob.com/w3cnote/zookeeper-setup.html 3,安装kafka参考:http://kafka.apache.org/quickstart 下载:wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz tar -xzf kafka_2.13-2.8.0.tgz cd kafka_2.13-2.8.0 //或 wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz tar -zxvf tar zxvf kafka_2.13-2.8.0.tgz cd kafka_2.13-2.8.0 启动:#先启动zookeeper #这是前台启动,启动以后,当前就无法进行其他操作(不推荐) ./bin/zookeeper-server-start.sh ./config/zookeeper.properties ./bin/kafka-server-start.sh ./config/server.properties #后台启动(推荐) nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties 1>/dev/null 2>&1 & nohup ./bin/kafka-server-start.sh ./config/server.properties 1>/dev/null 2>&1 & ./bin/kafka-server-start.sh -daemon config/server.properties 使用:#创建一个主题 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test #查看主题 ./bin/kafka-topics.sh --list --zookeeper localhost:2181 #描述主题 ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test #修改主题 ./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3 #kafka分区数量只许增加不许减少 #删除主题 ./bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181 #查看topic的消息数 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 #--time -1 表示最大位移 -2表示最早位移 #发送消息 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test #消费消息 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test # --from-beginning可以查看历史消息,消费者应该在生产者之前就绪,不然就丢失之前的数据 #生产者吞吐量 ./bin/kafka-producer-perf-test.sh --topic test --num-records 100000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=-1 #消费这吞吐量 ./bin/kafka-consumer-perf-test.sh --topic test --broker-list localhost:9092 --messages 500000 kafka常用配置
4,php的kafka操作方式1:安装php的kafka的扩展(源码安装或pecl安装)参考:https://www.jianshu.com/p/fd8ce54e1156 (php操作kafka) 参考:https://zhuanlan.zhihu.com/p/142044988 (知乎) #//源码安装 git clone https://github.com/arnaud-lb/php-rdkafka.git #生成configure文件 /Users/shiyibo/LNMP/php/bin/phpize #编译安装 ./configure --with-php-config=/Users/shiyibo/LNMP/php/bin/php-config make make install #在php.ini 文件中配置 rdkafka扩展 vim /Users/shiyibo/LNMP/php/etc/php.ini extension=rdkafka.so #查看扩展是否生效 $php -m | grep kafka #//pecl 安装(实践成功!) yum install zlib-devel -y yum install librdkafka-devel pecl install rdkafka vim php.ini 添加 extension=rdkafka.so #遇到问题: 1,librdkafka/rdkafka.h in default path... not found 解决:librdkafka是kafka的c语言接口,yum install librdkafka-devel 参考资源:https://github.com/edenhill/librdkafka 2,如果yum install php71w-extname 失败就用pecl代替 生产者$objRdKafka = new RdKafka\Producer(); $objRdKafka->setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers("localhost:9092"); $oObjTopic = $objRdKafka->newTopic("test"); // 从终端接收输入 $oInputHandler = fopen('php://stdin', 'r'); while (true) { echo "\nEnter messages:\n"; $sMsg = trim(fgets($oInputHandler)); // 空消息意味着退出 if (empty($sMsg)) { break; } // 发送消息 $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg); } 消费者$objRdKafka = new RdKafka\Consumer(); $objRdKafka->setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers("localhost:9092"); $oObjTopic = $objRdKafka->newTopic("test"); /** * consumeStart * 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息 * 第二个参数标识从什么位置开始拉取消息,可选值为 * RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息 * RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息 * RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样 */ $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END); while (true) { // 第一个参数是分区,第二个参数是超时时间 $oMsg = $oObjTopic->consume(0, 1000); // 没拉取到消息时,返回NULL if (!$oMsg) { usleep(10000); continue; } if ($oMsg->err) { echo $msg->errstr(), "\n"; break; } else { echo $oMsg->payload, "\n"; } } 方式2:php的composer包参考:https://github.com/weiboad/kafka-php
|
2022-08-17
2022-11-06
2022-08-15
2022-08-18
2022-08-16
请发表评论