This article collects typical usage examples of the NewConfig function from the Go package github.com/Shopify/sarama. If you have been wondering what exactly NewConfig does, how to call it, and what real-world code that uses it looks like, the curated examples below should help.
Twenty code examples of NewConfig are shown, ordered by popularity.
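As a quick orientation before the curated examples, here is a minimal, self-contained sketch of the typical NewConfig workflow. The broker address and topic are placeholders, not taken from any of the projects below:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// NewConfig returns a *sarama.Config pre-populated with sane defaults;
	// override only the fields you care about.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true // required by SyncProducer

	// Validate catches inconsistent settings before any connection is made.
	if err := config.Validate(); err != nil {
		log.Fatalln("invalid sarama config:", err)
	}

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("failed to start producer:", err)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "demo",
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		log.Fatalln("failed to send message:", err)
	}
	log.Printf("stored at partition %d, offset %d", partition, offset)
}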
Example 1: newConsumer

// kafka is an import alias for github.com/Shopify/sarama;
// kafkas (broker list) and topic are package-level values.
func newConsumer() (masterConsumer kafka.Consumer, consumers []kafka.PartitionConsumer) {
	config := kafka.NewConfig()
	config.Net.KeepAlive = 30 * time.Second
	config.Consumer.Retry.Backoff = 25 * time.Millisecond

	consumers = make([]kafka.PartitionConsumer, 0)
	retry(func() (err error) {
		var consumer kafka.PartitionConsumer
		var partitions []int32

		masterConsumer, err = kafka.NewConsumer(kafkas, config)
		if err != nil {
			return
		}

		partitions, err = masterConsumer.Partitions(topic)
		if err != nil {
			return
		}

		for _, partition := range partitions {
			consumer, err = masterConsumer.ConsumePartition(topic, partition, kafka.OffsetNewest)
			if err != nil {
				return
			}
			consumers = append(consumers, consumer)
		}
		return
	})
	return
}

Author: nanoservice | Project: core-fanout | Lines: 34 | Source: main.go
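The retry helper used in Example 1 is not part of the snippet. A minimal sketch of what it might look like, assuming a fixed attempt budget and a constant backoff (the name matches the call above, but the body and values are hypothetical):

// retry re-runs fn until it succeeds or the attempt budget is exhausted.
func retry(fn func() error) {
	const attempts = 5                    // hypothetical attempt budget
	const backoff = 500 * time.Millisecond // hypothetical wait between attempts
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return
		}
		time.Sleep(backoff)
	}
	log.Fatalln("giving up after retries:", err)
}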
Example 2: TestProducerReturnsExpectationsToChannels

func TestProducerReturnsExpectationsToChannels(t *testing.T) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	mp := NewAsyncProducer(t, config)

	mp.ExpectInputAndSucceed()
	mp.ExpectInputAndSucceed()
	mp.ExpectInputAndFail(sarama.ErrOutOfBrokers)

	mp.Input() <- &sarama.ProducerMessage{Topic: "test 1"}
	mp.Input() <- &sarama.ProducerMessage{Topic: "test 2"}
	mp.Input() <- &sarama.ProducerMessage{Topic: "test 3"}

	msg1 := <-mp.Successes()
	msg2 := <-mp.Successes()
	err1 := <-mp.Errors()

	if msg1.Topic != "test 1" {
		t.Error("Expected message 1 to be returned first")
	}
	if msg2.Topic != "test 2" {
		t.Error("Expected message 2 to be returned second")
	}
	if err1.Msg.Topic != "test 3" || err1.Err != sarama.ErrOutOfBrokers {
		t.Error("Expected message 3 to be returned as error")
	}

	if err := mp.Close(); err != nil {
		t.Error(err)
	}
}

Author: qband | Project: down | Lines: 33 | Source: async_producer_test.go
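Note that NewAsyncProducer here is the mock constructor from sarama's mocks package (the test file lives in that package), not sarama.NewAsyncProducer. Used from outside the package, the same setup would read roughly as follows; the test name and topic are placeholders:

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/mocks"
)

func TestWithMockProducer(t *testing.T) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	mp := mocks.NewAsyncProducer(t, config) // no real broker needed
	defer mp.Close()

	mp.ExpectInputAndSucceed()
	mp.Input() <- &sarama.ProducerMessage{Topic: "demo"}
	<-mp.Successes()
}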
Example 3: pubKafkaLoop

// sz, loops, and topic are package-level benchmark settings;
// stress.IncCounter records ok/fail counts for the harness.
func pubKafkaLoop(seq int) {
	cf := sarama.NewConfig()
	cf.Producer.RequiredAcks = sarama.WaitForLocal
	cf.Producer.Partitioner = sarama.NewHashPartitioner
	cf.Producer.Timeout = time.Second
	//cf.Producer.Compression = sarama.CompressionSnappy
	cf.Producer.Retry.Max = 3

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cf)
	if err != nil {
		stress.IncCounter("fail", 1)
		log.Println(err)
		return
	}
	defer producer.Close()

	msg := strings.Repeat("X", sz)
	for i := 0; i < loops; i++ {
		_, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(msg),
		})
		if err == nil {
			stress.IncCounter("ok", 1)
		} else {
			stress.IncCounter("fail", 1)
		}
	}
}

Author: chendx79 | Project: gafka | Lines: 29 | Source: bench.go
Example 4: consumeCluster

func (this *Peek) consumeCluster(zkcluster *zk.ZkCluster, topicPattern string,
	partitionId int, msgChan chan *sarama.ConsumerMessage) {
	brokerList := zkcluster.BrokerList()
	if len(brokerList) == 0 {
		return
	}

	kfk, err := sarama.NewClient(brokerList, sarama.NewConfig())
	if err != nil {
		this.Ui.Output(err.Error())
		return
	}
	//defer kfk.Close() // FIXME how to close it

	topics, err := kfk.Topics()
	if err != nil {
		this.Ui.Output(err.Error())
		return
	}

	for _, t := range topics {
		if patternMatched(t, topicPattern) {
			go this.simpleConsumeTopic(zkcluster, kfk, t, int32(partitionId), msgChan)
		}
	}
}

Author: funkygao | Project: gafka | Lines: 26 | Source: peek.go
Example 5: NewClient

// NewClient returns a Kafka client
func NewClient(addresses []string) (sarama.Client, error) {
	config := sarama.NewConfig()
	hostname, err := os.Hostname()
	if err != nil {
		hostname = ""
	}
	config.ClientID = hostname
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Return.Successes = true

	var client sarama.Client
	retries := outOfBrokersRetries + 1
	for retries > 0 {
		client, err = sarama.NewClient(addresses, config)
		retries--
		if err == sarama.ErrOutOfBrokers {
			glog.Errorf("Can't connect to the Kafka cluster at %s (%d retries left): %s",
				addresses, retries, err)
			time.Sleep(outOfBrokersBackoff)
		} else {
			break
		}
	}
	return client, err
}

Author: aristanetworks | Project: goarista | Lines: 26 | Source: client.go
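outOfBrokersRetries and outOfBrokersBackoff are package-level values not shown in this snippet; the actual values in goarista may differ. Hypothetical stand-ins that make it compile:

const (
	outOfBrokersRetries = 3               // hypothetical: retries after the first attempt
	outOfBrokersBackoff = 5 * time.Second // hypothetical: wait between attempts
)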
Example 6: makePub

func (this *Mirror) makePub(c2 *zk.ZkCluster) (sarama.AsyncProducer, error) {
	cf := sarama.NewConfig()
	cf.Metadata.RefreshFrequency = time.Minute * 10
	cf.Metadata.Retry.Max = 3
	cf.Metadata.Retry.Backoff = time.Second * 3

	cf.ChannelBufferSize = 1000

	cf.Producer.Return.Errors = true
	cf.Producer.Flush.Messages = 2000         // 2000 messages per batch
	cf.Producer.Flush.Frequency = time.Second // flush interval
	cf.Producer.Flush.MaxMessages = 0         // unlimited
	cf.Producer.RequiredAcks = sarama.WaitForLocal
	cf.Producer.Retry.Backoff = time.Second * 4
	cf.Producer.Retry.Max = 3

	cf.Net.DialTimeout = time.Second * 30
	cf.Net.WriteTimeout = time.Second * 30
	cf.Net.ReadTimeout = time.Second * 30

	switch this.Compress {
	case "gzip":
		cf.Producer.Compression = sarama.CompressionGZIP
	case "snappy":
		cf.Producer.Compression = sarama.CompressionSnappy
	}

	return sarama.NewAsyncProducer(c2.BrokerList(), cf)
}

Author: funkygao | Project: gafka | Lines: 28 | Source: factory.go
Example 7: main

func main() {
	config := sarama.NewConfig()
	config.Producer.Compression = sarama.CompressionSnappy

	flag.StringVar(&kafkaBrokers, "brokers", "localhost:9092", "The kafka broker addresses")
	flag.Parse()

	brokers := strings.Split(kafkaBrokers, ",")

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Fatal("Can't create the Kafka producer: ", err)
	}
	fmt.Println("Connected to Kafka brokers", "["+kafkaBrokers+"]")

	ifaces, err := net.Interfaces()
	if err != nil {
		log.Fatal("Cannot get network interfaces")
	}
	for _, iface := range ifaces {
		addrs, _ := iface.Addrs()
		// Capture on every non-loopback interface that has an address.
		if iface.Name != "lo" && len(addrs) > 0 {
			fmt.Printf("Starting live capture on %s interface...", iface.Name)
			decodePackets(iface.Name, producer)
		}
	}
}

Author: bhnedo | Project: rabbit-packetstorm | Lines: 33 | Source: rabbitgopcap.go
Example 8: kafkaClient

// kafkaClient initializes a connection to a Kafka cluster and
// initializes one or more clientProducer() (producer instances).
func kafkaClient(n int) {
	switch noop {
	// If not noop, actually fire up Kafka connections and send messages.
	case false:
		cId := "client_" + strconv.Itoa(n)

		conf := kafka.NewConfig()
		if compression != kafka.CompressionNone {
			conf.Producer.Compression = compression
		}
		conf.Producer.Flush.MaxMessages = batchSize

		client, err := kafka.NewClient(brokers, conf)
		if err != nil {
			log.Println(err)
			os.Exit(1)
		} else {
			log.Printf("%s connected\n", cId)
		}

		for i := 0; i < producers; i++ {
			go clientProducer(client)
		}
	// If noop, we're not creating connections at all.
	// Just generate messages and burn CPU.
	default:
		for i := 0; i < producers; i++ {
			go clientDummyProducer()
		}
	}
	<-killClients
}

Author: prezi | Project: sangrenel | Lines: 33 | Source: sangrenel.go
Example 9: main

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	// Note: newer sarama releases also require
	// config.Producer.Return.Successes = true for a SyncProducer.

	// brokers := []string{"192.168.59.103:9092"}
	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		// Should not reach here
		panic(err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			// Should not reach here
			panic(err)
		}
	}()

	topic := "important"
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("Something Cool"),
	}
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
}

Author: hyndio | Project: go-kafka | Lines: 34 | Source: main.go
Example 10: NewKafka

func NewKafka(numberOfMessages int, testLatency bool) *Kafka {
	config := sarama.NewConfig()
	// Errors are deliberately ignored here; this is benchmark setup code.
	client, _ := sarama.NewClient([]string{"localhost:9092"}, config)

	topic := "test"
	pub, _ := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	consumer, _ := sarama.NewConsumerFromClient(client)
	sub, _ := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)

	var handler benchmark.MessageHandler
	if testLatency {
		handler = &benchmark.LatencyMessageHandler{
			NumberOfMessages: numberOfMessages,
			Latencies:        []float32{},
		}
	} else {
		handler = &benchmark.ThroughputMessageHandler{NumberOfMessages: numberOfMessages}
	}

	return &Kafka{
		handler: handler,
		client:  client,
		pub:     pub,
		sub:     sub,
		topic:   topic,
	}
}

Author: houcy | Project: mq-benchmarking | Lines: 27 | Source: kafka.go
Example 11: NewKafkaProducer

func NewKafkaProducer() (*IndeedKafkaProducer, error) {
	config := sarama.NewConfig()
	config.ClientID = ipresolver.GetLocalAddr()
	config.Producer.RequiredAcks = sarama.WaitForLocal
	config.Producer.Compression = sarama.CompressionNone
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Producer.Partitioner = sarama.NewHashPartitioner

	asyncProducer, err := sarama.NewAsyncProducer(eatonconfig.KafkaServers, config)
	if err != nil {
		return nil, err
	}

	// Drain the Successes and Errors channels so the producer never blocks.
	go func() {
		for msg := range asyncProducer.Successes() {
			eatonevents.Info(fmt.Sprintf("Successfully sent message to topic %s with key %s", msg.Topic, msg.Key))
		}
	}()
	go func() {
		for err := range asyncProducer.Errors() {
			eatonevents.Error("Failed to send message due to error: ", err)
		}
	}()

	return &IndeedKafkaProducer{
		producer: asyncProducer,
	}, nil
}

Author: ECLabs | Project: Eaton-Feeder | Lines: 26 | Source: kafka_producer.go
Example 12: queueInit

func queueInit() {
	config := sarama.NewConfig()
	config.ClientID = args.ID

	// Acks
	if args.Pub.Ack {
		config.Producer.RequiredAcks = sarama.WaitForAll
	} else {
		config.Producer.RequiredAcks = sarama.WaitForLocal
	}

	// Compression
	if args.Pub.Compress {
		config.Producer.Compression = sarama.CompressionSnappy
	} else {
		config.Producer.Compression = sarama.CompressionNone
	}

	// Flush interval
	if args.Pub.FlushFreq > 0 {
		config.Producer.Flush.Frequency = time.Duration(args.Pub.FlushFreq) * time.Second
	} else {
		config.Producer.Flush.Frequency = 1 * time.Second
	}

	producer, err := sarama.NewAsyncProducer(args.Pub.URI, config)
	if err != nil {
		log.Fatalln("Failed to start Kafka producer:", err)
	}
	qProducer = producer
}

Author: trustedanalytics | Project: gateway | Lines: 35 | Source: publisher.go
Example 13: main

func main() {
	flag.Parse()

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()

	var pf ProcessFunc
	switch op {
	case "+":
		pf = processAdd
	case "-":
		pf = processSub
	case "*":
		pf = processMul
	case "/":
		pf = processDiv
	}

	// Set up one partition consumer for each partition
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		log.Fatalln(err)
	}
	partitionConsumers := make([]sarama.PartitionConsumer, len(partitions))
	for idx, partition := range partitions {
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			log.Fatalln(err)
		}
		partitionConsumers[idx] = pc

		go func(pc sarama.PartitionConsumer) {
			Serve(pc.Messages(), pf)
		}(pc)

		go func(pc sarama.PartitionConsumer) {
			for err := range pc.Errors() {
				log.Println(err)
			}
		}(pc)
	}

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals

	for _, pc := range partitionConsumers {
		fmt.Println("Closing partition, next offset", pc.HighWaterMarkOffset())
		pc.AsyncClose()
	}
}

Author: nickmarrone | Project: go_kafka_test | Lines: 60 | Source: consumer.go
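ProcessFunc, the process* handlers, and Serve come from elsewhere in the project. A hypothetical sketch consistent with the calls above; the payload format ("two integers per message") is an assumption, not taken from the source:

// ProcessFunc applies an arithmetic operation to two operands.
type ProcessFunc func(a, b int64) int64

func processAdd(a, b int64) int64 { return a + b }
func processSub(a, b int64) int64 { return a - b }
func processMul(a, b int64) int64 { return a * b }
func processDiv(a, b int64) int64 { return a / b } // real code should guard against b == 0

// Serve drains one partition and hands each decoded message to pf.
func Serve(messages <-chan *sarama.ConsumerMessage, pf ProcessFunc) {
	for msg := range messages {
		var a, b int64
		// Hypothetical payload format: two integers, e.g. "3 4".
		if _, err := fmt.Sscanf(string(msg.Value), "%d %d", &a, &b); err != nil {
			log.Println("bad payload:", err)
			continue
		}
		log.Println("result:", pf(a, b))
	}
}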
Example 14: main

func main() {
	config := sarama.NewConfig()
	// Handle errors manually
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer([]string{kafkaAddr}, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	logConsumer, err := consumer.ConsumePartition("buy", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer logConsumer.Close()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	for {
		select {
		case err := <-logConsumer.Errors():
			log.Println(err)
		case msg := <-logConsumer.Messages():
			order := &Order{}
			json.Unmarshal(msg.Value, order)
			log.Printf("notification to %s with order %s", order.UserID, order.OrderID)
		case <-signals:
			return
		}
	}
}

Author: thanhliem89dn | Project: microservices-book-code | Lines: 33 | Source: notification.go
Example 15: NewKafkaSubscriber

// NewKafkaSubscriber will initiate the experimental Kafka consumer.
func NewKafkaSubscriber(cfg *config.Kafka, offsetProvider func() int64, offsetBroadcast func(int64)) (*KafkaSubscriber, error) {
	var err error
	s := &KafkaSubscriber{
		offset:          offsetProvider,
		broadcastOffset: offsetBroadcast,
		partition:       cfg.Partition,
		stop:            make(chan chan error, 1),
	}

	if len(cfg.BrokerHosts) == 0 {
		return s, errors.New("at least 1 broker host is required")
	}
	if len(cfg.Topic) == 0 {
		return s, errors.New("topic name is required")
	}
	s.topic = cfg.Topic

	sconfig := sarama.NewConfig()
	sconfig.Consumer.Return.Errors = true
	s.cnsmr, err = sarama.NewConsumer(cfg.BrokerHosts, sconfig)
	return s, err
}

Author: strogo | Project: gizmo | Lines: 26 | Source: kafka.go
Example 16: newDataCollector

func newDataCollector(brokerList []string) sarama.SyncProducer {
	// For the data collector, we are looking for strong consistency semantics.
	// Because we don't change the flush settings, sarama will try to produce messages
	// as fast as possible to keep latency low.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message

	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}
	return producer
}

Author: ChongFeng | Project: beats | Lines: 26 | Source: http_server.go
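Both this example and Example 18 call createTlsConfiguration, which is defined elsewhere in the same program. A minimal sketch, assuming certFile, keyFile, and caFile are package-level values (e.g. set via flags) and that nil means "TLS disabled"; it needs crypto/tls, crypto/x509, io/ioutil, and log:

var certFile, keyFile, caFile string // assumed to be set via flags

func createTlsConfiguration() *tls.Config {
	if certFile == "" || keyFile == "" || caFile == "" {
		return nil // TLS not configured
	}
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		log.Fatalln("failed to load client key pair:", err)
	}
	caCert, err := ioutil.ReadFile(caFile)
	if err != nil {
		log.Fatalln("failed to read CA certificate:", err)
	}
	caCertPool := x509.NewCertPool()
	caCertPool.AppendCertsFromPEM(caCert)
	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
	}
}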
Example 17: newAsyncProducer

func newAsyncProducer(tlsConfig *tls.Config, brokerList []string) *sarama.AsyncProducer {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	// This goroutine will eventually exit as producer shutdown closes the errors channel.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return &producer
}

Author: Fiery | Project: fsmonitor | Lines: 29 | Source: monitor_kafka_client.go
Example 18: newAccessLogProducer

func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
	// For the access log, we are looking for AP semantics, with high throughput.
	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
	config := sarama.NewConfig()
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}

Author: ChongFeng | Project: beats | Lines: 29 | Source: http_server.go
Example 19: handler

func handler(w http.ResponseWriter, r *http.Request) {
	decoder := json.NewDecoder(r.Body)
	var request Request
	err := decoder.Decode(&request)
	if err != nil {
		log.Print("Could not decode request")
		http.Error(w, err.Error(), 500)
		return
	}
	log.Print("Received request for kind: ", request.Kind)

	// Note: creating a new producer per request is expensive;
	// production code would create one producer and reuse it.
	config := sarama.NewConfig()
	producer, err := sarama.NewAsyncProducer(KafkaAddresses, config)
	if err != nil {
		log.Print("Could not connect to Kafka: ", err)
		http.Error(w, err.Error(), 500)
		return
	}
	log.Print("Connected to Kafka")

	message := sarama.ProducerMessage{
		Topic: request.Kind,
		Value: MapEncoder(request.Data),
	}
	producer.Input() <- &message
	log.Print("Message sent")

	fmt.Fprintf(w, "OK")
}

Author: JosephSalisbury | Project: event-sourcing-poc | Lines: 34 | Source: main.go
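MapEncoder is project-specific; anything used as a ProducerMessage Value must satisfy sarama.Encoder, i.e. implement Encode and Length. A minimal hypothetical version, assuming the request data is a string map serialized as JSON:

// MapEncoder is a hypothetical sarama.Encoder backed by a string map.
type MapEncoder map[string]string

// Encode serializes the map as JSON.
func (m MapEncoder) Encode() ([]byte, error) {
	return json.Marshal(m)
}

// Length reports the encoded size; sarama uses it to size buffers.
func (m MapEncoder) Length() int {
	b, _ := json.Marshal(m)
	return len(b)
}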
Example 20: Connect

func (k *Kafka) Connect() error {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
	config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
	config.Producer.Retry.Max = k.MaxRetry

	// Legacy support: map the old SSL config field names onto the new ones.
	if k.Certificate != "" {
		k.SSLCert = k.Certificate
		k.SSLCA = k.CA
		k.SSLKey = k.Key
	}

	tlsConfig, err := internal.GetTLSConfig(
		k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify)
	if err != nil {
		return err
	}
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	producer, err := sarama.NewSyncProducer(k.Brokers, config)
	if err != nil {
		return err
	}
	k.producer = producer
	return nil
}

Author: jeichorn | Project: telegraf | Lines: 32 | Source: kafka.go
Note: the github.com/Shopify/sarama.NewConfig examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors. For redistribution and use, refer to each project's license; do not reproduce without permission.