• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Golang consumergroup.JoinConsumerGroup函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Golang中github.com/wvanbergen/kafka/consumergroup.JoinConsumerGroup函数的典型用法代码示例。如果您正苦于以下问题:Golang JoinConsumerGroup函数的具体用法?Golang JoinConsumerGroup怎么用?Golang JoinConsumerGroup使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了JoinConsumerGroup函数的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。

示例1: main

// main joins a consumer group through ZooKeeper, logs the value of every
// event received from the consumer's stream, and prints the number of events
// handled once the stream channel is closed.
func main() {
	group, topic := "my_consumer_group_name2", "topic"
	zkNodes := []string{"149.204.61.37:2181"}

	consumer, err := consumergroup.JoinConsumerGroup(group, topic, zkNodes, nil)
	if err != nil {
		log.Fatalln(err)
	}

	// Close the consumer on Ctrl-C.
	// NOTE(review): presumably Close also closes the stream channel, which is
	// what lets the loop below finish - confirm against the library.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)
	go func() {
		<-interrupts
		consumer.Close()
	}()

	processed := 0
	for event := range consumer.Stream() {
		// Process event
		log.Println(string(event.Value))
		processed++
	}

	log.Printf("Processed %d events.", processed)
}
开发者ID:joshjdevl,项目名称:kafka-data-generator,代码行数:35,代码来源:consumer-list.go


示例2: InitKafka

// InitKafka joins the KAFKA_GROUP_NAME consumer group for Conf.KafkaTopic and
// starts two background goroutines: one logs consumer errors, the other logs
// each message, hands it to push, and commits its offset.
//
// It returns the join error, if any; otherwise nil. The goroutines run until
// the group's Errors and Messages channels are closed.
func InitKafka() error {
	log.Info("start topic:%s consumer", Conf.KafkaTopic)
	log.Info("consumer group name:%s", KAFKA_GROUP_NAME)

	cfg := consumergroup.NewConfig()
	cfg.Zookeeper.Chroot = Conf.ZKRoot
	cfg.Offsets.Initial = sarama.OffsetNewest
	cfg.Offsets.CommitInterval = OFFSETS_COMMIT_INTERVAL
	cfg.Offsets.ProcessingTimeout = OFFSETS_PROCESSING_TIMEOUT_SECONDS

	group, joinErr := consumergroup.JoinConsumerGroup(KAFKA_GROUP_NAME, []string{Conf.KafkaTopic}, Conf.ZKAddrs, cfg)
	if joinErr != nil {
		return joinErr
	}

	// Drain the error channel so consumer errors are at least logged.
	go func() {
		for consumeErr := range group.Errors() {
			log.Error("consumer error(%v)", consumeErr)
		}
	}()

	// Push every message, then mark it as processed.
	go func() {
		for m := range group.Messages() {
			log.Info("deal with topic:%s, partitionId:%d, Offset:%d, Key:%s msg:%s", m.Topic, m.Partition, m.Offset, m.Key, m.Value)
			push(string(m.Key), m.Value)
			group.CommitUpto(m)
		}
	}()

	return nil
}
开发者ID:dulumao,项目名称:goim,代码行数:27,代码来源:kafka.go


示例3: popKafka

// popKafka joins the KAFKA_GROUP_NAME consumer group for KafkaPushsTopic via
// the ZooKeeper node at 127.0.0.1:2181 and starts two background goroutines
// that log consumer errors and incoming messages respectively.
//
// Messages are only logged here; this function does not commit their offsets.
// It returns the join error, if any; otherwise nil.
func popKafka() error {
	log.Debug("init popkafka")

	cfg := consumergroup.NewConfig()
	cfg.Zookeeper.Chroot = ""
	cfg.Offsets.Initial = sarama.OffsetNewest
	cfg.Offsets.CommitInterval = OFFSETS_COMMIT_INTERVAL
	cfg.Offsets.ProcessingTimeout = OFFSETS_PROCESSING_TIMEOUT_SECONDS

	topics := []string{KafkaPushsTopic}
	zkNodes := []string{"127.0.0.1:2181"}
	group, joinErr := consumergroup.JoinConsumerGroup(KAFKA_GROUP_NAME, topics, zkNodes, cfg)
	if joinErr != nil {
		return joinErr
	}

	go func() {
		for consumeErr := range group.Errors() {
			log.Error("consumer error(%v)", consumeErr)
		}
	}()

	go func() {
		for m := range group.Messages() {
			log.Info("deal with userId:%s, partitionId:%d, Offset:%d, Key:%s msg:%s", m.Topic, m.Partition, m.Offset, m.Key, m.Value)
		}
	}()

	return nil
}
开发者ID:qinlodestar,项目名称:simplecomet,代码行数:25,代码来源:kafka.go


示例4: Gather

// Gather emits the queued metrics to acc through emitMetrics.
//
// On the first call (k.Consumer is nil) it also joins the consumer group,
// installs an interrupt handler, and starts readFromKafka to feed the queue.
// The interrupt handler signals halt, emits once more and closes the consumer.
//
// NOTE(review): metricQueue is created anew on every call, while the reader
// goroutine started on the first call keeps the first queue - confirm that
// later calls are expected to see an empty queue.
func (k *Kafka) Gather(acc plugins.Accumulator) error {
	metricQueue := make(chan []byte, 200)

	// Already joined: nothing to set up, just emit.
	if k.Consumer != nil {
		return emitMetrics(k, acc, metricQueue)
	}

	var joinErr error
	k.Consumer, joinErr = consumergroup.JoinConsumerGroup(k.ConsumerGroupName, []string{k.Topic}, k.ZookeeperPeers, nil)
	if joinErr != nil {
		return joinErr
	}

	halt := make(chan bool, 1)
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)
	go func() {
		<-interrupts
		// Tell the reader to stop, emit what is queued, then shut down.
		halt <- true
		emitMetrics(k, acc, metricQueue)
		k.Consumer.Close()
	}()

	go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)

	return emitMetrics(k, acc, metricQueue)
}
开发者ID:toorop,项目名称:telegraf,代码行数:31,代码来源:kafka_consumer.go


示例5: Init

// Init validates the plugin configuration, builds the consumer group
// configuration from it, and joins the consumer group.
//
// config must be a *KafkaConsumerGroupInputConfig; any other type makes the
// type assertion panic. An error is returned when a required setting is
// empty, when offset_method is neither "Newest" nor "Oldest", when the
// ZooKeeper connection string yields no nodes, or when joining the group
// fails. On success k.consumer and k.stopChan are set.
func (k *KafkaConsumerGroupInput) Init(config interface{}) (err error) {
	k.config = config.(*KafkaConsumerGroupInputConfig)

	switch {
	case len(k.config.ConsumerGroup) == 0:
		return fmt.Errorf("consumer_group required")
	case len(k.config.Topics) == 0:
		return fmt.Errorf("topics required")
	case len(k.config.ZookeeperConnectionString) == 0:
		return fmt.Errorf("zookeeper_connection_string required")
	}

	// FIXME heka's logging infrastructure can probably be used for this
	// contains useful information for debugging consumer group partition
	// changes
	if k.config.LogSarama {
		sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
	}

	cc := consumergroup.NewConfig()
	k.consumerConfig = cc

	switch k.config.OffsetMethod {
	case "Oldest":
		cc.Offsets.Initial = sarama.OffsetOldest
	case "Newest":
		cc.Offsets.Initial = sarama.OffsetNewest
	default:
		return fmt.Errorf("invalid offset_method: %s", k.config.OffsetMethod)
	}
	cc.Offsets.ProcessingTimeout = 10 * time.Second

	// Metadata settings; the configured durations are in milliseconds.
	cc.Config.Metadata.Retry.Max = k.config.MetadataRetries
	cc.Config.Metadata.Retry.Backoff = time.Duration(k.config.WaitForElection) * time.Millisecond
	cc.Config.Metadata.RefreshFrequency = time.Duration(k.config.BackgroundRefreshFrequency) * time.Millisecond

	// Network settings.
	cc.Config.Net.MaxOpenRequests = k.config.MaxOpenRequests
	cc.Config.Net.DialTimeout = time.Duration(k.config.DialTimeout) * time.Millisecond
	cc.Config.Net.ReadTimeout = time.Duration(k.config.ReadTimeout) * time.Millisecond
	cc.Config.Net.WriteTimeout = time.Duration(k.config.WriteTimeout) * time.Millisecond

	// Fetch and buffering settings.
	cc.Config.Consumer.Fetch.Default = k.config.DefaultFetchSize
	cc.Config.Consumer.Fetch.Min = k.config.MinFetchSize
	cc.Config.Consumer.Fetch.Max = k.config.MaxMessageSize
	cc.Config.Consumer.MaxWaitTime = time.Duration(k.config.MaxWaitTime) * time.Millisecond
	cc.Config.ChannelBufferSize = k.config.EventBufferSize

	zkNodes, chroot := kazoo.ParseConnectionString(k.config.ZookeeperConnectionString)
	cc.Zookeeper.Chroot = chroot
	if len(zkNodes) == 0 {
		return fmt.Errorf("unable to parse zookeeper_connection_string")
	}

	group, err := consumergroup.JoinConsumerGroup(k.config.ConsumerGroup, k.config.Topics, zkNodes, cc)
	if err != nil {
		return err
	}
	k.consumer = group
	k.stopChan = make(chan bool)
	return nil
}
开发者ID:bsmedberg,项目名称:data-pipeline,代码行数:60,代码来源:kafka_consumer_group_input.go


示例6: main

// main joins the consumer group given on the command line, consumes messages
// from the requested topics and checks that offsets arrive consecutively per
// topic/partition, logging every gap it sees.
//
// It exits with status 1 when no ZooKeeper connection string was supplied.
// An interrupt signal closes the consumer. After the Messages channel is
// closed, the number of processed events and the last offset seen per
// partition are printed.
func main() {
	flag.Parse()

	if *zookeeper == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)

	kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")

	consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
	if consumerErr != nil {
		log.Fatalln(consumerErr)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		if err := consumer.Close(); err != nil {
			sarama.Logger.Println("Error closing the consumer", err)
		}
	}()

	go func() {
		for err := range consumer.Errors() {
			log.Println(err)
		}
	}()

	eventCount := 0
	// offsets holds the last offset seen for each topic and partition.
	offsets := make(map[string]map[int32]int64)

	for message := range consumer.Messages() {
		if offsets[message.Topic] == nil {
			offsets[message.Topic] = make(map[int32]int64)
		}

		eventCount++

		// A zero entry means "nothing seen yet" for this partition, so the
		// continuity check only applies once a previous offset is recorded.
		last := offsets[message.Topic][message.Partition]
		if last != 0 && last != message.Offset-1 {
			expected := last + 1
			// diff is found minus expected; the previous expression
			// (Offset - last + 1) over-reported the gap by two.
			log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, expected, message.Offset, message.Offset-expected)
		}

		// Simulate processing time
		time.Sleep(10 * time.Millisecond)

		offsets[message.Topic][message.Partition] = message.Offset
		consumer.CommitUpto(message)
	}

	log.Printf("Processed %d events.", eventCount)
	log.Printf("%+v", offsets)
}
开发者ID:bcwalrus,项目名称:kafka,代码行数:59,代码来源:main.go


示例7: Start

// Start joins the consumer group for c.Topic through the ZooKeeper node at
// c.ZkAddress and passes the group to runConsumer. It blocks until
// runConsumer returns, after which the group is closed.
//
// The only error returned is a failure to join the group.
func (c *Consumer) Start() error {
	topics := []string{c.Topic}
	zkAddrs := []string{c.ZkAddress}

	group, joinErr := consumergroup.JoinConsumerGroup(c.ConsumerGroupName, topics, zkAddrs, nil)
	if joinErr != nil {
		return joinErr
	}
	defer group.Close()

	runConsumer(c.Topic, group)
	return nil
}
开发者ID:gowroc,项目名称:meetups,代码行数:11,代码来源:consumer.go


示例8: streamUsers

// streamUsers joins the "indexer" consumer group on conf.Topic and returns a
// channel on which decoded users are delivered.
//
// A background goroutine decodes each message value as JSON into a
// models.User, sends it on the returned channel and then commits the message
// offset. Consumer errors are logged and counted. On os.Interrupt the
// consumer is closed, the returned channel is closed and the goroutine exits.
//
// Failing to join the group or to decode a message terminates the process
// through log.Fatalf.
func streamUsers(conf *Config) chan models.User {
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetOldest
	config.Offsets.CommitInterval = 100 * time.Millisecond

	consumer, err := consumergroup.JoinConsumerGroup(
		"indexer",
		[]string{conf.Topic},
		conf.Zookeepers,
		config)
	if err != nil {
		log.Fatalf("Can't create consumer. Err: %v", err)
	}

	var received, errCount int

	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	out := make(chan models.User, 1024)
	go func() {
		for {
			select {
			case msg := <-consumer.Messages():
				received++

				var user models.User
				if err := json.Unmarshal(msg.Value, &user); err != nil {
					log.Fatalf("Can't unmarshal data from queue! Err: %v", err)
				}

				// Dob is a pointer and stays nil when the field is absent or
				// null in the payload, so guard before dereferencing. The
				// placeholder date "0000-00-00" is normalised to nil.
				if user.Dob != nil && *user.Dob == "0000-00-00" {
					user.Dob = nil
				}

				out <- user
				consumer.CommitUpto(msg)
			case err := <-consumer.Errors():
				errCount++
				log.Printf("Error reading from topic! Err: %v", err)
			case <-signals:
				log.Printf("Start consumer closing")
				consumer.Close()
				log.Printf("Consumer closed!")
				close(out)
				log.Printf("Successfully consumed: %d; errors: %d", received, errCount)
				return
			}
		}
	}()

	return out
}
开发者ID:mateuszdyminski,项目名称:am-pipeline,代码行数:54,代码来源:main.go


示例9: NewConsumer

// NewConsumer joins consumerGroup for the given topics and returns the
// resulting consumer group together with any join error.
//
// zoo is a ZooKeeper connection string; it is split by
// kazoo.ParseConnectionString into node addresses and a chroot. The initial
// offset comes from self.offset and the processing timeout is fixed at ten
// seconds.
func (self *Kafka) NewConsumer(consumerGroup string, topics []string, zoo string) (consumer *consumergroup.ConsumerGroup, err error) {
	cfg := consumergroup.NewConfig()
	cfg.Offsets.Initial = self.offset
	cfg.Offsets.ProcessingTimeout = 10 * time.Second

	nodes, chroot := kazoo.ParseConnectionString(zoo)
	cfg.Zookeeper.Chroot = chroot

	return consumergroup.JoinConsumerGroup(consumerGroup, topics, nodes, cfg)
}
开发者ID:lixin9311,项目名称:EventTracker,代码行数:12,代码来源:kafka.go


示例10: main

// main consumes messages from the configured Kafka topics and passes each
// one to u_leak_merge.Run together with a MySQL handle and a Kafka producer.
//
// A message's offset is committed only after Run succeeds; a failed message
// is logged and skipped without committing. The program returns early when a
// required flag is empty or when the database handle or the producer cannot
// be created, and exits via log.Fatalln when joining the group fails.
func main() {
	flag.Parse()
	gzlog.InitGZLogger(*LogFile, 50*1000*1000, 5)
	if *kafkaTopic == "" {
		log.Printf("topicShould not be null!\n")
		return
	}
	if *zookeeper == "" {
		log.Printf("zookeeper should not be null! \n")
		return
	}
	if *brokerList == "" {
		log.Printf("kafka brokers must not be null\n")
		// Stop here like the other required flags do; previously execution
		// carried on with an empty broker list.
		return
	}
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest
	config.Offsets.ProcessingTimeout = 2 * time.Second
	config.Consumer.MaxProcessingTime = 2 * time.Second
	kafkaTopics := strings.Split(*kafkaTopic, ",")
	zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)
	consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
	if consumerErr != nil {
		log.Fatalln(consumerErr)
	}
	log.Printf("start to get mysl connection!\n")
	// DSN layout is user:password@tcp(host)/dbname?params. The "%s@tcp" part
	// had been mangled into an e-mail placeholder, leaving an invalid verb
	// and one argument without a matching verb.
	db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/jwlwl?charset=utf8&parseTime=True", MysqlUser, MysqlPasswd, MysqlHost))
	if err != nil {
		log.Printf("mysql db connect failed !errMessage:%s \n", err)
		return
	}
	// Register the close only after the error check: db is nil when sql.Open
	// fails, and closing a nil handle would panic.
	defer db.Close()
	log.Printf("start to get kafka producer\n")
	producer, err := kafka_tool.GetKafkaProducer(*brokerList)
	if err != nil {
		log.Printf("Kafka  get producer failed !err: %s \n", err)
		return
	}

	log.Printf("Start to call consummer messages method !\n")
	for message := range consumer.Messages() {
		log.Printf("Start to call Run method with message:%s \n", message.Value)
		latestLeakEventArg := &u_leak_merge.LatestEventArg{Properties: string(message.Value)}
		err := u_leak_merge.Run("0", latestLeakEventArg, db, producer)
		if err != nil {
			log.Printf("message failed!:%s, errMessage:%s \n", message.Value, err)
			continue
		}
		log.Printf("Start to commit message! \n")
		consumer.CommitUpto(message)
		time.Sleep(100 * time.Millisecond)
	}
}
开发者ID:fantasycool,项目名称:leak_merge,代码行数:52,代码来源:leak_merge.go


示例11: Start

// Start prepares the Kafka consumer service and launches its receiver
// goroutine. The receiver's lock is held for the duration of the call.
//
// k.Offset selects the initial offset case-insensitively: "oldest" or empty
// means oldest, "newest" means newest, anything else logs a warning and falls
// back to oldest. The consumer group is (re)joined only when there is no
// consumer yet or the existing one reports Closed. The metric channel is
// sized by MetricBuffer, which defaults to 100000 when neither buffer field
// is set and is overridden by the legacy PointBuffer when that is positive.
//
// The only error returned is a failure to join the consumer group.
func (k *Kafka) Start() error {
	k.Lock()
	defer k.Unlock()

	cfg := consumergroup.NewConfig()
	switch strings.ToLower(k.Offset) {
	case "newest":
		cfg.Offsets.Initial = sarama.OffsetNewest
	case "oldest", "":
		cfg.Offsets.Initial = sarama.OffsetOldest
	default:
		log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", k.Offset)
		cfg.Offsets.Initial = sarama.OffsetOldest
	}

	if needJoin := k.Consumer == nil || k.Consumer.Closed(); needJoin {
		var joinErr error
		k.Consumer, joinErr = consumergroup.JoinConsumerGroup(k.ConsumerGroup, k.Topics, k.ZookeeperPeers, cfg)
		if joinErr != nil {
			return joinErr
		}

		// Setup message and error channels
		k.in = k.Consumer.Messages()
		k.errs = k.Consumer.Errors()
	}

	k.done = make(chan struct{})

	switch {
	case k.PointBuffer == 0 && k.MetricBuffer == 0:
		k.MetricBuffer = 100000
	case k.PointBuffer > 0:
		// Legacy support of PointBuffer field TODO remove
		k.MetricBuffer = k.PointBuffer
	}
	k.metricC = make(chan telegraf.Metric, k.MetricBuffer)

	// Start the kafka message reader
	go k.receiver()
	log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", k.ZookeeperPeers, k.Topics)
	return nil
}
开发者ID:noise,项目名称:telegraf,代码行数:48,代码来源:kafka_consumer.go


示例12: NewConsumer

// NewConsumer joins options.ConsumerGroup for options.KafkaTopics and returns
// the consumer group together with any join error.
//
// The entries of options.Zookeepers are joined with ", " into a single
// connection string, which kazoo.ParseConnectionString splits into node
// addresses and a chroot. Consumption starts at the newest offset and the
// processing timeout is ten seconds.
func NewConsumer(options Options) (*consumergroup.ConsumerGroup, error) {
	cfg := consumergroup.NewConfig()
	cfg.Offsets.Initial = sarama.OffsetNewest
	cfg.Offsets.ProcessingTimeout = 10 * time.Second

	nodes, chroot := kazoo.ParseConnectionString(strings.Join(options.Zookeepers, ", "))
	cfg.Zookeeper.Chroot = chroot

	return consumergroup.JoinConsumerGroup(options.ConsumerGroup, options.KafkaTopics, nodes, cfg)
}
开发者ID:tulios,项目名称:kafka-revolver,代码行数:17,代码来源:consumer.go


示例13: NewConsumer

// NewConsumer builds a Consumer from v, which must be a KafkaConsumerArgs.
//
// When args.Group is set, a ZooKeeper-coordinated group consumer is created
// for the comma-separated topics in args.Topic; q.Zookeepers must then be
// non-empty. Otherwise a plain sarama consumer is created and one partition
// consumer is opened for each partition returned by args.getPartitions.
//
// Every consumer handed out is also recorded in q.closers. If anything fails
// after the plain sarama consumer has been created, everything opened so far
// is closed before the error is returned, so no connection is left behind.
func (q *Kafka) NewConsumer(v interface{}) (Consumer, error) {
	args, ok := v.(KafkaConsumerArgs)
	if !ok {
		return nil, fmt.Errorf("invalid consumer arguments(%v)", v)
	}

	if args.Group != "" {
		if len(q.Zookeepers) == 0 {
			return nil, fmt.Errorf("zookeeper url is required.")
		}
		config := consumergroup.NewConfig()
		config.Offsets.Initial = args.getOffset()
		config.Offsets.ProcessingTimeout = 10 * time.Second

		cg, err := consumergroup.JoinConsumerGroup(args.Group, strings.Split(args.Topic, ","), q.Zookeepers, config)
		if err != nil {
			return nil, err
		}

		c := &kafkaGroupConsumer{consumer: cg}
		q.closers = append(q.closers, c)

		return c, nil
	}

	c, err := sarama.NewConsumer(q.Brokers, sarama.NewConfig())
	if err != nil {
		return nil, err
	}
	sc := &kafkaSingleConsumer{consumer: c}

	// release undoes a partially built consumer. Partition consumers are
	// closed before their parent. Close errors are deliberately dropped: the
	// caller needs the error that caused the failure, not the cleanup result.
	release := func() {
		for _, pc := range sc.partitionConsumers {
			_ = pc.Close()
		}
		_ = c.Close()
	}

	partitions, err := args.getPartitions(c)
	if err != nil {
		release()
		return nil, err
	}

	for _, p := range partitions {
		pc, err := c.ConsumePartition(args.Topic, p, args.getOffset())
		if err != nil {
			release()
			return nil, err
		}
		sc.partitionConsumers = append(sc.partitionConsumers, pc)
	}
	q.closers = append(q.closers, sc)
	return sc, nil
}
开发者ID:mpls,项目名称:anyq,代码行数:47,代码来源:kafka.go


示例14: Start

// Start stores acc, prepares the Kafka consumer service and launches its
// receiver goroutine. The receiver's lock is held for the duration of the
// call.
//
// k.Offset selects the initial offset case-insensitively: "oldest" or empty
// means oldest, "newest" means newest, anything else logs a warning and falls
// back to oldest. The consumer group is (re)joined, under the configured
// ZooKeeper chroot, only when there is no consumer yet or the existing one
// reports Closed.
//
// The only error returned is a failure to join the consumer group.
func (k *Kafka) Start(acc telegraf.Accumulator) error {
	k.Lock()
	defer k.Unlock()

	k.acc = acc

	cfg := consumergroup.NewConfig()
	cfg.Zookeeper.Chroot = k.ZookeeperChroot
	switch strings.ToLower(k.Offset) {
	case "newest":
		cfg.Offsets.Initial = sarama.OffsetNewest
	case "oldest", "":
		cfg.Offsets.Initial = sarama.OffsetOldest
	default:
		log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", k.Offset)
		cfg.Offsets.Initial = sarama.OffsetOldest
	}

	if needJoin := k.Consumer == nil || k.Consumer.Closed(); needJoin {
		var joinErr error
		k.Consumer, joinErr = consumergroup.JoinConsumerGroup(k.ConsumerGroup, k.Topics, k.ZookeeperPeers, cfg)
		if joinErr != nil {
			return joinErr
		}

		// Setup message and error channels
		k.in = k.Consumer.Messages()
		k.errs = k.Consumer.Errors()
	}

	k.done = make(chan struct{})

	// Start the kafka message reader
	go k.receiver()
	log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", k.ZookeeperPeers, k.Topics)
	return nil
}
开发者ID:miketonks,项目名称:telegraf,代码行数:44,代码来源:kafka_consumer.go


示例15: RegisterConsumer

// RegisterConsumer registers a new Kafka consumer.
//
// It joins registeredConsumer.Group for registeredConsumer.Topic, stores the
// resulting consumer group in registeredConsumer.Consumer, and starts a
// goroutine that calls registeredConsumer.ProcessKafka in an endless loop.
//
// On success the same *Consumer is returned. When joining fails the error is
// logged and returned as (nil, err), so the caller decides how to react.
// Previously the failure was logged with Fatalln, which made the error
// return unreachable.
func RegisterConsumer(registeredConsumer *Consumer) (*Consumer, error) {
	topics := []string{registeredConsumer.Topic}
	var consumerErr error
	registeredConsumer.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
		registeredConsumer.Group,
		topics,
		registeredConsumer.BrokerList,
		registeredConsumer.Config)
	if consumerErr != nil {
		log.Println(consumerErr)
		return nil, consumerErr
	}
	// NOTE(review): this goroutine has no stop condition visible here -
	// confirm that ProcessKafka blocks while waiting for messages.
	go func() {
		for {
			registeredConsumer.ProcessKafka()
		}
	}()
	return registeredConsumer, nil
}
开发者ID:NexwayGroup,项目名称:nx-lib,代码行数:20,代码来源:consumer.go


示例16: StartListening

// StartListening creates the consumer group and starts listening to incoming
// messages.
//
// Nothing is joined when no callbacks are registered; the function then just
// reports success. Otherwise it tries to join the group up to
// c.config.Zookeeper.MaxRetry times, waiting five seconds between attempts,
// and on success starts the error handler and the message-handling goroutine.
//
// It returns the last join error when every attempt failed, and an error
// when MaxRetry allows no attempt at all (previously that case fell through
// and started the handlers without a freshly joined consumer).
func (c *Consumer) StartListening() error {
	fmt.Println("[INFO] StartListening called")
	zookeeperAddrs := c.config.Zookeeper.AddrList
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	if len(c.callbacks) != 0 {
		topics := c.getTopicsKey()
		maxRetry := c.config.Zookeeper.MaxRetry

		var err error
		joined := false
		for attempt := 1; attempt <= maxRetry; attempt++ {
			// Creates a new consumer and adds it to the consumer group
			fmt.Println("[INFO] Will add consumer to group: ", c.config.ModuleName.Value)
			c.consumer, err = consumergroup.JoinConsumerGroup(c.config.ModuleName.Value, topics, zookeeperAddrs, config)
			if err == nil {
				joined = true
				break
			}
			fmt.Println("[INFO] Connection attempt faild (", attempt, "/", maxRetry, ")")
			// Back off only when another attempt follows; waiting after the
			// final failure would just delay the error.
			if attempt < maxRetry {
				time.Sleep(5 * time.Second)
			}
		}

		if err != nil {
			fmt.Println("[ERROR] Failed to start KAFKA Consumer Group\nErr:", err)
			return err
		}
		if !joined {
			return fmt.Errorf("kafka consumer group not joined: Zookeeper.MaxRetry is %d, so no connection attempt was made", maxRetry)
		}

		// handle consumer.Errors channel
		c.handleConsumerErrors()

		// start goroutine to handle incoming messages
		go c.handleMessages()
	}
	fmt.Println("[INFO] kafka consumer initialized successfully")
	return nil
}
开发者ID:asvins,项目名称:common_io,代码行数:40,代码来源:common_io.go


示例17: main

// main starts the push job: it loads the configuration, initialises the
// router and comet RPC clients, joins the Kafka consumer group and then
// waits for termination signals.
//
// Any initialisation failure panics. Consumer errors are logged in the
// background while run(cg) handles the group. SIGQUIT, SIGTERM and SIGINT
// close the consumer group, wait three seconds and exit; SIGHUP is reserved
// for a future reload and is currently ignored.
func main() {
	flag.Parse()
	if err := InitConfig(); err != nil {
		panic(err)
	}

	log.LoadConfiguration(Conf.Log)
	if err := InitRouterRpc(Conf.RouterAddrs); err != nil {
		panic(err)
	}
	if err := InitCometRpc(Conf.Comets); err != nil {
		panic(err)
	}

	log.Info("start topic:%s consumer", Conf.KafkaTopic)
	runtime.GOMAXPROCS(runtime.NumCPU())

	log.Info("consumer group name:%s", KAFKA_GROUP_NAME)

	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest
	config.Offsets.ProcessingTimeout = OFFSETS_PROCESSING_TIMEOUT_SECONDS
	config.Offsets.CommitInterval = OFFSETS_COMMIT_INTERVAL
	config.Zookeeper.Chroot = Conf.ZKRoot

	kafkaTopics := []string{Conf.KafkaTopic}

	cg, err := consumergroup.JoinConsumerGroup(KAFKA_GROUP_NAME, kafkaTopics, Conf.ZKAddrs, config)
	if err != nil {
		panic(err)
	}

	go func() {
		for err := range cg.Errors() {
			log.Error("consumer error(%v)", err)
		}
	}()

	go run(cg)

	// SIGSTOP is intentionally not registered: it cannot be caught by a
	// program, so listing it had no effect.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	for {
		s := <-c
		log.Info("get a signal %s\n", s.String())
		switch s {
		case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
			if err := cg.Close(); err != nil {
				log.Error("Error closing the consumer error(%v)", err)
			}
			// Pause before exiting, as the original did.
			// NOTE(review): presumably to let in-flight work drain - confirm.
			time.Sleep(3 * time.Second)
			log.Warn("consumer exit\n")
			return
		case syscall.SIGHUP:
			// TODO reload
		default:
			return
		}
	}
}
开发者ID:huweixuan,项目名称:goim,代码行数:61,代码来源:push_job.go


示例18: Start

// Start joins the configured consumer group and returns a channel on which
// every consumed message is delivered, wrapped as a saramaMessage.
//
// Joining is retried with a growing delay capped at maxExp milliseconds; the
// process exits with status 1 once the attempt count exceeds maxAttempts.
// After a successful join three goroutines run: one closes the group on
// os.Interrupt, one logs consumer errors, and one forwards messages.
//
// The interrupt handler is installed only after the group exists, so it can
// never call Close on a missing group. The returned channel is closed by the
// forwarding goroutine itself once the group's Messages channel is drained,
// so a send can never hit an already-closed channel.
// NOTE(review): this relies on Close closing the Messages channel - confirm
// against the consumergroup implementation.
func (kc *kafkaConsumer) Start() chan Message {
	kc.sigChan = make(chan os.Signal, 1)
	msgChan := make(chan Message, 1)

	cfg := consumergroup.NewConfig()

	cfg.Offsets.Initial = kc.Config.InitialOffset()
	cfg.Offsets.ProcessingTimeout = kc.Config.ProcessingTimeout()

	var zookeeperNodes []string
	url := kc.Config.ZookeeperURL()
	if chroot := kc.Config.ZookeeperChroot(); len(chroot) > 0 {
		url += "/" + chroot
	}
	zookeeperNodes, cfg.Zookeeper.Chroot = kazoo.ParseConnectionString(url)

	var cg *consumergroup.ConsumerGroup
	var err error
	var attempts, curExp int

	for {
		attempts++
		cg, err = consumergroup.JoinConsumerGroup(
			kc.Config.ConsumerGroup(),
			kc.Config.Topics(),
			zookeeperNodes,
			cfg,
		)
		if err != nil {
			log.Error(err, nil)
			if attempts > maxAttempts {
				log.Debug("reached maximum attempts, exiting", nil)
				os.Exit(1)
			}
			// Double the delay on every failure, starting from 2 and
			// capping at maxExp (milliseconds).
			if curExp == 0 {
				curExp = 2
			}
			curExp *= 2
			if curExp > maxExp {
				curExp = maxExp
			}
			log.Debug("sleeping", log.Data{"ms": curExp})
			time.Sleep(time.Millisecond * time.Duration(curExp))
			continue
		}
		break
	}

	kc.consumerGroup = cg

	// Install the interrupt handler now that there is a group to close.
	signal.Notify(kc.sigChan, os.Interrupt)
	go func() {
		<-kc.sigChan
		if err := cg.Close(); err != nil {
			log.Error(err, nil)
		}
	}()

	go func() {
		for err := range cg.Errors() {
			log.Error(err, nil)
		}
	}()

	go func() {
		// Only the sender closes msgChan, and only after its last send.
		defer close(msgChan)
		log.Debug("waiting for messages", nil)
		for m := range cg.Messages() {
			log.Debug("message", log.Data{"msg": m})
			msgChan <- saramaMessage{m}
		}
	}()

	return msgChan
}
开发者ID:ian-kent,项目名称:service.go,代码行数:73,代码来源:consumer.go



注:本文中的github.com/wvanbergen/kafka/consumergroup.JoinConsumerGroup函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Golang consumergroup.NewConfig函数代码示例发布时间:2022-05-28
下一篇:
Golang cli.Context类代码示例发布时间:2022-05-28
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap