• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Golang sarama.NewAsyncProducer函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Golang中github.com/Shopify/sarama.NewAsyncProducer函数的典型用法代码示例。如果您正苦于以下问题:Golang NewAsyncProducer函数的具体用法?Golang NewAsyncProducer怎么用?Golang NewAsyncProducer使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了NewAsyncProducer函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。

示例1: main

// main parses the Kafka address flag, creates an async Kafka producer, starts
// the generator and consumer goroutines (producer and consumer are defined
// elsewhere in the file), and then blocks until the process is interrupted.
func main() {
	var host = flag.String("kafka", "127.0.0.1:9092", "IP address:port of kafka")
	flag.Parse()
	duration := 10 * time.Millisecond
	src := make(chan uint32)
	dst := make(chan uint32)
	notify := make(chan os.Signal, 1)
	// SIGKILL cannot be trapped by a process, so only the interrupt signal is
	// registered here.
	signal.Notify(notify, os.Interrupt)

	config := kafka.NewConfig()
	config.Producer.Return.Successes = true
	kafkaProducer, err := kafka.NewAsyncProducer([]string{*host}, config)
	if err != nil {
		panic(err)
	}
	fmt.Println("src_ip,dst_ip,src_coord,dst_coord,received_at")

	// dcIPs are data center IPs.
	dcIPs := []uint32{1222977025, 2212761857, 2169380865}

	go producer(src, dcIPs, duration)
	go producer(dst, dcIPs, duration)
	go consumer(src, dst, kafkaProducer)

	// Successes are enabled above, so the channel must be drained or the
	// producer stalls. Ranging (rather than looping on a bare receive) lets the
	// goroutine end instead of spinning if the channel is ever closed.
	go func(p kafka.AsyncProducer) {
		for range p.Successes() {
		}
	}(kafkaProducer)

	// Errors are delivered by default and must be read as well. They go to
	// stderr so they do not mix with the CSV written to stdout.
	go func(p kafka.AsyncProducer) {
		for produceErr := range p.Errors() {
			fmt.Fprintln(os.Stderr, "kafka produce error:", produceErr)
		}
	}(kafkaProducer)

	s := <-notify
	fmt.Println("signal:", s)
	fmt.Println("done.")

}
开发者ID:diebels727,项目名称:ip-generator,代码行数:35,代码来源:noise.go


示例2: newAccessLogProducer

// newAccessLogProducer returns an asynchronous sarama producer connected to
// brokerList. It is tuned for throughput over latency: messages are batched
// and compressed, and only the partition leader has to acknowledge them. The
// process exits via log.Fatalln if the producer cannot be created.
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
	cfg := sarama.NewConfig()

	// TLS is switched on only when a TLS configuration is available.
	if tlsCfg := createTlsConfiguration(); tlsCfg != nil {
		cfg.Net.TLS.Config = tlsCfg
		cfg.Net.TLS.Enable = true
	}

	// Batching and compression trade some latency for less network I/O.
	cfg.Producer.Flush.Frequency = 500 * time.Millisecond // flush batches every 500ms
	cfg.Producer.Compression = sarama.CompressionSnappy   // compress messages
	cfg.Producer.RequiredAcks = sarama.WaitForLocal       // leader ack only

	accessLog, err := sarama.NewAsyncProducer(brokerList, cfg)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// Delivery failures are only logged. A message shows up here once all of
	// its retry attempts have been exhausted.
	go func(failures <-chan *sarama.ProducerError) {
		for failure := range failures {
			log.Println("Failed to write access log entry:", failure)
		}
	}(accessLog.Errors())

	return accessLog
}
开发者ID:ChongFeng,项目名称:beats,代码行数:29,代码来源:http_server.go


示例3: newStorage

// newStorage creates a Kafka-backed storage driver for machineName.
//
// TLS is enabled when generateTLSConfig returns a non-nil configuration. The
// broker list is read from the comma-separated *brokers value and the topic
// from *topic. An error is returned if the TLS configuration or the producer
// cannot be created.
func newStorage(machineName string) (storage.StorageDriver, error) {
	config := kafka.NewConfig()

	tlsConfig, err := generateTLSConfig()
	if err != nil {
		return nil, err
	}

	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}

	// Wait for all in-sync replicas to acknowledge each message.
	config.Producer.RequiredAcks = kafka.WaitForAll

	brokerList := strings.Split(*brokers, ",")
	// brokers is a pointer; dereference it so the log shows the configured
	// broker string rather than a pointer value.
	glog.V(4).Infof("Kafka brokers:%q", *brokers)

	producer, err := kafka.NewAsyncProducer(brokerList, config)
	if err != nil {
		return nil, err
	}
	ret := &kafkaStorage{
		producer:    producer,
		topic:       *topic,
		machineName: machineName,
	}
	return ret, nil
}
开发者ID:chrisdo,项目名称:cadvisor,代码行数:29,代码来源:kafka.go


示例4: pubKafkaAsyncLoop

// pubKafkaAsyncLoop publishes `loops` messages of `sz` bytes to `topic`
// through an async producer connected to localhost:9092. It increments the
// "ok" counter for every enqueued message and the "fail" counter for every
// producer creation or delivery failure. seq is not used by the body.
func pubKafkaAsyncLoop(seq int) {
	cf := sarama.NewConfig()
	cf.Producer.Flush.Frequency = time.Second * 10
	cf.Producer.Flush.Messages = 1000
	cf.Producer.Flush.MaxMessages = 1000
	cf.Producer.RequiredAcks = sarama.WaitForLocal
	cf.Producer.Partitioner = sarama.NewHashPartitioner
	cf.Producer.Timeout = time.Second
	//cf.Producer.Compression = sarama.CompressionSnappy
	cf.Producer.Retry.Max = 3
	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cf)
	if err != nil {
		stress.IncCounter("fail", 1)
		log.Println(err)
		return
	}

	// Errors are returned by default and must be read while producing;
	// otherwise the producer stops accepting input once the error channel
	// fills up. Each delivery failure is counted.
	// NOTE(review): assumes stress.IncCounter may be called from another
	// goroutine; confirm.
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		for range producer.Errors() {
			stress.IncCounter("fail", 1)
		}
	}()
	// Shut down asynchronously and wait for the drain goroutine, which ends
	// when the producer closes its error channel after flushing.
	defer func() {
		producer.AsyncClose()
		<-drained
	}()

	msg := strings.Repeat("X", sz)
	for i := 0; i < loops; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(msg),
		}
		stress.IncCounter("ok", 1)
	}

}
开发者ID:chendx79,项目名称:gafka,代码行数:28,代码来源:bench.go


示例5: NewEventPublisher

// NewEventPublisher creates an EventPublisher backed by an async sarama
// producer connected to eatonconfig.KafkaServers. Success and error reporting
// are both disabled in the configuration, so the logging goroutines below only
// start if those flags are turned on.
func NewEventPublisher() (*EventPublisher, error) {
	cfg := sarama.NewConfig()
	cfg.ClientID = ipresolver.GetLocalAddr()
	cfg.Producer.Partitioner = sarama.NewHashPartitioner
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Compression = sarama.CompressionNone
	cfg.Producer.Return.Successes = false
	cfg.Producer.Return.Errors = false

	producer, err := sarama.NewAsyncProducer(eatonconfig.KafkaServers, cfg)
	if err != nil {
		return nil, err
	}

	// Log every acknowledged message when successes are reported.
	logSuccesses := func() {
		for sent := range producer.Successes() {
			log.Println("Sent Message to logs: ", sent.Key)
		}
	}
	// Log every delivery failure when errors are reported.
	logFailures := func() {
		for failure := range producer.Errors() {
			log.Println("failed to send message to logs: ", failure.Error())
		}
	}

	if cfg.Producer.Return.Successes {
		go logSuccesses()
	}
	if cfg.Producer.Return.Errors {
		go logFailures()
	}

	publisher := &EventPublisher{producer: producer}
	return publisher, nil
}
开发者ID:ECLabs,项目名称:Eaton-Feeder,代码行数:31,代码来源:event_producer.go


示例6: makePub

// makePub builds an async producer for the brokers of cluster c2. Batching,
// retry and network timeouts are fixed below; the compression codec is picked
// from this.Compress ("gzip" or "snappy"; any other value leaves the default).
func (this *Mirror) makePub(c2 *zk.ZkCluster) (sarama.AsyncProducer, error) {
	conf := sarama.NewConfig()
	conf.ChannelBufferSize = 1000

	// Metadata refresh and retry policy.
	conf.Metadata.RefreshFrequency = 10 * time.Minute
	conf.Metadata.Retry.Max = 3
	conf.Metadata.Retry.Backoff = 3 * time.Second

	// Network timeouts.
	conf.Net.DialTimeout = 30 * time.Second
	conf.Net.WriteTimeout = 30 * time.Second
	conf.Net.ReadTimeout = 30 * time.Second

	// Producer batching, acknowledgement and retry policy.
	conf.Producer.Return.Errors = true
	conf.Producer.RequiredAcks = sarama.WaitForLocal
	conf.Producer.Flush.Messages = 2000         // 2000 messages per batch
	conf.Producer.Flush.Frequency = time.Second // flush interval
	conf.Producer.Flush.MaxMessages = 0         // unlimited
	conf.Producer.Retry.Max = 3
	conf.Producer.Retry.Backoff = 4 * time.Second

	if codec := this.Compress; codec == "gzip" {
		conf.Producer.Compression = sarama.CompressionGZIP
	} else if codec == "snappy" {
		conf.Producer.Compression = sarama.CompressionSnappy
	}

	return sarama.NewAsyncProducer(c2.BrokerList(), conf)
}
开发者ID:funkygao,项目名称:gafka,代码行数:28,代码来源:factory.go


示例7: Setup

// Setup prepares the Requester for benchmarking: it opens an async producer
// and a consumer against k.urls, subscribes to partition 0 of k.topic at the
// newest offset, and pre-builds the message to publish. Anything already
// opened is closed again when a later step fails.
func (k *kafkaRequester) Setup() error {
	pub, err := sarama.NewAsyncProducer(k.urls, sarama.NewConfig())
	if err != nil {
		return err
	}

	sub, err := sarama.NewConsumer(k.urls, nil)
	if err != nil {
		pub.Close()
		return err
	}

	partition, err := sub.ConsumePartition(k.topic, 0, sarama.OffsetNewest)
	if err != nil {
		pub.Close()
		sub.Close()
		return err
	}

	// Zero-filled payload of the configured size, reused for every publish.
	payload := make([]byte, k.payloadSize)

	k.producer, k.consumer, k.partitionConsumer = pub, sub, partition
	k.msg = &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(payload),
	}
	return nil
}
开发者ID:actourex,项目名称:bench,代码行数:30,代码来源:kafka_requester.go


示例8: initProducer

// initProducer connects an async Kafka producer to the brokers listed in
// moduleConfig.Kafka.BrokerList, retrying up to moduleConfig.Kafka.MaxRetry
// times with a 5 second pause between attempts. At least one attempt is always
// made, even if MaxRetry is zero or negative. On success it starts a goroutine
// that logs producer errors and returns the wrapped producer; otherwise it
// returns the error from the last attempt.
func initProducer(moduleConfig *Config) (*Producer, error) {
	fmt.Println("[INFO] initProducer called")
	brokerList := moduleConfig.Kafka.BrokerList
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Flush.Frequency = 500 * time.Millisecond

	// A non-positive MaxRetry would otherwise skip the loop entirely and leave
	// both producer and err nil, which panics as soon as producer is used.
	maxAttempts := moduleConfig.Kafka.MaxRetry
	if maxAttempts < 1 {
		maxAttempts = 1
	}

	var producer sarama.AsyncProducer
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		producer, err = sarama.NewAsyncProducer(brokerList, config)
		if err == nil {
			break
		}
		fmt.Println("[INFO] Connection attempt faild (", attempt, "/", maxAttempts, ")")
		// Pause only when another attempt will follow.
		if attempt < maxAttempts {
			time.Sleep(5 * time.Second)
		}
	}

	if err != nil {
		fmt.Println("[ERROR] Unable to setup kafka producer", err)
		return nil, err
	}

	// You must read from the Errors() channel or the producer will deadlock.
	go func() {
		for err := range producer.Errors() {
			log.Println("[ERROR] Kadka producer Error: ", err)
		}
	}()

	fmt.Println("[INFO] kafka producer initialized successfully")
	return &Producer{producer: producer, id: CreatedProducersLength()}, nil
}
开发者ID:asvins,项目名称:common_io,代码行数:34,代码来源:common_io.go


示例9: main

// main connects an async, Snappy-compressing Kafka producer to the brokers
// given by the -brokers flag and then calls decodePackets for every network
// interface that is not named "lo" and has at least one address. It exits
// fatally if the producer cannot be created or the interfaces cannot be listed.
func main() {

	config := sarama.NewConfig()
	config.Producer.Compression = sarama.CompressionSnappy

	flag.StringVar(&kafkaBrokers, "brokers", "localhost:9092", "The kafka broker addresses")
	flag.Parse()

	brokers := strings.Split(kafkaBrokers, ",")

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		// Include the cause so a failed start-up can be diagnosed.
		log.Fatal("Can't create the Kafka producer: ", err)
	}
	fmt.Println("Connected to Kafka brokers", "["+kafkaBrokers+"]")

	ifaces, err := net.Interfaces()
	if err != nil {
		log.Fatal("Cannot get network interfaces: ", err)
	}
	for _, iface := range ifaces {
		addrs, err := iface.Addrs()
		if err != nil {
			// An interface whose addresses cannot be read is skipped, as
			// before, but the reason is now reported.
			log.Printf("Skipping interface %s: %v", iface.Name, err)
			continue
		}
		if iface.Name != "lo" && len(addrs) > 0 {
			fmt.Printf("Starting live capture on %s interface...", iface.Name)
			decodePackets(iface.Name, producer)
		}
	}
}
开发者ID:bhnedo,项目名称:rabbit-packetstorm,代码行数:33,代码来源:rabbitgopcap.go


示例10: handler

// handler decodes a JSON Request from the body and publishes request.Data to
// the Kafka topic named by request.Kind. It answers "OK" once the producer has
// flushed the message, or a 500 with the error text when decoding, connecting
// or delivery fails.
func handler(w http.ResponseWriter, r *http.Request) {
	var request Request
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		log.Print("Could not decode request")
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	log.Print("Received request for kind: ", request.Kind)

	// NOTE(review): a producer is created per request; sharing one across
	// requests would be cheaper but needs state outside this function.
	config := sarama.NewConfig()
	producer, err := sarama.NewAsyncProducer(KafkaAddresses, config)
	if err != nil {
		log.Print("Could not connect to Kafka: ", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	log.Print("Connected to Kafka")

	message := sarama.ProducerMessage{
		Topic: request.Kind,
		Value: MapEncoder(request.Data),
	}

	producer.Input() <- &message

	// Close flushes pending messages and reports delivery failures. Without it
	// every request would leak the producer's connections and goroutines, and
	// the client would get "OK" before delivery is known.
	if err := producer.Close(); err != nil {
		log.Print("Could not send message to Kafka: ", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	log.Print("Message sent")

	fmt.Fprintf(w, "OK")
}
开发者ID:JosephSalisbury,项目名称:event-sourcing-poc,代码行数:34,代码来源:main.go


示例11: main

// main continuously enqueues a fixed test message on the "dataman_test" topic
// until the process is interrupted, counting enqueued messages and reported
// delivery errors, and logs both totals before closing the producer.
func main() {
	producer, err := sarama.NewAsyncProducer([]string{"10.3.10.32:9091"}, nil)
	if err != nil {
		panic(err)
	}

	// Close the producer on the way out; a failing close is fatal.
	defer func() {
		if err = producer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)

	sent, failed := 0, 0
	for running := true; running; {
		// A fresh message is built for every iteration.
		next := &sarama.ProducerMessage{
			Topic: "dataman_test",
			Key:   nil,
			Value: sarama.StringEncoder("testing 123"),
		}

		select {
		case producer.Input() <- next:
			sent++
		case err = <-producer.Errors():
			log.Println("Failed to produce message", err)
			failed++
		case <-interrupts:
			running = false
		}
	}

	log.Printf("Enqueued: %d; errors: %d\n", sent, failed)
}
开发者ID:upccup,项目名称:cuplearn,代码行数:31,代码来源:asyncproducer.go


示例12: NewKafkaProducer

// NewKafkaProducer creates an IndeedKafkaProducer backed by an async sarama
// producer connected to eatonconfig.KafkaServers. Success and error reporting
// are both enabled, and two goroutines forward those notifications to
// eatonevents until the respective channel is closed.
func NewKafkaProducer() (*IndeedKafkaProducer, error) {
	cfg := sarama.NewConfig()
	cfg.ClientID = ipresolver.GetLocalAddr()
	cfg.Producer.Partitioner = sarama.NewHashPartitioner
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Compression = sarama.CompressionNone
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer(eatonconfig.KafkaServers, cfg)
	if err != nil {
		return nil, err
	}

	// Report every acknowledged message.
	go func(acked <-chan *sarama.ProducerMessage) {
		for sent := range acked {
			eatonevents.Info(fmt.Sprintf("Successfully sent message to topic %s with key %s", sent.Topic, sent.Key))
		}
	}(producer.Successes())

	// Report every delivery failure.
	go func(failures <-chan *sarama.ProducerError) {
		for failure := range failures {
			eatonevents.Error("Failed to send message due to error: ", failure)
		}
	}(producer.Errors())

	result := &IndeedKafkaProducer{producer: producer}
	return result, nil
}
开发者ID:ECLabs,项目名称:Eaton-Feeder,代码行数:26,代码来源:kafka_producer.go


示例13: NewKafka

// NewKafka builds the Kafka benchmark fixture: a client, an async producer and
// a partition consumer for partition 0 of the "test" topic on localhost:9092,
// plus a latency or throughput message handler depending on testLatency.
//
// The signature has no error return, so a failure to create any Kafka
// component panics with a descriptive message. Previously the errors were
// discarded and a nil component caused a nil dereference later.
func NewKafka(numberOfMessages int, testLatency bool) *Kafka {
	brokers := []string{"localhost:9092"}
	topic := "test"
	config := sarama.NewConfig()

	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		panic("kafka benchmark: creating client: " + err.Error())
	}

	pub, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		panic("kafka benchmark: creating async producer: " + err.Error())
	}

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		panic("kafka benchmark: creating consumer: " + err.Error())
	}

	sub, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		panic("kafka benchmark: consuming partition 0 of " + topic + ": " + err.Error())
	}

	var handler benchmark.MessageHandler
	if testLatency {
		handler = &benchmark.LatencyMessageHandler{
			NumberOfMessages: numberOfMessages,
			Latencies:        []float32{},
		}
	} else {
		handler = &benchmark.ThroughputMessageHandler{NumberOfMessages: numberOfMessages}
	}

	return &Kafka{
		handler: handler,
		client:  client,
		pub:     pub,
		sub:     sub,
		topic:   topic,
	}
}
开发者ID:houcy,项目名称:mq-benchmarking,代码行数:27,代码来源:kafka.go


示例14: queueInit

// queueInit creates the package-level async producer qProducer from the
// publisher settings in args: client ID, acknowledgement level, compression
// and flush interval. The process exits via log.Fatalln if the producer cannot
// be started.
func queueInit() {
	// Acknowledgement level: all replicas when requested, otherwise the leader.
	acks := sarama.WaitForLocal
	if args.Pub.Ack {
		acks = sarama.WaitForAll
	}

	// Compression: Snappy when requested, otherwise none.
	codec := sarama.CompressionNone
	if args.Pub.Compress {
		codec = sarama.CompressionSnappy
	}

	// Flush interval in seconds; falls back to one second when not positive.
	flushEvery := 1 * time.Second
	if args.Pub.FlushFreq > 0 {
		flushEvery = time.Duration(args.Pub.FlushFreq) * time.Second
	}

	cfg := sarama.NewConfig()
	cfg.ClientID = args.ID
	cfg.Producer.RequiredAcks = acks
	cfg.Producer.Compression = codec
	cfg.Producer.Flush.Frequency = flushEvery

	p, err := sarama.NewAsyncProducer(args.Pub.URI, cfg)
	if err != nil {
		log.Fatalln("Failed to start Kafka producer:", err)
	}
	qProducer = p
}
开发者ID:trustedanalytics,项目名称:gateway,代码行数:35,代码来源:publisher.go


示例15: NewKafkaOutput

// NewKafkaOutput creates instance of kafka producer client.
//
// The broker list is taken from the comma-separated config.host; the address
// argument is not used by this function. The process exits via log.Fatalln if
// the producer cannot be started.
func NewKafkaOutput(address string, config *KafkaConfig) io.Writer {
	c := sarama.NewConfig()
	c.Producer.RequiredAcks = sarama.WaitForLocal
	c.Producer.Compression = sarama.CompressionSnappy
	c.Producer.Flush.Frequency = KafkaOutputFrequency * time.Millisecond
	// The error channel is only drained in verbose mode (see below). Errors
	// are returned by default, and an unread error channel eventually blocks
	// the producer, so only ask for errors when something will read them.
	c.Producer.Return.Errors = Settings.verbose

	brokerList := strings.Split(config.host, ",")

	producer, err := sarama.NewAsyncProducer(brokerList, c)
	if err != nil {
		log.Fatalln("Failed to start Sarama(Kafka) producer:", err)
	}

	o := &KafkaOutput{
		config:   config,
		producer: producer,
	}

	if Settings.verbose {
		// Start infinite loop for tracking errors for kafka producer.
		go o.ErrorHandler()
	}

	return o
}
开发者ID:buger,项目名称:gor,代码行数:26,代码来源:output_kafka.go


示例16: NewPeer

// NewPeer creates and returns a new Peer for communicating with Kafka.
//
// Any port in host is replaced with 9092. The Peer holds a client, an async
// producer and a partition consumer for partition 0 of the package-level topic
// at the newest offset. If a step fails, everything opened so far is closed
// before the error is returned, so a failed call leaks no connections.
func NewPeer(host string) (*Peer, error) {
	host = strings.Split(host, ":")[0] + ":9092"
	brokers := []string{host}
	config := sarama.NewConfig()

	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		return nil, err
	}

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		// Cleanup is best effort; the original error is the one to report.
		_ = client.Close()
		return nil, err
	}

	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		_ = producer.Close()
		_ = client.Close()
		return nil, err
	}

	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		_ = consumer.Close()
		_ = producer.Close()
		_ = client.Close()
		return nil, err
	}

	return &Peer{
		client:   client,
		producer: producer,
		consumer: partitionConsumer,
		send:     make(chan []byte),
		errors:   make(chan error, 1),
		done:     make(chan bool),
	}, nil
}
开发者ID:huikang,项目名称:Flotilla,代码行数:33,代码来源:kafka.go


示例17: newAsyncProducer

// newAsyncProducer returns a pointer to an async sarama producer connected to
// brokerList. It is tuned for throughput: leader-only acknowledgement, Snappy
// compression and a 500ms flush interval. When tlsConfig is non-nil, TLS is
// enabled with it; previously the argument was accepted but ignored, so
// connections were made without TLS. The process exits via log.Fatalln if the
// producer cannot be created.
func newAsyncProducer(tlsConfig *tls.Config, brokerList []string) *sarama.AsyncProducer {
	config := sarama.NewConfig()

	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}

	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	// This goroutine exits once the producer shuts down and closes the errors channel.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return &producer

}
开发者ID:Fiery,项目名称:fsmonitor,代码行数:29,代码来源:monitor_kafka_client.go


示例18: NewPipelineKafka

// NewPipelineKafka creates a PipelineKafka that publishes through an async
// sarama producer. host is a comma-separated broker list; db is stored on the
// returned pipeline unchanged.
//
// The signature has no error return, so a failure to create the producer
// panics with a descriptive message. Previously the error was discarded and a
// nil producer was stored, which fails on first use.
func NewPipelineKafka(host, db string) *PipelineKafka {
	brokerList := strings.Split(host, ",")
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		panic("pipeline kafka: creating async producer for " + host + ": " + err.Error())
	}
	return &PipelineKafka{producer: producer, db: db}
}
开发者ID:ZuochaoLee,项目名称:visual_spider_go,代码行数:9,代码来源:pipeline_kafka.go


示例19: initAP

// initAP creates the broker's async producer from its configured server list
// and sarama config, then allocates the buffered channel (capacity 64) used to
// queue outgoing messages. It returns the producer creation error, if any.
func (b *Broker) initAP() error {
	producer, err := sarama.NewAsyncProducer(b.config.brokerServerList, b.brokerConfig)
	// The result is stored even on failure, as before.
	b.ap = producer
	if err != nil {
		return err
	}

	const produceQueueSize = 64
	b.produceChan = make(chan *sarama.ProducerMessage, produceQueueSize)
	return nil
}
开发者ID:dzch,项目名称:binstore,代码行数:9,代码来源:broker.go


示例20: main

// main publishes a message to the "important" topic every 500ms until the
// process is interrupted, then logs how many messages were enqueued and how
// many delivery errors were reported, and closes the producer.
func main() {

	// Setup configuration
	config := sarama.NewConfig()
	// Return specifies which channels will be populated. If a flag is set to
	// true, you must read from the corresponding channel (Successes() or
	// Errors()) or the producer will block.
	// config.Producer.Return.Successes = true
	// The total number of times to retry sending a message (default 3).
	config.Producer.Retry.Max = 5
	// The level of acknowledgement reliability needed from the broker.
	config.Producer.RequiredAcks = sarama.WaitForAll
	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		// Should not reach here
		panic(err)
	}

	defer func() {
		if err := producer.Close(); err != nil {
			// Should not reach here
			panic(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var enqueued, errors int
	doneCh := make(chan struct{})
	go func() {
		// Closing doneCh on exit orders every counter update before main's
		// read of the counters.
		defer close(doneCh)
		for {

			time.Sleep(500 * time.Millisecond)

			strTime := strconv.Itoa(int(time.Now().Unix()))
			msg := &sarama.ProducerMessage{
				Topic: "important",
				Key:   sarama.StringEncoder(strTime),
				Value: sarama.StringEncoder("Something Cool"),
			}
			select {
			case producer.Input() <- msg:
				enqueued++
				fmt.Println("Produce message")
			case err := <-producer.Errors():
				errors++
				fmt.Println("Failed to produce message:", err)
			case <-signals:
				// Stop producing. If the goroutine kept looping it would race
				// with main on the counters and could send on Input() after
				// the deferred Close has shut the producer down.
				return
			}
		}
	}()

	<-doneCh
	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
}
开发者ID:hyndio,项目名称:go-kafka,代码行数:57,代码来源:main.go



注:本文中的github.com/Shopify/sarama.NewAsyncProducer函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Golang sarama.NewClient函数代码示例发布时间:2022-05-28
下一篇:
Golang sarama.ByteEncoder函数代码示例发布时间:2022-05-28
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap