• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Golang sarama.SyncProducer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Golang 中 github.com/Shopify/sarama.SyncProducer 的典型用法代码示例。如果您正苦于以下问题：Golang SyncProducer 接口的具体用法？Golang SyncProducer 怎么用？Golang SyncProducer 使用的例子？那么恭喜您，这里精选的代码示例或许可以为您提供帮助。



在下文中一共展示了 SyncProducer 接口的 11 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Golang 代码示例。

示例1: Serve

// Serve loops forever: it prompts on standard output, scans two integers from
// standard input, encodes them as a JSON Multiply value and publishes the
// result to topic through producer.
//
// A marshalling or send failure ends the process via log.Fatal*. The result
// of the scan itself is not checked, so a failed scan publishes whatever the
// operands hold at that point (zero unless a value was parsed).
func Serve(producer sarama.SyncProducer, topic string) {
	for {
		var x, y int
		fmt.Print("x y: ")
		fmt.Scanf("%d %d", &x, &y)

		payload, err := json.Marshal(Multiply{X: x, Y: y})
		if err != nil {
			log.Fatalln(err)
		}

		partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(payload),
		})
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("Sent msg to partition:", partition, ", offset:", offset)
	}
}
开发者ID:nickmarrone,项目名称:go_kafka_test,代码行数:29,代码来源:producer.go


示例2: run

// run collects spans arriving on z.spans into a batch and hands the batch to
// z.send on a separate goroutine when either the batch reaches
// z.opts.BatchSize or the z.opts.BatchInterval ticker fires with a non-empty
// batch.
//
// A receive on ch ends the loop: the ticker is stopped and p is closed. Spans
// still sitting in the batch at that moment are not sent.
func (z *zipkin) run(ch chan bool, p sarama.SyncProducer) {
	ticker := time.NewTicker(z.opts.BatchInterval)

	var pending []*trace.Span

	// flush passes the current batch to a sender goroutine and starts a fresh,
	// empty batch so the sender owns the slice it was given.
	flush := func() {
		go z.send(pending, p)
		pending = nil
	}

	for {
		select {
		case span := <-z.spans:
			pending = append(pending, span)
			if len(pending) < z.opts.BatchSize {
				continue
			}
			flush()
		case <-ticker.C:
			// Periodic flush; nothing to do for an empty batch.
			if len(pending) == 0 {
				continue
			}
			flush()
		case <-ch:
			// Exit requested.
			ticker.Stop()
			p.Close()
			return
		}
	}
}
开发者ID:jelmersnoeck,项目名称:go-platform,代码行数:27,代码来源:zipkin.go


示例3: produce

// produce sends one pre-built message to topic n times through producer and
// then writes the number of send attempts to c. The message value is built
// once from randomString(s) and reused for every send.
//
// Failed sends are reported through logger and still count as attempts, so
// the value written to c is n for any n >= 0 and 0 otherwise.
func produce(producer sarama.SyncProducer, c chan int, n int, s int, topic string, logger *log.Logger) {
	payload := sarama.StringEncoder(randomString(s))
	msg := &sarama.ProducerMessage{Topic: topic, Value: payload}

	attempts := 0
	for attempts < n {
		if _, _, err := producer.SendMessage(msg); err != nil {
			logger.Printf("FAILED to send message: %s\n", err)
		}
		attempts++
	}
	c <- attempts
}
开发者ID:C-Pro,项目名称:go-kafka-test,代码行数:11,代码来源:producer.go


示例4: pub

// pub serialises s with the thrift binary protocol into an in-memory buffer
// and publishes the resulting bytes to z.opts.Topic through pr.
//
// Both failure modes are silent: if serialisation fails nothing is sent, and
// the result of the send itself is discarded.
func (z *zipkin) pub(s *zipkincore.Span, pr sarama.SyncProducer) {
	buf := thrift.NewTMemoryBufferLen(1024)
	proto := thrift.NewTBinaryProtocolTransport(buf)
	if s.Write(proto) != nil {
		return
	}

	pr.SendMessage(&sarama.ProducerMessage{
		Topic: z.opts.Topic,
		Value: sarama.ByteEncoder(buf.Buffer.Bytes()),
	})
}
开发者ID:jelmersnoeck,项目名称:go-platform,代码行数:13,代码来源:zipkin.go


示例5: produce

// produce receives metrics from k.metricsChannel in an endless loop, renders
// each one as indented JSON and publishes it to the "loadbalancer.all" topic
// through producer.
//
// A marshalling error ends the loop without any report. A send error is
// logged through k.Log and the loop carries on with the next metric.
func (k *KafkaProducer) produce(producer sarama.SyncProducer) {
	for {
		metric := <-k.metricsChannel

		encoded, err := json.MarshalIndent(metric, "", " ")
		if err != nil {
			return
		}

		_, _, err = producer.SendMessage(&sarama.ProducerMessage{
			Topic: "loadbalancer.all",
			Value: sarama.StringEncoder(encoded),
		})
		if err != nil {
			k.Log.Error("error sending to Kafka ")
		}
	}
}
开发者ID:hlm,项目名称:vamp-router,代码行数:14,代码来源:kafka_producer.go


示例6: sendMessage

// sendMessage publishes payload as a single message on topic through producer
// and logs the outcome: the error on failure, or the topic, partition and
// offset on success. The message carries no key, so partition selection is
// left entirely to the producer's partitioner.
func sendMessage(producer sarama.SyncProducer, topic string, payload string) {
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(payload),
	})
	if err != nil {
		logger.Printf("Error sending data: %s\n", err)
		return
	}

	logger.Printf("[%s/%d/%d] Message successfully published\n", topic, partition, offset)
}
开发者ID:pmdcosta,项目名称:apache-kafka,代码行数:19,代码来源:producer.go


示例7: processBatch

// processBatch publishes every event in events to topic through producer, one
// message per event. The message value is the JSON encoding of the event and
// the message key is the event's data object "id" when that is a string.
//
// It returns the first marshalling or send error and stops there; events
// before the failing one have already been published.
//
// Note that unfortunately this does not actually produce in batches yet. We
// should theoretically be able to with Kafka, but the sarama interface for a
// `SyncProducer` currently seems overly limited.
func processBatch(producer sarama.SyncProducer, topic string, events []*stripe.Event) error {
	for _, event := range events {
		data, err := json.Marshal(event)
		if err != nil {
			return err
		}

		// TODO: Verify that Kafka does indeed perform log compaction per
		// partition key (as opposed to some other type of "key"). The docs
		// aren't exactly clear on this point.
		//
		// A missing or non-string ID falls back to the empty key rather than
		// panicking on a failed type assertion.
		key := ""
		switch id := event.Data.Obj["id"].(type) {
		case nil:
			log.Printf("Found event with nil data ID, type is %v", event.Type)
		case string:
			key = id
		default:
			log.Printf("Found event with non-string data ID (%T), type is %v", id, event.Type)
		}

		message := &sarama.ProducerMessage{
			Topic: topic,
			Key:   sarama.StringEncoder(key),
			Value: sarama.ByteEncoder(data),
		}

		if _, _, err := producer.SendMessage(message); err != nil {
			return err
		}
	}

	return nil
}
开发者ID:brandur,项目名称:stripe-warehouse,代码行数:42,代码来源:main.go


示例8: main

// main runs the command named on the command line with its stdout and stderr
// routed through kafkaWriters, and exits with the child's exit status.
func main() {
	os.Exit(runKtee())
}

// runKtee does the actual work of main and returns the process exit code
// instead of calling os.Exit itself, so that the deferred flush of both
// writers and the deferred producer close run on every path, including
// failures.
//
// Exit codes: 0 when the child succeeds, the child's own exit status when it
// exits non-zero, and 1 for configuration, connection, usage or launch
// errors.
func runKtee() int {
	// Register for signals up front; the channel buffers one signal until the
	// forwarder below is running.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	var cfg config
	if err := gofigure.Gofigure(&cfg); err != nil {
		fmt.Fprintln(os.Stderr, "unexpected error configuring ktee")
		return 1
	}

	// The producer stays nil when no brokers are configured.
	var producer sarama.SyncProducer
	if len(cfg.Brokers) > 0 {
		var err error
		brokers := strings.Split(cfg.Brokers, ",")
		producer, err = sarama.NewSyncProducer(brokers, sarama.NewConfig())
		if err != nil {
			fmt.Fprintf(os.Stderr, "error connecting to Kafka brokers: %s\n", err)
			return 1
		}

		defer func() {
			producer.Close()
		}()
	}

	args := os.Args[1:]
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "usage: ktee args")
		return 1
	}

	kwOut := kafkaWriter{producer, os.Stdout, cfg.OutTopic, new(bytes.Buffer), make(chan sarama.ProducerMessage)}
	kwErr := kafkaWriter{producer, os.Stderr, cfg.ErrTopic, new(bytes.Buffer), make(chan sarama.ProducerMessage)}

	// Deferred after the producer close, so the writers are flushed first.
	defer func() {
		kwOut.Flush()
		kwErr.Flush()
	}()

	cmd := exec.Command(args[0], args[1:]...)
	cmd.Stdin = os.Stdin
	cmd.Stdout = kwOut
	cmd.Stderr = kwErr
	cmd.Env = os.Environ()

	err := cmd.Start()
	if err == nil {
		// Forward signals only once the child exists: cmd.Process is set by a
		// successful Start, so the forwarder can never dereference nil.
		go func() {
			for sig := range sigs {
				// Best effort: the child may already have exited.
				_ = cmd.Process.Signal(sig)
			}
		}()
		err = cmd.Wait()
	}
	if err == nil {
		return 0
	}

	exitErr, ok := err.(*exec.ExitError)
	if !ok {
		fmt.Fprintf(os.Stderr, "error executing command: %s\n", err)
		return 1
	}

	fmt.Fprintf(os.Stderr, "non-zero exit code: %s\n", err)
	if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
		return status.ExitStatus()
	}
	return 1
}
开发者ID:ian-kent,项目名称:ktee,代码行数:71,代码来源:main.go


示例9: PublishSync

func PublishSync(input chan *FileEvent, source string, isRetryer bool) {
	log.Debug("publishSync loop")
	clientConfig := sarama.NewConfig()
	clientConfig.Producer.RequiredAcks = sarama.WaitForAll
	clientConfig.Producer.Compression = sarama.CompressionSnappy
	clientConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	clientConfig.Producer.Retry.Max = 10

	topic := kafkaTopic
	key := hashKey
	if isRetryer {
		topic = retryTopic
	}
	//brokerList := []string{"127.0.0.1:9092"}
	var producer sarama.SyncProducer
	var err error
	for {
		producer, err = sarama.NewSyncProducer(brokerList, clientConfig)
		if err != nil {
			log.Error("Sync: Failed to start Sarama producer: ", err)
			log.Info("waiting...")
			time.Sleep(1 * time.Second)
		} else {
			break
		}
	}

	defer func() {
		if err := producer.Close(); err != nil {
			log.Error("Failed to shutdown producer cleanly", err)
		}
	}()

	// if retryer, use retryer backup, others use Registrar
	var recorder Recorder
	if isRetryer {
		// set to global retryer
		retryRecorder := &RetryRecorder{file: mainRetryer.vernier}
		recorder = retryRecorder
		defer retryRecorder.file.Close()
	} else {
		registrar := &Registrar{source: source, dir: REGISTRAR_DIR}
		if _, err := registrar.OpenRecord(registrar.dir); err != nil {
			log.Error("PublishSync open record failed, error:", err)
			os.Exit(2)
		}
		recorder = registrar
	}

	genMessage := func(rawMessage string) string {
		return rawMessage
	}
	// retryer message sample: 0 this is a sample message
	// 0 means, haven't retried succeed
	// 1 means have been sended
	if isRetryer {
		genMessage = func(rawMessage string) string {
			// 0|1 raw_name_of_log_file log_msg
			rawMessage = rawMessage[2:]
			idx := strings.Index(rawMessage, " ")
			return rawMessage[idx+1:]
		}
	}

	for event := range input {
		log.Debugf("%v, %v, %v, %v\n", *event.Source, *event.Text, event.Line, event.Offset)
		// if failed, retry send messge until succeed
		key = strconv.FormatInt(event.Offset, 10)
		rawMessage := *event.Text
		if isRetryer {
			if retryTopic != kafkaTopic {
				topic = retryTopic
			} else {
				baseName := getSourceName(rawMessage)
				if len(topicmap) > 0 {
					tmpTopic := genTopic(baseName, topicmap)
					if tmpTopic != "" {
						topic = tmpTopic
					}
				}
			}
		}
		message := genMessage(*event.Text)
		if rawMessage[0] == '1' {
			log.Debug("message[%s] have been seeded", rawMessage)
			continue
		}

		for {
			partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
				Topic: topic,
				Key:   sarama.StringEncoder(key),
				Value: sarama.StringEncoder(message),
			})
			if err != nil {
				log.Errorf("Failed: %s, %d, %d\n", *event.Source, event.Line, event.Offset)
				time.Sleep(3 * time.Second)
			} else {
				log.Debugf("OK: %d, %d, %s\n", partition, offset, *event.Source)
				recorder.RecordSucceed(event.Offset, event.RawBytes)
//.........这里部分代码省略.........
开发者ID:chenhuaying,项目名称:logflume,代码行数:101,代码来源:publisher.go


示例10: startIndexDumper

// startIndexDumper starts a goroutine that serialises index snapshots with
// flatbuffers and publishes each one to indexTopic through producer. It
// returns the send side of the channel the goroutine reads from.
//
// The goroutine is registered on wg before it starts and marks itself done
// when it exits. It exits when it receives a nil message, which makes nil the
// shutdown signal. Send failures are logged and do not stop the goroutine.
func startIndexDumper(producer sarama.SyncProducer, wg *sync.WaitGroup) chan<- *indexDumperMessage {
	ch := make(chan *indexDumperMessage, 300) // 5 min
	wg.Add(1)

	go func() {
		defer wg.Done()
		// buf8 and builder are allocated once and reused for every message.
		buf8 := make([]byte, 8)
		builder := flatbuffers.NewBuilder(1024 * 1024)

		for {
			msg := <-ch
			if msg == nil {
				log.Println("exiting index dumper")
				return
			}

			start := time.Now()
			t, index := msg.t, msg.index
			// log.Printf("index dumper got index for %d", t.Unix())

			// Collect the tag names and sort them so tags are serialised in a
			// deterministic order (map iteration order is random).
			var tags []string
			for tag := range index {
				tags = append(tags, tag)
			}

			sort.Strings(tags)

			builder.Reset()
			var fbtags []flatbuffers.UOffsetT

			for _, tag := range tags {
				// The name string is created before the offsets vector is started.
				name := builder.CreateString(tag)

				// Write one int64 per stored value of this tag, walking every
				// partition's linked list of chunks (head/next) and the cnt
				// used entries of each chunk.
				cnt := 0
				builder.StartVector(flatbuffers.SizeInt64, 0, 0)
				for partition, d := range index[tag] {
					for di := d.head; di != nil; di = di.next {
						for i := 0; i < di.cnt; i++ {
							builder.PrependInt64(encodePartitionAndOffset(partition, di.vals[i]))
							cnt++
						}
					}
				}

				offsetsVector := builder.EndVector(cnt)

				fb.TagStart(builder)
				fb.TagAddName(builder, name)
				fb.TagAddOffsets(builder, offsetsVector)
				fbtags = append(fbtags, fb.TagEnd(builder))
			}

			// Assemble the vector of tag tables and wrap it in the root Index table.
			fb.IndexStartTagsVector(builder, len(fbtags))
			for _, offset := range fbtags {
				builder.PrependUOffsetT(offset)
			}

			tagsVector := builder.EndVector(len(fbtags))

			fb.IndexStart(builder)
			fb.IndexAddTags(builder, tagsVector)
			builder.Finish(fb.IndexEnd(builder))

			encoded := builder.FinishedBytes()
			// The message key is the snapshot's Unix time as 8 little-endian bytes.
			binary.LittleEndian.PutUint64(buf8, uint64(t.Unix()))

			statIncrementTook(&stat.idxSerializeTook, start)
			statIncrementSize(&stat.idxSendToKafkaSize, len(encoded))

			// Restart the clock so the send is timed separately from serialisation.
			start = time.Now()
			_, _, err := producer.SendMessage(&sarama.ProducerMessage{
				Topic: indexTopic,
				Key:   sarama.ByteEncoder(buf8),
				Value: sarama.ByteEncoder(encoded),
			})

			statIncrementTook(&stat.idxSendToKafkaTook, start)

			if err != nil {
				log.Printf("failed to store message: %v", err)
			}

			// log.Printf("finished serializing index for %d, %d msgs, %d tags, %d offsets",
			// t.Unix(), msg.msgs, len(tags), msg.offsets)
		}
	}()

	return ch
}
开发者ID:jackdoe,项目名称:no,代码行数:89,代码来源:indexer.go


示例11: closeProducer

// closeProducer closes producer and reports a failed close through the
// package-level logger. The error is not returned to the caller.
func closeProducer(producer sarama.SyncProducer) {
	if err := producer.Close(); err != nil {
		logger.Printf("Error closing Producer: %s\n", err)
	}
}
开发者ID:pmdcosta,项目名称:apache-kafka,代码行数:7,代码来源:producer.go



注:本文中的github.com/Shopify/sarama.SyncProducer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Golang mgo.Dial函数代码示例发布时间:2022-05-28
下一篇:
Golang sarama.FetchRequest类代码示例发布时间:2022-05-28
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap