本文整理汇总了Golang中github.com/Shopify/sarama.SyncProducer类的典型用法代码示例。如果您正苦于以下问题:Golang SyncProducer类的具体用法?Golang SyncProducer怎么用?Golang SyncProducer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SyncProducer类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: Serve
func Serve(producer sarama.SyncProducer, topic string) {
for {
fmt.Print("x y: ")
var x, y int
fmt.Scanf("%d %d", &x, &y)
m := Multiply{
X: x,
Y: y,
}
jsonMsg, err := json.Marshal(m)
if err != nil {
log.Fatalln(err)
}
msg := sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(jsonMsg),
}
partition, offset, err := producer.SendMessage(&msg)
if err != nil {
log.Fatal(err)
} else {
fmt.Println("Sent msg to partition:", partition, ", offset:", offset)
}
}
}
开发者ID:nickmarrone,项目名称:go_kafka_test,代码行数:29,代码来源:producer.go
示例2: run
func (z *zipkin) run(ch chan bool, p sarama.SyncProducer) {
t := time.NewTicker(z.opts.BatchInterval)
var buf []*trace.Span
for {
select {
case s := <-z.spans:
buf = append(buf, s)
if len(buf) >= z.opts.BatchSize {
go z.send(buf, p)
buf = nil
}
case <-t.C:
// flush
if len(buf) > 0 {
go z.send(buf, p)
buf = nil
}
case <-ch:
// exit
t.Stop()
p.Close()
return
}
}
}
开发者ID:jelmersnoeck,项目名称:go-platform,代码行数:27,代码来源:zipkin.go
示例3: produce
func produce(producer sarama.SyncProducer, c chan int, n int, s int, topic string, logger *log.Logger) {
msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(randomString(s))}
i := 0
for ; i < n; i++ {
_, _, err := producer.SendMessage(msg)
if err != nil {
logger.Printf("FAILED to send message: %s\n", err)
}
}
c <- i
}
开发者ID:C-Pro,项目名称:go-kafka-test,代码行数:11,代码来源:producer.go
示例4: pub
func (z *zipkin) pub(s *zipkincore.Span, pr sarama.SyncProducer) {
t := thrift.NewTMemoryBufferLen(1024)
p := thrift.NewTBinaryProtocolTransport(t)
if err := s.Write(p); err != nil {
return
}
m := &sarama.ProducerMessage{
Topic: z.opts.Topic,
Value: sarama.ByteEncoder(t.Buffer.Bytes()),
}
pr.SendMessage(m)
}
开发者ID:jelmersnoeck,项目名称:go-platform,代码行数:13,代码来源:zipkin.go
示例5: produce
func (k *KafkaProducer) produce(producer sarama.SyncProducer) {
for {
metric := <-k.metricsChannel
json, err := json.MarshalIndent(metric, "", " ")
if err != nil {
return
}
msg := &sarama.ProducerMessage{Topic: "loadbalancer.all", Value: sarama.StringEncoder(json)}
_, _, err = producer.SendMessage(msg)
if err != nil {
k.Log.Error("error sending to Kafka ")
}
}
}
开发者ID:hlm,项目名称:vamp-router,代码行数:14,代码来源:kafka_producer.go
示例6: sendMessage
// Publish message
func sendMessage(producer sarama.SyncProducer, topic string, payload string) {
message := sarama.ProducerMessage{
// The Kafka topic for this message
Topic: topic,
// The actual message to store in Kafka
Value: sarama.StringEncoder(payload),
// No message key, so messages will be distributed randomly over partitions
}
// Send Message
partition, offset, err := producer.SendMessage(&message)
if err != nil {
logger.Printf("Error sending data: %s\n", err)
} else {
logger.Printf("[%s/%d/%d] Message successfully published\n", topic, partition, offset)
}
}
开发者ID:pmdcosta,项目名称:apache-kafka,代码行数:19,代码来源:producer.go
示例7: processBatch
// Note that unfortunately this does not actually produce in batches yet. We
// should theoretically be able to with Kafka, but the sarama interface for a
// `SyncProducer` currently seems overly limited.
func processBatch(producer sarama.SyncProducer, topic string, events []*stripe.Event) error {
for _, event := range events {
data, err := json.Marshal(event)
if err != nil {
return err
}
id := event.Data.Obj["id"]
if id == nil {
log.Printf("Found event with nil data ID, type is %v", event.Type)
}
// TODO: Verify that Kafka does indeed perform log compaction per
// partition key (as opposed to some other type of "key"). The docs
// aren't exactly clear on this point.
key := ""
if id != nil {
key = id.(string)
}
message := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(data),
}
//start := time.Now()
//partition, offset, err := producer.SendMessage(message)
_, _, err = producer.SendMessage(message)
if err != nil {
return err
} else {
//log.Printf("> Message sent to partition %d at offset %d in %v\n",
//partition, offset, time.Now().Sub(start))
}
}
return nil
}
开发者ID:brandur,项目名称:stripe-warehouse,代码行数:42,代码来源:main.go
示例8: main
func main() {
var cmd *exec.Cmd
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
for {
select {
case sig := <-sigs:
cmd.Process.Signal(sig)
}
}
}()
var cfg config
if err := gofigure.Gofigure(&cfg); err != nil {
fmt.Fprintln(os.Stderr, "unexpected error configuring ktee")
os.Exit(1)
}
var err error
var producer sarama.SyncProducer
if len(cfg.Brokers) > 0 {
brokers := strings.Split(cfg.Brokers, ",")
producer, err = sarama.NewSyncProducer(brokers, sarama.NewConfig())
if err != nil {
fmt.Fprintf(os.Stderr, "error connecting to Kafka brokers: %s\n", err)
os.Exit(1)
}
defer func() {
producer.Close()
}()
}
args := os.Args[1:]
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "usage: ktee args")
os.Exit(1)
}
kwOut := kafkaWriter{producer, os.Stdout, cfg.OutTopic, new(bytes.Buffer), make(chan sarama.ProducerMessage)}
kwErr := kafkaWriter{producer, os.Stderr, cfg.ErrTopic, new(bytes.Buffer), make(chan sarama.ProducerMessage)}
defer func() {
kwOut.Flush()
kwErr.Flush()
}()
cmd = exec.Command(args[0], args[1:]...)
cmd.Stdin = os.Stdin
cmd.Stdout = kwOut
cmd.Stderr = kwErr
cmd.Env = os.Environ()
err = cmd.Run()
if err != nil {
switch err.(type) {
case *exec.ExitError:
fmt.Fprintf(os.Stderr, "non-zero exit code: %s\n", err)
if status, ok := err.(*exec.ExitError).Sys().(syscall.WaitStatus); ok {
os.Exit(status.ExitStatus())
}
os.Exit(1)
default:
fmt.Fprintf(os.Stderr, "error executing command: %s\n", err)
os.Exit(1)
}
}
}
开发者ID:ian-kent,项目名称:ktee,代码行数:71,代码来源:main.go
示例9: PublishSync
func PublishSync(input chan *FileEvent, source string, isRetryer bool) {
log.Debug("publishSync loop")
clientConfig := sarama.NewConfig()
clientConfig.Producer.RequiredAcks = sarama.WaitForAll
clientConfig.Producer.Compression = sarama.CompressionSnappy
clientConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner
clientConfig.Producer.Retry.Max = 10
topic := kafkaTopic
key := hashKey
if isRetryer {
topic = retryTopic
}
//brokerList := []string{"127.0.0.1:9092"}
var producer sarama.SyncProducer
var err error
for {
producer, err = sarama.NewSyncProducer(brokerList, clientConfig)
if err != nil {
log.Error("Sync: Failed to start Sarama producer: ", err)
log.Info("waiting...")
time.Sleep(1 * time.Second)
} else {
break
}
}
defer func() {
if err := producer.Close(); err != nil {
log.Error("Failed to shutdown producer cleanly", err)
}
}()
// if retryer, use retryer backup, others use Registrar
var recorder Recorder
if isRetryer {
// set to global retryer
retryRecorder := &RetryRecorder{file: mainRetryer.vernier}
recorder = retryRecorder
defer retryRecorder.file.Close()
} else {
registrar := &Registrar{source: source, dir: REGISTRAR_DIR}
if _, err := registrar.OpenRecord(registrar.dir); err != nil {
log.Error("PublishSync open record failed, error:", err)
os.Exit(2)
}
recorder = registrar
}
genMessage := func(rawMessage string) string {
return rawMessage
}
// retryer message sample: 0 this is a sample message
// 0 means, haven't retried succeed
// 1 means have been sended
if isRetryer {
genMessage = func(rawMessage string) string {
// 0|1 raw_name_of_log_file log_msg
rawMessage = rawMessage[2:]
idx := strings.Index(rawMessage, " ")
return rawMessage[idx+1:]
}
}
for event := range input {
log.Debugf("%v, %v, %v, %v\n", *event.Source, *event.Text, event.Line, event.Offset)
// if failed, retry send messge until succeed
key = strconv.FormatInt(event.Offset, 10)
rawMessage := *event.Text
if isRetryer {
if retryTopic != kafkaTopic {
topic = retryTopic
} else {
baseName := getSourceName(rawMessage)
if len(topicmap) > 0 {
tmpTopic := genTopic(baseName, topicmap)
if tmpTopic != "" {
topic = tmpTopic
}
}
}
}
message := genMessage(*event.Text)
if rawMessage[0] == '1' {
log.Debug("message[%s] have been seeded", rawMessage)
continue
}
for {
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.StringEncoder(message),
})
if err != nil {
log.Errorf("Failed: %s, %d, %d\n", *event.Source, event.Line, event.Offset)
time.Sleep(3 * time.Second)
} else {
log.Debugf("OK: %d, %d, %s\n", partition, offset, *event.Source)
recorder.RecordSucceed(event.Offset, event.RawBytes)
//.........这里部分代码省略.........
开发者ID:chenhuaying,项目名称:logflume,代码行数:101,代码来源:publisher.go
示例10: startIndexDumper
func startIndexDumper(producer sarama.SyncProducer, wg *sync.WaitGroup) chan<- *indexDumperMessage {
ch := make(chan *indexDumperMessage, 300) // 5 min
wg.Add(1)
go func() {
defer wg.Done()
buf8 := make([]byte, 8)
builder := flatbuffers.NewBuilder(1024 * 1024)
for {
msg := <-ch
if msg == nil {
log.Println("exiting index dumper")
return
}
start := time.Now()
t, index := msg.t, msg.index
// log.Printf("index dumper got index for %d", t.Unix())
var tags []string
for tag := range index {
tags = append(tags, tag)
}
sort.Strings(tags)
builder.Reset()
var fbtags []flatbuffers.UOffsetT
for _, tag := range tags {
name := builder.CreateString(tag)
cnt := 0
builder.StartVector(flatbuffers.SizeInt64, 0, 0)
for partition, d := range index[tag] {
for di := d.head; di != nil; di = di.next {
for i := 0; i < di.cnt; i++ {
builder.PrependInt64(encodePartitionAndOffset(partition, di.vals[i]))
cnt++
}
}
}
offsetsVector := builder.EndVector(cnt)
fb.TagStart(builder)
fb.TagAddName(builder, name)
fb.TagAddOffsets(builder, offsetsVector)
fbtags = append(fbtags, fb.TagEnd(builder))
}
fb.IndexStartTagsVector(builder, len(fbtags))
for _, offset := range fbtags {
builder.PrependUOffsetT(offset)
}
tagsVector := builder.EndVector(len(fbtags))
fb.IndexStart(builder)
fb.IndexAddTags(builder, tagsVector)
builder.Finish(fb.IndexEnd(builder))
encoded := builder.FinishedBytes()
binary.LittleEndian.PutUint64(buf8, uint64(t.Unix()))
statIncrementTook(&stat.idxSerializeTook, start)
statIncrementSize(&stat.idxSendToKafkaSize, len(encoded))
start = time.Now()
_, _, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: indexTopic,
Key: sarama.ByteEncoder(buf8),
Value: sarama.ByteEncoder(encoded),
})
statIncrementTook(&stat.idxSendToKafkaTook, start)
if err != nil {
log.Printf("failed to store message: %v", err)
}
// log.Printf("finished serializing index for %d, %d msgs, %d tags, %d offsets",
// t.Unix(), msg.msgs, len(tags), msg.offsets)
}
}()
return ch
}
开发者ID:jackdoe,项目名称:no,代码行数:89,代码来源:indexer.go
示例11: closeProducer
// Close Producer
func closeProducer(producer sarama.SyncProducer) {
err := producer.Close()
if err != nil {
logger.Printf("Error closing Producer: %s\n", err)
}
}
开发者ID:pmdcosta,项目名称:apache-kafka,代码行数:7,代码来源:producer.go
注:本文中的github.com/Shopify/sarama.SyncProducer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论