本文整理汇总了Golang中github.com/wvanbergen/kafka/consumergroup.NewConfig函数的典型用法代码示例。如果您正苦于以下问题:Golang NewConfig函数的具体用法?Golang NewConfig怎么用?Golang NewConfig使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了NewConfig函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: popKafka
func popKafka() error {
log.Debug("init popkafka")
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = OFFSETS_PROCESSING_TIMEOUT_SECONDS
config.Offsets.CommitInterval = OFFSETS_COMMIT_INTERVAL
config.Zookeeper.Chroot = ""
kafkaTopics := []string{KafkaPushsTopic}
zooks := []string{"127.0.0.1:2181"}
cg, err := consumergroup.JoinConsumerGroup(KAFKA_GROUP_NAME, kafkaTopics, zooks, config)
if err != nil {
return err
}
go func() {
for err := range cg.Errors() {
log.Error("consumer error(%v)", err)
}
}()
go func() {
for msg := range cg.Messages() {
log.Info("deal with userId:%s, partitionId:%d, Offset:%d, Key:%s msg:%s", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}()
return nil
}
开发者ID:qinlodestar,项目名称:simplecomet,代码行数:25,代码来源:kafka.go
示例2: InitKafka
func InitKafka() error {
log.Info("start topic:%s consumer", Conf.KafkaTopic)
log.Info("consumer group name:%s", KAFKA_GROUP_NAME)
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = OFFSETS_PROCESSING_TIMEOUT_SECONDS
config.Offsets.CommitInterval = OFFSETS_COMMIT_INTERVAL
config.Zookeeper.Chroot = Conf.ZKRoot
kafkaTopics := []string{Conf.KafkaTopic}
cg, err := consumergroup.JoinConsumerGroup(KAFKA_GROUP_NAME, kafkaTopics, Conf.ZKAddrs, config)
if err != nil {
return err
}
go func() {
for err := range cg.Errors() {
log.Error("consumer error(%v)", err)
}
}()
go func() {
for msg := range cg.Messages() {
log.Info("deal with topic:%s, partitionId:%d, Offset:%d, Key:%s msg:%s", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
push(string(msg.Key), msg.Value)
cg.CommitUpto(msg)
}
}()
return nil
}
开发者ID:dulumao,项目名称:goim,代码行数:27,代码来源:kafka.go
示例3: init
func init() {
zookeeperHost := os.Getenv("ZOOKEEPER_TEST")
if zookeeperHost == "" {
log.Print("Please set up ZOOKEEPER_TEST variable")
panic("No kafka host specify")
}
kafkaHost := os.Getenv("KAFKA_TEST")
if kafkaHost == "" {
log.Print("Please set up KAFKA_TEST variable")
panic("No kafka host specify")
}
brokerList = append(brokerList, kafkaHost)
topicsInit2 = append(topicsInit2, "test2")
producer.NewProducer(brokerList, topicsInit2, config)
configConsumer = consumergroup.NewConfig()
consumerBrokerList = append(consumerBrokerList, zookeeperHost)
consumer, _ = RegisterConsumer(&Consumer{
Topic: "test2",
Group: "test2",
BrokerList: consumerBrokerList,
Config: configConsumer,
GetModel: testModel,
GetEventType: testEventName,
})
consumer.RegisterEvent("testEvent", &ProcessEvent{testProcess})
}
开发者ID:NexwayGroup,项目名称:nx-lib,代码行数:26,代码来源:consumer_test.go
示例4: Init
func (k *KafkaConsumerGroupInput) Init(config interface{}) (err error) {
k.config = config.(*KafkaConsumerGroupInputConfig)
if len(k.config.ConsumerGroup) == 0 {
return fmt.Errorf("consumer_group required")
}
if len(k.config.Topics) == 0 {
return fmt.Errorf("topics required")
}
if len(k.config.ZookeeperConnectionString) == 0 {
return fmt.Errorf("zookeeper_connection_string required")
}
// FIXME heka's logging infrastructure can probably be used for this
// contains useful information for debugging consumer group partition
// changes
if k.config.LogSarama {
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}
k.consumerConfig = consumergroup.NewConfig()
switch k.config.OffsetMethod {
case "Newest":
k.consumerConfig.Offsets.Initial = sarama.OffsetNewest
case "Oldest":
k.consumerConfig.Offsets.Initial = sarama.OffsetOldest
default:
return fmt.Errorf("invalid offset_method: %s", k.config.OffsetMethod)
}
k.consumerConfig.Offsets.ProcessingTimeout = 10 * time.Second
k.consumerConfig.Config.Metadata.Retry.Max = k.config.MetadataRetries
k.consumerConfig.Config.Metadata.Retry.Backoff = time.Duration(k.config.WaitForElection) * time.Millisecond
k.consumerConfig.Config.Metadata.RefreshFrequency = time.Duration(k.config.BackgroundRefreshFrequency) * time.Millisecond
k.consumerConfig.Config.Net.MaxOpenRequests = k.config.MaxOpenRequests
k.consumerConfig.Config.Net.DialTimeout = time.Duration(k.config.DialTimeout) * time.Millisecond
k.consumerConfig.Config.Net.ReadTimeout = time.Duration(k.config.ReadTimeout) * time.Millisecond
k.consumerConfig.Config.Net.WriteTimeout = time.Duration(k.config.WriteTimeout) * time.Millisecond
k.consumerConfig.Config.Consumer.Fetch.Default = k.config.DefaultFetchSize
k.consumerConfig.Config.Consumer.Fetch.Min = k.config.MinFetchSize
k.consumerConfig.Config.Consumer.Fetch.Max = k.config.MaxMessageSize
k.consumerConfig.Config.Consumer.MaxWaitTime = time.Duration(k.config.MaxWaitTime) * time.Millisecond
k.consumerConfig.Config.ChannelBufferSize = k.config.EventBufferSize
var zookeeperNodes []string
zookeeperNodes, k.consumerConfig.Zookeeper.Chroot = kazoo.ParseConnectionString(k.config.ZookeeperConnectionString)
if len(zookeeperNodes) == 0 {
return fmt.Errorf("unable to parse zookeeper_connection_string")
}
consumer, err := consumergroup.JoinConsumerGroup(k.config.ConsumerGroup, k.config.Topics, zookeeperNodes, k.consumerConfig)
if err != nil {
return
}
k.consumer = consumer
k.stopChan = make(chan bool)
return
}
开发者ID:bsmedberg,项目名称:data-pipeline,代码行数:60,代码来源:kafka_consumer_group_input.go
示例5: main
func main() {
flag.Parse()
if *zookeeper == "" {
flag.PrintDefaults()
os.Exit(1)
}
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = 10 * time.Second
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)
kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")
consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
if err := consumer.Close(); err != nil {
sarama.Logger.Println("Error closing the consumer", err)
}
}()
go func() {
for err := range consumer.Errors() {
log.Println(err)
}
}()
eventCount := 0
offsets := make(map[string]map[int32]int64)
for message := range consumer.Messages() {
if offsets[message.Topic] == nil {
offsets[message.Topic] = make(map[int32]int64)
}
eventCount += 1
if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
}
// Simulate processing time
time.Sleep(10 * time.Millisecond)
offsets[message.Topic][message.Partition] = message.Offset
consumer.CommitUpto(message)
}
log.Printf("Processed %d events.", eventCount)
log.Printf("%+v", offsets)
}
开发者ID:bcwalrus,项目名称:kafka,代码行数:59,代码来源:main.go
示例6: streamUsers
func streamUsers(conf *Config) chan models.User {
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetOldest
config.Offsets.CommitInterval = 100 * time.Millisecond
consumer, err := consumergroup.JoinConsumerGroup(
"indexer",
[]string{conf.Topic},
conf.Zookeepers,
config)
if err != nil {
log.Fatalf("Can't create consumer. Err: %v", err)
}
var received, errors int
// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
out := make(chan models.User, 1024)
go func() {
for {
select {
case msg := <-consumer.Messages():
received++
var user models.User
if err := json.Unmarshal(msg.Value, &user); err != nil {
log.Fatalf("Can't unmarshal data from queue! Err: %v", err)
}
if *user.Dob == "0000-00-00" {
user.Dob = nil
}
out <- user
consumer.CommitUpto(msg)
case err := <-consumer.Errors():
errors++
log.Printf("Error reading from topic! Err: %v", err)
case <-signals:
log.Printf("Start consumer closing")
consumer.Close()
log.Printf("Consumer closed!")
close(out)
log.Printf("Successfully consumed: %d; errors: %d", received, errors)
return
}
}
}()
return out
}
开发者ID:mateuszdyminski,项目名称:am-pipeline,代码行数:54,代码来源:main.go
示例7: NewConsumer
func (self *Kafka) NewConsumer(consumerGroup string, topics []string, zoo string) (consumer *consumergroup.ConsumerGroup, err error) {
var zoos []string
config := consumergroup.NewConfig()
config.Offsets.Initial = self.offset
config.Offsets.ProcessingTimeout = 10 * time.Second
zoos, config.Zookeeper.Chroot = kazoo.ParseConnectionString(zoo)
consumer, err = consumergroup.JoinConsumerGroup(consumerGroup, topics, zoos, config)
if err != nil {
return
}
return
}
开发者ID:lixin9311,项目名称:EventTracker,代码行数:12,代码来源:kafka.go
示例8: main
func main() {
flag.Parse()
gzlog.InitGZLogger(*LogFile, 50*1000*1000, 5)
if *kafkaTopic == "" {
log.Printf("topicShould not be null!\n")
return
}
if *zookeeper == "" {
log.Printf("zookeeper should not be null! \n")
return
}
if *brokerList == "" {
log.Printf("kafka brokers must not be null\n")
}
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = 2 * time.Second
config.Consumer.MaxProcessingTime = 2 * time.Second
kafkaTopics := strings.Split(*kafkaTopic, ",")
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)
consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
}
log.Printf("start to get mysl connection!\n")
db, err := sql.Open("mysql", fmt.Sprintf("%s:%[email protected](%s)/jwlwl?charset=utf8&parseTime=True", MysqlUser, MysqlPasswd, MysqlHost))
defer db.Close()
if err != nil {
log.Printf("mysql db connect failed !errMessage:%s \n", err)
return
}
log.Printf("start to get kafka producer\n")
producer, err := kafka_tool.GetKafkaProducer(*brokerList)
if err != nil {
log.Printf("Kafka get producer failed !err: %s \n", err)
return
}
log.Printf("Start to call consummer messages method !\n")
for message := range consumer.Messages() {
log.Printf("Start to call Run method with message:%s \n", message.Value)
latestLeakEventArg := &u_leak_merge.LatestEventArg{Properties: string(message.Value)}
err := u_leak_merge.Run("0", latestLeakEventArg, db, producer)
if err != nil {
log.Printf("message failed!:%s, errMessage:%s \n", message.Value, err)
continue
}
log.Printf("Start to commit message! \n")
consumer.CommitUpto(message)
time.Sleep(100 * time.Millisecond)
}
}
开发者ID:fantasycool,项目名称:leak_merge,代码行数:52,代码来源:leak_merge.go
示例9: Start
func (k *Kafka) Start() error {
k.Lock()
defer k.Unlock()
var consumerErr error
config := consumergroup.NewConfig()
switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest
case "newest":
config.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
k.Offset)
config.Offsets.Initial = sarama.OffsetOldest
}
if k.Consumer == nil || k.Consumer.Closed() {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
k.ConsumerGroup,
k.Topics,
k.ZookeeperPeers,
config,
)
if consumerErr != nil {
return consumerErr
}
// Setup message and error channels
k.in = k.Consumer.Messages()
k.errs = k.Consumer.Errors()
}
k.done = make(chan struct{})
if k.PointBuffer == 0 && k.MetricBuffer == 0 {
k.MetricBuffer = 100000
} else if k.PointBuffer > 0 {
// Legacy support of PointBuffer field TODO remove
k.MetricBuffer = k.PointBuffer
}
k.metricC = make(chan telegraf.Metric, k.MetricBuffer)
// Start the kafka message reader
go k.receiver()
log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics)
return nil
}
开发者ID:noise,项目名称:telegraf,代码行数:48,代码来源:kafka_consumer.go
示例10: NewConsumer
// NewConsumer TODO: doc
func NewConsumer(options Options) (*consumergroup.ConsumerGroup, error) {
consumerConfig := consumergroup.NewConfig()
consumerConfig.Offsets.Initial = sarama.OffsetNewest
consumerConfig.Offsets.ProcessingTimeout = 10 * time.Second
connectionString := strings.Join(options.Zookeepers, ", ")
var zookeeperNodes []string
zookeeperNodes, consumerConfig.Zookeeper.Chroot = kazoo.ParseConnectionString(connectionString)
return consumergroup.JoinConsumerGroup(
options.ConsumerGroup,
options.KafkaTopics,
zookeeperNodes,
consumerConfig,
)
}
开发者ID:tulios,项目名称:kafka-revolver,代码行数:17,代码来源:consumer.go
示例11: NewConsumer
func (q *Kafka) NewConsumer(v interface{}) (Consumer, error) {
args, ok := v.(KafkaConsumerArgs)
if !ok {
return nil, fmt.Errorf("invalid consumer arguments(%v)", v)
}
if args.Group != "" {
if len(q.Zookeepers) == 0 {
return nil, fmt.Errorf("zookeeper url is required.")
}
config := consumergroup.NewConfig()
config.Offsets.Initial = args.getOffset()
config.Offsets.ProcessingTimeout = 10 * time.Second
cg, err := consumergroup.JoinConsumerGroup(args.Group, strings.Split(args.Topic, ","), q.Zookeepers, config)
if err != nil {
return nil, err
}
c := &kafkaGroupConsumer{consumer: cg}
q.closers = append(q.closers, c)
return c, nil
}
c, err := sarama.NewConsumer(q.Brokers, sarama.NewConfig())
if err != nil {
return nil, err
}
sc := &kafkaSingleConsumer{consumer: c}
partitions, err := args.getPartitions(c)
if err != nil {
return nil, err
}
for _, p := range partitions {
pc, err := c.ConsumePartition(args.Topic, p, args.getOffset())
if err != nil {
return nil, err
}
sc.partitionConsumers = append(sc.partitionConsumers, pc)
}
q.closers = append(q.closers, sc)
return sc, err
}
开发者ID:mpls,项目名称:anyq,代码行数:47,代码来源:kafka.go
示例12: Start
func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var consumerErr error
k.acc = acc
config := consumergroup.NewConfig()
config.Zookeeper.Chroot = k.ZookeeperChroot
switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest
case "newest":
config.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
k.Offset)
config.Offsets.Initial = sarama.OffsetOldest
}
if k.Consumer == nil || k.Consumer.Closed() {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
k.ConsumerGroup,
k.Topics,
k.ZookeeperPeers,
config,
)
if consumerErr != nil {
return consumerErr
}
// Setup message and error channels
k.in = k.Consumer.Messages()
k.errs = k.Consumer.Errors()
}
k.done = make(chan struct{})
// Start the kafka message reader
go k.receiver()
log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics)
return nil
}
开发者ID:miketonks,项目名称:telegraf,代码行数:44,代码来源:kafka_consumer.go
示例13: StartListening
/*
* StartListening will actually create the consumer group and start listening to incoming messages
*/
func (c *Consumer) StartListening() error {
var err error
fmt.Println("[INFO] StartListening called")
zookeeperAddrs := c.config.Zookeeper.AddrList
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = 10 * time.Second
if topicsSize := len(c.callbacks); topicsSize != 0 {
topics := c.getTopicsKey()
for currConnAttempt := 0; currConnAttempt < c.config.Zookeeper.MaxRetry; currConnAttempt++ {
// Creates a new consumer and adds it to the consumer group
fmt.Println("[INFO] Will add consumer to group: ", c.config.ModuleName.Value)
c.consumer, err = consumergroup.JoinConsumerGroup(c.config.ModuleName.Value, topics, zookeeperAddrs, config)
if err == nil {
break
}
fmt.Println("[INFO] Connection attempt faild (", (currConnAttempt + 1), "/", c.config.Zookeeper.MaxRetry, ")")
<-time.After(time.Second * 5)
}
if err != nil {
fmt.Println("[ERROR] Failed to start KAFKA Consumer Group\nErr:", err)
return err
}
// handle consumer.Errors channel
c.handleConsumerErrors()
// start goroutine to handle incoming messages
go c.handleMessages()
}
fmt.Println("[INFO] kafka consumer initialized successfully")
return nil
}
开发者ID:asvins,项目名称:common_io,代码行数:40,代码来源:common_io.go
示例14: Start
func (kc *kafkaConsumer) Start() chan Message {
kc.sigChan = make(chan os.Signal, 1)
msgChan := make(chan Message, 1)
signal.Notify(kc.sigChan, os.Interrupt)
go func() {
<-kc.sigChan
kc.consumerGroup.Close()
close(msgChan)
}()
cfg := consumergroup.NewConfig()
cfg.Offsets.Initial = kc.Config.InitialOffset()
cfg.Offsets.ProcessingTimeout = kc.Config.ProcessingTimeout()
var zookeeperNodes []string
url := kc.Config.ZookeeperURL()
if chroot := kc.Config.ZookeeperChroot(); len(chroot) > 0 {
url += "/" + chroot
}
zookeeperNodes, cfg.Zookeeper.Chroot = kazoo.ParseConnectionString(url)
var cg *consumergroup.ConsumerGroup
var err error
var attempts, curExp int
for {
attempts++
cg, err = consumergroup.JoinConsumerGroup(
kc.Config.ConsumerGroup(),
kc.Config.Topics(),
zookeeperNodes,
cfg,
)
if err != nil {
log.Error(err, nil)
if attempts > maxAttempts {
log.Debug("reached maximum attempts, exiting", nil)
os.Exit(1)
}
if curExp == 0 {
curExp = 2
}
curExp *= 2
if curExp > maxExp {
curExp = maxExp
}
log.Debug("sleeping", log.Data{"ms": curExp})
time.Sleep(time.Millisecond * time.Duration(curExp))
continue
}
break
}
kc.consumerGroup = cg
go func() {
for err := range cg.Errors() {
log.Error(err, nil)
}
}()
go func() {
log.Debug("waiting for messages", nil)
for m := range cg.Messages() {
log.Debug("message", log.Data{"msg": m})
msgChan <- saramaMessage{m}
}
}()
return msgChan
}
开发者ID:ian-kent,项目名称:service.go,代码行数:73,代码来源:consumer.go
示例15: main
func main() {
flag.Parse()
if err := InitConfig(); err != nil {
panic(err)
}
log.LoadConfiguration(Conf.Log)
if err := InitRouterRpc(Conf.RouterAddrs); err != nil {
panic(err)
}
if err := InitCometRpc(Conf.Comets); err != nil {
panic(err)
}
log.Info("start topic:%s consumer", Conf.KafkaTopic)
runtime.GOMAXPROCS(runtime.NumCPU())
log.Info("consumer group name:%s", KAFKA_GROUP_NAME)
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = OFFSETS_PROCESSING_TIMEOUT_SECONDS
config.Offsets.CommitInterval = OFFSETS_COMMIT_INTERVAL
config.Zookeeper.Chroot = Conf.ZKRoot
kafkaTopics := []string{Conf.KafkaTopic}
cg, err := consumergroup.JoinConsumerGroup(KAFKA_GROUP_NAME, kafkaTopics, Conf.ZKAddrs, config)
if err != nil {
panic(err)
return
}
go func() {
for err := range cg.Errors() {
log.Error("consumer error(%v)", err)
}
}()
go run(cg)
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP)
for {
s := <-c
log.Info("get a signal %s\n", s.String())
switch s {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP, syscall.SIGINT:
if err := cg.Close(); err != nil {
log.Error("Error closing the consumer error(%v)", err)
}
time.Sleep(3 * time.Second)
log.Warn("consumer exit\n")
return
case syscall.SIGHUP:
// TODO reload
default:
return
}
}
}
开发者ID:huweixuan,项目名称:goim,代码行数:61,代码来源:push_job.go
注:本文中的github.com/wvanbergen/kafka/consumergroup.NewConfig函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论