• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

kafkagoproducer启动基本配置

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

  1.官网上下载kafka安装包:http://kafka.apache.org/downloads.html

  2.执行命令运行zookeeper 实例(单点):

   bin/zookeeper-server-start.sh config/zookeeper.properties
 3. 启动kafka broker 服务:
  
bin/kafka-server-start.sh config/server.properties
  其中的
server.properties 有些配置需要修改:
  listeners=PLAINTEXT://hostName:9092
  如果是远程producer,hostname设置为ip,这样远程机器无需设置host.
  log.dir 是broker的日志地址。
 4.在使用go的客户端 Shopify/sarama 包的操作过程:
   (1) go get "github.com/Shopify/sarama"
   (2) 修改config 中的配置:
      c.Version = V0_10_0_0 //使用的是kafka 0.10.0.0的版本
   (3) producer测试代码如下:
package main

import (
	"github.com/Shopify/sarama"
	"log"
	"os"
	"strings"
)

var logger = log.New(os.Stderr, "[TEST]", log.LstdFlags)

func main(){
	sarama.Logger = logger
	
	config := sarama.NewConfig()
	config.ClientID = "newsDataSource"
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	
	msg := &sarama.ProducerMessage{}
	msg.Topic = "hello"
	msg.Partition = int32(-1)
	msg.Key = sarama.StringEncoder("key")
	msg.Value = sarama.ByteEncoder("hello")
	
	producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
	if err != nil {
		logger.Printf("Failed to produce message :%s", err )
		os.Exit(500)
	}
	
	defer producer.Close()
	
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		logger.Printf("Failed to produce message :%s", err )
	}
	logger.Printf("partition:%d, offset: %d\n", partition, offset )
}

  


  
 
 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Docker更新解决Errorresponsefromdaemon:ociruntimeerror:container_linux.go:247:sta ...发布时间:2022-07-10
下一篇:
go爬虫发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap