本文整理汇总了Golang中github.com/Shopify/sarama.NewAsyncProducer函数的典型用法代码示例。如果您正苦于以下问题:Golang NewAsyncProducer函数的具体用法?Golang NewAsyncProducer怎么用?Golang NewAsyncProducer使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了NewAsyncProducer函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: main
// main wires two synthetic IP generators and a consumer to an async Kafka
// producer, printing a CSV header and running until interrupted.
func main() {
	var host = flag.String("kafka", "127.0.0.1:9092", "IP address:port of kafka")
	flag.Parse()
	duration := 10 * time.Millisecond
	src := make(chan uint32)
	dst := make(chan uint32)
	notify := make(chan os.Signal, 1)
	// BUG FIX: os.Kill (SIGKILL) can never be caught or ignored, so
	// registering it was a no-op (`go vet` flags it); only os.Interrupt works.
	signal.Notify(notify, os.Interrupt)
	config := kafka.NewConfig()
	// Successes must be drained (see goroutine below) or the producer stalls.
	config.Producer.Return.Successes = true
	kafkaProducer, err := kafka.NewAsyncProducer([]string{*host}, config)
	if err != nil {
		panic(err)
	}
	fmt.Println("src_ip,dst_ip,src_coord,dst_coord,received_at")
	// dcIPs are data center IPs.
	dcIPs := []uint32{1222977025, 2212761857, 2169380865}
	go producer(src, dcIPs, duration)
	go producer(dst, dcIPs, duration)
	go consumer(src, dst, kafkaProducer)
	// Drain the Successes channel so the async producer never blocks.
	go func(p kafka.AsyncProducer) {
		for range p.Successes() {
		}
	}(kafkaProducer)
	s := <-notify
	fmt.Println("signal:", s)
	fmt.Println("done.")
}
开发者ID:diebels727,项目名称:ip-generator,代码行数:35,代码来源:noise.go
示例2: newAccessLogProducer
// newAccessLogProducer builds an async Kafka producer tuned for access-log
// traffic: AP semantics with high throughput. Batches are snappy-compressed
// and flushed every 500ms, trading a little latency for less network I/O.
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
	config := sarama.NewConfig()
	if tlsConfig := createTlsConfiguration(); tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal // leader ack is enough
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Flush.Frequency = 500 * time.Millisecond

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// Undeliverable entries (all retries exhausted) are only logged to STDOUT;
	// the Errors channel must be drained regardless.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}
开发者ID:ChongFeng,项目名称:beats,代码行数:29,代码来源:http_server.go
示例3: newStorage
// newStorage builds a Kafka-backed storage driver for the given machine name.
// It wires optional TLS, requires acks from all in-sync replicas, and
// publishes to the topic configured by the package-level *topic flag.
func newStorage(machineName string) (storage.StorageDriver, error) {
	config := kafka.NewConfig()
	tlsConfig, err := generateTLSConfig()
	if err != nil {
		return nil, err
	}
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	// Strongest delivery guarantee: wait for all in-sync replicas.
	config.Producer.RequiredAcks = kafka.WaitForAll
	brokerList := strings.Split(*brokers, ",")
	// BUG FIX: log the flag's value, not the *string pointer address.
	glog.V(4).Infof("Kafka brokers:%q", *brokers)
	producer, err := kafka.NewAsyncProducer(brokerList, config)
	if err != nil {
		return nil, err
	}
	ret := &kafkaStorage{
		producer:    producer,
		topic:       *topic,
		machineName: machineName,
	}
	return ret, nil
}
开发者ID:chrisdo,项目名称:cadvisor,代码行数:29,代码来源:kafka.go
示例4: pubKafkaAsyncLoop
// pubKafkaAsyncLoop publishes `loops` messages of `sz` bytes to a local Kafka
// broker through an async producer, recording results in the stress counters.
// seq identifies the worker; it is unused in the body.
func pubKafkaAsyncLoop(seq int) {
	cf := sarama.NewConfig()
	cf.Producer.Flush.Frequency = time.Second * 10
	cf.Producer.Flush.Messages = 1000
	cf.Producer.Flush.MaxMessages = 1000
	cf.Producer.RequiredAcks = sarama.WaitForLocal
	cf.Producer.Partitioner = sarama.NewHashPartitioner
	cf.Producer.Timeout = time.Second
	//cf.Producer.Compression = sarama.CompressionSnappy
	cf.Producer.Retry.Max = 3
	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cf)
	if err != nil {
		stress.IncCounter("fail", 1)
		log.Println(err)
		return
	}
	defer producer.Close()
	// BUG FIX: Producer.Return.Errors defaults to true, so the Errors()
	// channel must be drained or the producer (and the deferred Close) can
	// deadlock once the channel fills.
	go func() {
		for err := range producer.Errors() {
			stress.IncCounter("fail", 1)
			log.Println(err)
		}
	}()
	msg := strings.Repeat("X", sz)
	for i := 0; i < loops; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(msg),
		}
		stress.IncCounter("ok", 1)
	}
}
开发者ID:chendx79,项目名称:gafka,代码行数:28,代码来源:bench.go
示例5: NewEventPublisher
// NewEventPublisher constructs an EventPublisher backed by a fire-and-forget
// async Kafka producer: leader-only acks, no compression, and neither
// successes nor errors reported back on the producer channels.
func NewEventPublisher() (*EventPublisher, error) {
	cfg := sarama.NewConfig()
	cfg.ClientID = ipresolver.GetLocalAddr()
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Compression = sarama.CompressionNone
	cfg.Producer.Return.Successes = false
	cfg.Producer.Return.Errors = false
	cfg.Producer.Partitioner = sarama.NewHashPartitioner

	prod, err := sarama.NewAsyncProducer(eatonconfig.KafkaServers, cfg)
	if err != nil {
		return nil, err
	}

	// Both Return flags are false above, so these drain goroutines are
	// currently dead code; they only activate if the flags are flipped.
	if cfg.Producer.Return.Successes {
		go func() {
			for msg := range prod.Successes() {
				log.Println("Sent Message to logs: ", msg.Key)
			}
		}()
	}
	if cfg.Producer.Return.Errors {
		go func() {
			for err := range prod.Errors() {
				log.Println("failed to send message to logs: ", err.Error())
			}
		}()
	}

	return &EventPublisher{producer: prod}, nil
}
开发者ID:ECLabs,项目名称:Eaton-Feeder,代码行数:31,代码来源:event_producer.go
示例6: makePub
// makePub creates the async producer used to publish into the target cluster
// c2, tuned for high-throughput mirroring: 2000-message batches, a 1s flush
// interval, leader-only acks, and generous network timeouts.
// Idiom fix: receiver renamed from `this` (non-idiomatic Go) to `mir`.
func (mir *Mirror) makePub(c2 *zk.ZkCluster) (sarama.AsyncProducer, error) {
	cf := sarama.NewConfig()
	cf.Metadata.RefreshFrequency = time.Minute * 10
	cf.Metadata.Retry.Max = 3
	cf.Metadata.Retry.Backoff = time.Second * 3
	cf.ChannelBufferSize = 1000
	cf.Producer.Return.Errors = true
	cf.Producer.Flush.Messages = 2000          // 2000 messages per batch
	cf.Producer.Flush.Frequency = time.Second  // flush interval
	cf.Producer.Flush.MaxMessages = 0          // unlimited
	cf.Producer.RequiredAcks = sarama.WaitForLocal
	cf.Producer.Retry.Backoff = time.Second * 4
	cf.Producer.Retry.Max = 3
	cf.Net.DialTimeout = time.Second * 30
	cf.Net.WriteTimeout = time.Second * 30
	cf.Net.ReadTimeout = time.Second * 30
	// Optional compression; anything other than gzip/snappy means none.
	switch mir.Compress {
	case "gzip":
		cf.Producer.Compression = sarama.CompressionGZIP
	case "snappy":
		cf.Producer.Compression = sarama.CompressionSnappy
	}
	return sarama.NewAsyncProducer(c2.BrokerList(), cf)
}
开发者ID:funkygao,项目名称:gafka,代码行数:28,代码来源:factory.go
示例7: Setup
// Setup prepares the Requester for benchmarking: it connects an async
// producer, a consumer, and a partition consumer on partition 0 of k.topic,
// tearing down anything already opened if a later step fails.
func (k *kafkaRequester) Setup() error {
	cfg := sarama.NewConfig()

	producer, err := sarama.NewAsyncProducer(k.urls, cfg)
	if err != nil {
		return err
	}

	consumer, err := sarama.NewConsumer(k.urls, nil)
	if err != nil {
		producer.Close()
		return err
	}

	partitionConsumer, err := consumer.ConsumePartition(k.topic, 0, sarama.OffsetNewest)
	if err != nil {
		producer.Close()
		consumer.Close()
		return err
	}

	k.producer = producer
	k.consumer = consumer
	k.partitionConsumer = partitionConsumer
	// One reusable message carrying a zeroed payload of the configured size.
	k.msg = &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(make([]byte, k.payloadSize)),
	}
	return nil
}
开发者ID:actourex,项目名称:bench,代码行数:30,代码来源:kafka_requester.go
示例8: initProducer
// initProducer creates an async Kafka producer for the configured broker
// list, retrying the connection up to moduleConfig.Kafka.MaxRetry times with
// a 5-second pause between attempts.
func initProducer(moduleConfig *Config) (*Producer, error) {
	fmt.Println("[INFO] initProducer called")
	brokerList := moduleConfig.Kafka.BrokerList
	config := sarama.NewConfig()
	// FIX: the original comment said "only wait for leader" but WaitForAll
	// waits for all in-sync replicas to ack.
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Flush.Frequency = 500 * time.Millisecond
	var producer sarama.AsyncProducer
	var err error
	maxRetry := moduleConfig.Kafka.MaxRetry
	for currConnAttempt := 0; currConnAttempt < maxRetry; currConnAttempt++ {
		producer, err = sarama.NewAsyncProducer(brokerList, config)
		if err == nil {
			break
		}
		// Typo fix: "faild" -> "failed".
		fmt.Println("[INFO] Connection attempt failed (", (currConnAttempt + 1), "/", maxRetry, ")")
		// BUG FIX: don't sleep after the final attempt — nothing is retried.
		if currConnAttempt+1 < maxRetry {
			<-time.After(time.Second * 5)
		}
	}
	if err != nil {
		fmt.Println("[ERROR] Unable to setup kafka producer", err)
		return nil, err
	}
	// You must read from the Errors() channel or the producer will deadlock.
	go func() {
		for err := range producer.Errors() {
			// Typo fix: "Kadka" -> "Kafka".
			log.Println("[ERROR] Kafka producer Error: ", err)
		}
	}()
	fmt.Println("[INFO] kafka producer initialized successfully")
	return &Producer{producer: producer, id: CreatedProducersLength()}, nil
}
开发者ID:asvins,项目名称:common_io,代码行数:34,代码来源:common_io.go
示例9: main
// main connects an async Kafka producer (snappy compression) to the brokers
// given by -brokers, then starts a live packet capture on every non-loopback
// interface that has at least one address.
func main() {
	config := sarama.NewConfig()
	config.Producer.Compression = sarama.CompressionSnappy

	flag.StringVar(&kafkaBrokers, "brokers", "localhost:9092", "The kafka broker addresses")
	flag.Parse()

	// Idiom fix: strings.Split already returns []string; the original
	// append-loop was a redundant copy.
	brokers := strings.Split(kafkaBrokers, ",")

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		// FIX: include the underlying error so the failure is diagnosable.
		log.Fatal("Can't create the Kafka producer: ", err)
	}
	fmt.Println("Connected to Kafka brokers", "["+kafkaBrokers+"]")

	ifaces, err := net.Interfaces()
	if err != nil {
		log.Fatal("Cannot get network interfaces")
	}
	for _, iface := range ifaces {
		addrs, _ := iface.Addrs()
		if iface.Name != "lo" && len(addrs) > 0 {
			fmt.Printf("Starting live capture on %s interface...", iface.Name)
			// NOTE(review): decodePackets is called synchronously, so only
			// the first matching interface is ever captured — confirm intent.
			decodePackets(iface.Name, producer)
		}
	}
}
开发者ID:bhnedo,项目名称:rabbit-packetstorm,代码行数:33,代码来源:rabbitgopcap.go
示例10: handler
// handler decodes a JSON Request from the body and publishes its Data to the
// Kafka topic named by request.Kind. Responds 500 on decode or connect
// failure, and "OK" once the message has been enqueued.
func handler(w http.ResponseWriter, r *http.Request) {
	decoder := json.NewDecoder(r.Body)
	var request Request
	err := decoder.Decode(&request)
	if err != nil {
		log.Print("Could not decode request")
		http.Error(w, err.Error(), 500)
		return
	}
	log.Print("Received request for kind: ", request.Kind)
	config := sarama.NewConfig()
	// NOTE(review): building a producer per request is expensive; a shared
	// package-level producer would be preferable.
	producer, err := sarama.NewAsyncProducer(KafkaAddresses, config)
	if err != nil {
		log.Print("Could not connect to Kafka: ", err)
		http.Error(w, err.Error(), 500)
		return
	}
	// BUG FIX: the producer was never closed, leaking goroutines and broker
	// connections on every request. Close also flushes the queued message.
	defer func() {
		if cerr := producer.Close(); cerr != nil {
			log.Print("Error closing Kafka producer: ", cerr)
		}
	}()
	log.Print("Connected to Kafka")
	message := sarama.ProducerMessage{
		Topic: request.Kind,
		Value: MapEncoder(request.Data),
	}
	producer.Input() <- &message
	log.Print("Message sent")
	fmt.Fprintf(w, "OK")
}
开发者ID:JosephSalisbury,项目名称:event-sourcing-poc,代码行数:34,代码来源:main.go
示例11: main
// main floods the dataman_test topic with "testing 123" messages until the
// process is interrupted, then reports how many were enqueued and how many
// failed.
func main() {
	producer, err := sarama.NewAsyncProducer([]string{"10.3.10.32:9091"}, nil)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err = producer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var enqueued, errors int
ProducerLoop:
	for {
		msg := &sarama.ProducerMessage{
			Topic: "dataman_test",
			Key:   nil,
			Value: sarama.StringEncoder("testing 123"),
		}
		select {
		case producer.Input() <- msg:
			enqueued++
		case err = <-producer.Errors():
			log.Println("Failed to produce message", err)
			errors++
		case <-signals:
			break ProducerLoop
		}
	}
	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
}
开发者ID:upccup,项目名称:cuplearn,代码行数:31,代码来源:asyncproducer.go
示例12: NewKafkaProducer
// NewKafkaProducer builds an IndeedKafkaProducer whose async producer reports
// both successes and errors; two background goroutines forward those events
// to the eatonevents log.
func NewKafkaProducer() (*IndeedKafkaProducer, error) {
	cfg := sarama.NewConfig()
	cfg.ClientID = ipresolver.GetLocalAddr()
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Compression = sarama.CompressionNone
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true
	cfg.Producer.Partitioner = sarama.NewHashPartitioner

	prod, err := sarama.NewAsyncProducer(eatonconfig.KafkaServers, cfg)
	if err != nil {
		return nil, err
	}

	// Both channels must be drained since Return.Successes/Errors are on.
	go func() {
		for msg := range prod.Successes() {
			eatonevents.Info(fmt.Sprintf("Successfully sent message to topic %s with key %s", msg.Topic, msg.Key))
		}
	}()
	go func() {
		for err := range prod.Errors() {
			eatonevents.Error("Failed to send message due to error: ", err)
		}
	}()

	return &IndeedKafkaProducer{producer: prod}, nil
}
开发者ID:ECLabs,项目名称:Eaton-Feeder,代码行数:26,代码来源:kafka_producer.go
示例13: NewKafka
func NewKafka(numberOfMessages int, testLatency bool) *Kafka {
config := sarama.NewConfig()
client, _ := sarama.NewClient([]string{"localhost:9092"}, config)
topic := "test"
pub, _ := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
consumer, _ := sarama.NewConsumerFromClient(client)
sub, _ := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
var handler benchmark.MessageHandler
if testLatency {
handler = &benchmark.LatencyMessageHandler{
NumberOfMessages: numberOfMessages,
Latencies: []float32{},
}
} else {
handler = &benchmark.ThroughputMessageHandler{NumberOfMessages: numberOfMessages}
}
return &Kafka{
handler: handler,
client: client,
pub: pub,
sub: sub,
topic: topic,
}
}
开发者ID:houcy,项目名称:mq-benchmarking,代码行数:27,代码来源:kafka.go
示例14: queueInit
func queueInit() {
config := sarama.NewConfig()
config.ClientID = args.ID
// Acks
if args.Pub.Ack {
config.Producer.RequiredAcks = sarama.WaitForAll
} else {
config.Producer.RequiredAcks = sarama.WaitForLocal
}
// Compress
if args.Pub.Compress {
config.Producer.Compression = sarama.CompressionSnappy
} else {
config.Producer.Compression = sarama.CompressionNone
}
// Flush Intervals
if args.Pub.FlushFreq > 0 {
config.Producer.Flush.Frequency = time.Duration(args.Pub.FlushFreq) * time.Second
} else {
config.Producer.Flush.Frequency = 1 * time.Second
}
producer, err := sarama.NewAsyncProducer(args.Pub.URI, config)
if err != nil {
log.Fatalln("Failed to start Kafka producer:", err)
}
qProducer = producer
}
开发者ID:trustedanalytics,项目名称:gateway,代码行数:35,代码来源:publisher.go
示例15: NewKafkaOutput
// NewKafkaOutput creates instance of kafka producer client.
func NewKafkaOutput(address string, config *KafkaConfig) io.Writer {
c := sarama.NewConfig()
c.Producer.RequiredAcks = sarama.WaitForLocal
c.Producer.Compression = sarama.CompressionSnappy
c.Producer.Flush.Frequency = KafkaOutputFrequency * time.Millisecond
brokerList := strings.Split(config.host, ",")
producer, err := sarama.NewAsyncProducer(brokerList, c)
if err != nil {
log.Fatalln("Failed to start Sarama(Kafka) producer:", err)
}
o := &KafkaOutput{
config: config,
producer: producer,
}
if Settings.verbose {
// Start infinite loop for tracking errors for kafka producer.
go o.ErrorHandler()
}
return o
}
开发者ID:buger,项目名称:gor,代码行数:26,代码来源:output_kafka.go
示例16: NewPeer
// NewPeer creates and returns a new Peer for communicating with Kafka.
func NewPeer(host string) (*Peer, error) {
host = strings.Split(host, ":")[0] + ":9092"
config := sarama.NewConfig()
client, err := sarama.NewClient([]string{host}, config)
if err != nil {
return nil, err
}
producer, err := sarama.NewAsyncProducer([]string{host}, config)
if err != nil {
return nil, err
}
consumer, err := sarama.NewConsumer([]string{host}, config)
if err != nil {
return nil, err
}
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
return nil, err
}
return &Peer{
client: client,
producer: producer,
consumer: partitionConsumer,
send: make(chan []byte),
errors: make(chan error, 1),
done: make(chan bool),
}, nil
}
开发者ID:huikang,项目名称:Flotilla,代码行数:33,代码来源:kafka.go
示例17: newAsyncProducer
func newAsyncProducer(tlsConfig *tls.Config, brokerList []string) *sarama.AsyncProducer {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
// On the broker side, you may want to change the following settings to get
// stronger consistency guarantees:
// - For your broker, set `unclean.leader.election.enable` to false
// - For the topic, you could increase `min.insync.replicas`.
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
// We will just log to STDOUT if we're not able to produce messages.
// Note: messages will only be returned here after all retry attempts are exhausted.
// this goroutine will eventually exit as producer.shutdown closes the errors channel
go func() {
for err := range producer.Errors() {
log.Println("Failed to write access log entry:", err)
}
}()
return &producer
}
开发者ID:Fiery,项目名称:fsmonitor,代码行数:29,代码来源:monitor_kafka_client.go
示例18: NewPipelineKafka
func NewPipelineKafka(host, db string) *PipelineKafka {
brokerList := strings.Split(host, ",")
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
producer, _ := sarama.NewAsyncProducer(brokerList, config)
return &PipelineKafka{producer: producer, db: db}
}
开发者ID:ZuochaoLee,项目名称:visual_spider_go,代码行数:9,代码来源:pipeline_kafka.go
示例19: initAP
func (b *Broker) initAP() error {
var err error
b.ap, err = sarama.NewAsyncProducer(b.config.brokerServerList, b.brokerConfig)
if err != nil {
return err
}
b.produceChan = make(chan *sarama.ProducerMessage, 64)
return nil
}
开发者ID:dzch,项目名称:binstore,代码行数:9,代码来源:broker.go
示例20: main
func main() {
// Setup configuration
config := sarama.NewConfig()
// Return specifies what channels will be populated.
// If they are set to true, you must read from
// config.Producer.Return.Successes = true
// The total number of times to retry sending a message (default 3).
config.Producer.Retry.Max = 5
// The level of acknowledgement reliability needed from the broker.
config.Producer.RequiredAcks = sarama.WaitForAll
brokers := []string{"localhost:9092"}
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
// Should not reach here
panic(err)
}
defer func() {
if err := producer.Close(); err != nil {
// Should not reach here
panic(err)
}
}()
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var enqueued, errors int
doneCh := make(chan struct{})
go func() {
for {
time.Sleep(500 * time.Millisecond)
strTime := strconv.Itoa(int(time.Now().Unix()))
msg := &sarama.ProducerMessage{
Topic: "important",
Key: sarama.StringEncoder(strTime),
Value: sarama.StringEncoder("Something Cool"),
}
select {
case producer.Input() <- msg:
enqueued++
fmt.Println("Produce message")
case err := <-producer.Errors():
errors++
fmt.Println("Failed to produce message:", err)
case <-signals:
doneCh <- struct{}{}
}
}
}()
<-doneCh
log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
}
开发者ID:hyndio,项目名称:go-kafka,代码行数:57,代码来源:main.go
注:本文中的github.com/Shopify/sarama.NewAsyncProducer函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论