• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

kafka基础知识1:安装,启动,基本操作,php扩展使用

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

参考:

https://blog.csdn.net/panchang199266/article/details/82113453 (kafka安装部署)

https://www.cnblogs.com/winnerREN/p/13407396.html(kafka集群部署指南)

https://www.cnblogs.com/sodawoods-blogs/p/8969774.html (kafka消费模型)

常识:

1,kafak依赖zookeeper,zookerper基于java

linux一般默认安装了java,如果没有安装java就先安装

java -version #查看java版本

2,安装zookeeper(kafka已自带zookeeper)

参考:https://www.runoob.com/w3cnote/zookeeper-setup.html

3,安装kafka

参考:http://kafka.apache.org/quickstart

下载:

wget  https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
//或
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -zxvf tar zxvf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0

启动:

#先启动zookeeper
#这是前台启动,启动以后,当前就无法进行其他操作(不推荐)
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties

#后台启动(推荐)
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties 1>/dev/null 2>&1 &
nohup ./bin/kafka-server-start.sh ./config/server.properties 1>/dev/null 2>&1 & 
./bin/kafka-server-start.sh -daemon config/server.properties

使用:

#创建一个主题
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test
#查看主题
./bin/kafka-topics.sh --list --zookeeper localhost:2181
#描述主题
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
#修改主题
./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3 #kafka分区数量只许增加不许减少
#删除主题
./bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181
#查看topic的消息数
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 #--time -1 表示最大位移 -2表示最早位移

#发送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test # --from-beginning可以查看历史消息,消费者应该在生产者之前就绪,不然就丢失之前的数据

#生产者吞吐量
./bin/kafka-producer-perf-test.sh --topic test --num-records 100000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=-1
#消费这吞吐量
./bin/kafka-consumer-perf-test.sh --topic test --broker-list localhost:9092 --messages 500000

kafka常用配置

配置项 默认值/示例值 说明
broker.id 0 Broker唯一标识
listeners PLAINTEXT://172.31.9.177:9092 监听信息,PLAINTEXT表示明文传输
log.dirs kafka/logs kafka数据存放地址,可以填写多个。用”,”间隔
message.max.bytes message.max.bytes 单个消息长度限制,单位是字节
num.partitions 1 默认分区数
log.flush.interval.messages Long.MaxValue 在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.ms Long.MaxValue 在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.ms Long.MaxValue 检查数据是否要写入到硬盘的时间间隔。
log.retention.hours 24 控制一个log保留时间,单位:小时
zookeeper.connect 172.31.9.177:2181 ZooKeeper服务器地址,多台用”,”间隔

4,php的kafka操作

方式1:安装php的kafka的扩展(源码安装或pecl安装)

参考:https://www.jianshu.com/p/fd8ce54e1156 (php操作kafka)

参考:https://zhuanlan.zhihu.com/p/142044988 (知乎)

#//源码安装
git clone https://github.com/arnaud-lb/php-rdkafka.git
 
#生成configure文件
/Users/shiyibo/LNMP/php/bin/phpize 
 
#编译安装
 ./configure --with-php-config=/Users/shiyibo/LNMP/php/bin/php-config
make
make install 
 
#在php.ini 文件中配置 rdkafka扩展
vim /Users/shiyibo/LNMP/php/etc/php.ini
extension=rdkafka.so
 
#查看扩展是否生效
$php -m | grep kafka

#//pecl 安装(实践成功!)
yum install zlib-devel -y
yum install librdkafka-devel
pecl install rdkafka
vim php.ini 添加 extension=rdkafka.so

#遇到问题:
1,librdkafka/rdkafka.h in default path... not found  
解决:librdkafka是kafka的c语言接口,yum install librdkafka-devel
参考资源:https://github.com/edenhill/librdkafka

2,如果yum install php71w-extname 失败就用pecl代替
生产者
$objRdKafka = new RdKafka\Producer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

// 从终端接收输入 
$oInputHandler = fopen('php://stdin', 'r');

while (true) {
    echo "\nEnter  messages:\n";
    $sMsg = trim(fgets($oInputHandler));

   // 空消息意味着退出
    if (empty($sMsg)) {
        break;
    }

    // 发送消息
    $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}
消费者
$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $oMsg->payload, "\n";
    }
}

方式2:php的composer包

参考:https://github.com/weiboad/kafka-php

 


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
php正则表达式发布时间:2022-07-10
下一篇:
PHP Cookbook读书笔记 – 第11章Session和持久化发布时间:2022-07-10
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap