本文整理汇总了Golang中github.com/Shopify/sarama.Consumer类的典型用法代码示例。如果您正苦于以下问题：Golang Consumer类的具体用法？Golang Consumer怎么用？Golang Consumer使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Consumer类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: consumePartition
// consumePartition streams the messages of one topic partition into msgCh,
// starting at offset. It returns when this.quit fires or, if this.lastN is
// positive, once lastN messages have been forwarded.
//
// A failure to open the partition consumer is reported through this.Ui and
// nothing is forwarded. The partition consumer is closed on return.
func (this *Peek) consumePartition(zkcluster *zk.ZkCluster, kfk sarama.Client, consumer sarama.Consumer,
	topic string, partitionId int32, msgCh chan *sarama.ConsumerMessage, offset int64) {
	p, err := consumer.ConsumePartition(topic, partitionId, offset)
	if err != nil {
		this.Ui.Error(fmt.Sprintf("%s %s/%d: offset=%d %v", zkcluster.Name(), topic, partitionId, offset, err))
		return
	}
	defer p.Close()

	n := int64(0)
	for {
		select {
		case <-this.quit:
			return

		case msg := <-p.Messages():
			// Keep watching quit while handing the message over: a bare send
			// would block indefinitely if nothing is receiving from msgCh
			// any more, and the deferred Close would never run.
			select {
			case msgCh <- msg:
			case <-this.quit:
				return
			}

			n++
			if this.lastN > 0 && n >= this.lastN {
				return
			}
		}
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:25,代码来源:peek.go
示例2: consumePartition
// consumePartition consumes one topic partition starting at offset and
// forwards every non-nil message whose value satisfies this.predicate to
// msgCh. It signals wg when it returns.
//
// The loop ends when this.Stop fires or when the partition consumer reports
// an error, which is logged as critical. A failure to open the partition
// consumer is logged and ends the call immediately.
func (this *WatchAppError) consumePartition(zkcluster *zk.ZkCluster, consumer sarama.Consumer,
	topic string, partitionId int32, offset int64, msgCh chan<- *sarama.ConsumerMessage, wg *sync.WaitGroup) {
	defer wg.Done()

	p, err := consumer.ConsumePartition(topic, partitionId, offset)
	if err != nil {
		log.Error("%s %s/%d: offset=%d %v", zkcluster.Name(), topic, partitionId, offset, err)
		return
	}
	defer p.Close()

	for {
		select {
		case <-this.Stop:
			return

		case err := <-p.Errors():
			log.Critical("cluster[%s] %s/%d: %s", zkcluster.Name(), topic, partitionId, err)
			return

		case msg := <-p.Messages():
			if msg == nil || !this.predicate(msg.Value) {
				continue
			}
			// Keep watching Stop while handing the message over: a bare send
			// would block indefinitely if nothing is receiving from msgCh
			// any more, and wg.Done would never be reached.
			select {
			case msgCh <- msg:
			case <-this.Stop:
				return
			}
		}
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:28,代码来源:apperr.go
示例3: Test01
// Test01 publishes two messages to a freshly named topic through an async
// producer, then reads partition 0 of that topic from offset 0 and checks
// that the two payloads come back in the order they were sent.
func (suite *KafkaTester) Test01() {
	const (
		firstPayload  = "message one"
		secondPayload = "message two"
	)

	t := suite.T()
	assert := assert.New(t)
	topic := makeTopicName()

	// Producer side: neither successes nor errors are reported back.
	producerConfig := sarama.NewConfig()
	producerConfig.Producer.Return.Successes = false
	producerConfig.Producer.Return.Errors = false

	producer, err := sarama.NewAsyncProducer([]string{suite.server}, producerConfig)
	assert.NoError(err)
	defer close(t, producer)

	for _, payload := range []string{firstPayload, secondPayload} {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Key:   nil,
			Value: sarama.StringEncoder(payload),
		}
	}

	// Consumer side: default configuration, partition 0 from offset 0.
	consumer, err := sarama.NewConsumer([]string{suite.server}, nil)
	assert.NoError(err)
	defer close(t, consumer)

	partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
	assert.NoError(err)
	defer close(t, partitionConsumer)

	// Receive both messages before asserting on either of them.
	first := <-partitionConsumer.Messages()
	second := <-partitionConsumer.Messages()

	assert.EqualValues(firstPayload, string(first.Value))
	assert.EqualValues(secondPayload, string(second.Value))
}
开发者ID:venicegeo,项目名称:pz-logger,代码行数:55,代码来源:sys_test.go
示例4: getPartitions
// getPartitions resolves the partition ids selected by the package-level
// partitions setting.
//
// The value "all" asks c for every partition of *topic. Any other value is
// parsed as a comma-separated list of base-10 ids that must fit in an int32.
// Whitespace around individual ids is ignored, so "0, 1, 2" is accepted as
// well as "0,1,2". The first id that fails to parse aborts the call and the
// strconv error is returned unchanged.
func getPartitions(c sarama.Consumer) ([]int32, error) {
	if *partitions == "all" {
		return c.Partitions(*topic)
	}

	fields := strings.Split(*partitions, ",")
	pList := make([]int32, 0, len(fields))
	for _, field := range fields {
		val, err := strconv.ParseInt(strings.TrimSpace(field), 10, 32)
		if err != nil {
			return nil, err
		}
		pList = append(pList, int32(val))
	}
	return pList, nil
}
开发者ID:sunyuantao,项目名称:go-lang,代码行数:17,代码来源:manageOffset.go
示例5: newPartitionConsumer
// newPartitionConsumer opens a sarama partition consumer for topic/partition
// and wraps it in a partitionConsumer.
//
// Consumption starts at info.NextOffset(defaultOffset). If the broker rejects
// that position as out of range, info.Offset is reset to -1 and consumption
// is retried from defaultOffset. Any other error, or a failure of the retry,
// is returned to the caller.
func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
	startAt := info.NextOffset(defaultOffset)

	stream, err := manager.ConsumePartition(topic, partition, startAt)
	if err == sarama.ErrOffsetOutOfRange {
		// The requested offset is out of range: fall back to the default.
		info.Offset = -1
		stream, err = manager.ConsumePartition(topic, partition, defaultOffset)
	}
	if err != nil {
		return nil, err
	}

	pc := &partitionConsumer{
		pcm:   stream,
		state: partitionState{Info: info},
		dying: make(chan none),
		dead:  make(chan none),
	}
	return pc, nil
}
开发者ID:wannes-ds,项目名称:sarama-cluster,代码行数:20,代码来源:partitions.go
示例6: expectationConsumer
// expectationConsumer routes every message received on expectations to a
// verifier dedicated to that message's partition.
//
// The first expectation seen for a partition opens a partition consumer at
// that expectation's offset and starts partitionExpectationConsumer in its
// own goroutine. Once expectations is closed, all per-partition channels are
// closed and the function waits for the verifier goroutines to finish. On
// return it closes c and then signals wg.
func expectationConsumer(c sarama.Consumer, expectations <-chan *sarama.ProducerMessage, wg *sync.WaitGroup) {
	defer wg.Done()
	defer func() {
		if closeErr := c.Close(); closeErr != nil {
			logger.Println("Failed to close consumer:", closeErr)
		}
	}()

	var verifierWg sync.WaitGroup
	verifiers := make(map[int32]*partitionVerifier)

	for want := range expectations {
		id := want.Partition

		verifier := verifiers[id]
		if verifier == nil {
			logger.Printf("Starting message verifier for partition %d...\n", id)

			pc, err := c.ConsumePartition(*topic, id, want.Offset)
			if err != nil {
				logger.Fatalf("Failed to open partition consumer for %s/%d: %s", *topic, id, err)
			}

			feed := make(chan *sarama.ProducerMessage)
			verifier = &partitionVerifier{pc: pc, expectations: feed}
			verifiers[id] = verifier

			verifierWg.Add(1)
			go partitionExpectationConsumer(pc, feed, &verifierWg)
		}

		verifier.expectations <- want
	}

	// No more expectations will arrive: close each per-partition channel.
	for _, verifier := range verifiers {
		close(verifier.expectations)
	}
	verifierWg.Wait()
}
开发者ID:bcwalrus,项目名称:kafka,代码行数:39,代码来源:main.go
示例7: GetKafkaPartitions
// GetKafkaPartitions is a helper that looks up which partitions of topic are
// available via the given brokers. This should be called only on startup.
//
// It returns an error when brokerHosts is empty, when topic is empty, when
// the consumer cannot be created or when the partition lookup fails. The
// temporary consumer is closed before returning; an error from Close is
// reported only if no earlier error occurred.
func GetKafkaPartitions(brokerHosts []string, topic string) (partitions []int32, err error) {
	switch {
	case len(brokerHosts) == 0:
		return partitions, errors.New("at least 1 broker host is required")
	case topic == "":
		return partitions, errors.New("topic name is required")
	}

	consumer, err := sarama.NewConsumer(brokerHosts, sarama.NewConfig())
	if err != nil {
		return partitions, err
	}
	defer func() {
		// Never let a Close failure mask the lookup's own error.
		if closeErr := consumer.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	partitions, err = consumer.Partitions(topic)
	return partitions, err
}
开发者ID:strogo,项目名称:gizmo,代码行数:24,代码来源:kafka.go
示例8: findPartitions
// findPartitions returns the partitions of config.topic that should be
// consumed.
//
// When config.offsets has an entry under key -1, every partition reported by
// the consumer is returned. Otherwise only the partitions that have their own
// entry in config.offsets are returned. If the partition list cannot be read,
// the error is printed to stderr and the process exits with status 1.
func findPartitions(consumer sarama.Consumer, config consumeConfig) []int32 {
	allPartitions, err := consumer.Partitions(config.topic)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to read partitions for topic %v err=%v\n", config.topic, err)
		os.Exit(1)
	}

	if _, hasDefaultOffset := config.offsets[-1]; hasDefaultOffset {
		return allPartitions
	}

	// No default entry: keep only the explicitly configured partitions.
	selected := []int32{}
	for _, id := range allPartitions {
		if _, configured := config.offsets[id]; configured {
			selected = append(selected, id)
		}
	}
	return selected
}
开发者ID:fgeller,项目名称:kt,代码行数:22,代码来源:consume.go
示例9: consumePartition
// consumePartition resolves the start and end offsets configured for
// partition, opens a partition consumer at the start offset and hands it to
// consumePartitionLoop together with the end offset.
//
// The offsets come from config.offsets[partition], or from the entry under
// key -1 when the partition has none of its own. Any failure is printed to
// stderr and ends the call without consuming.
func consumePartition(
	config consumeConfig,
	closer chan struct{},
	out chan string,
	consumer sarama.Consumer,
	partition int32,
) {
	bounds, found := config.offsets[partition]
	if !found {
		bounds = config.offsets[-1]
	}

	start, startErr := bounds.start.value(clientMaker, partition)
	if startErr != nil {
		fmt.Fprintf(os.Stderr, "Failed to read start offset for partition %v err=%v\n", partition, startErr)
		return
	}

	end, endErr := bounds.end.value(clientMaker, partition)
	if endErr != nil {
		fmt.Fprintf(os.Stderr, "Failed to read end offset for partition %v err=%v\n", partition, endErr)
		return
	}

	partitionConsumer, openErr := consumer.ConsumePartition(config.topic, partition, start)
	if openErr != nil {
		fmt.Fprintf(os.Stderr, "Failed to consume partition %v err=%v\n", partition, openErr)
		return
	}

	consumePartitionLoop(closer, out, partitionConsumer, partition, end)
}
开发者ID:fgeller,项目名称:kt,代码行数:37,代码来源:consume.go
示例10: JoinConsumerGroup
// JoinConsumerGroup connects to a consumer group, using Zookeeper for
// auto-discovery of the Kafka brokers.
//
// name must be non-empty, and topics and zookeeper must each contain at least
// one entry. A nil config is replaced by NewConfig(); in every case
// config.ClientID is overwritten with name before the configuration is
// validated.
//
// On success the returned group is registered in Zookeeper (the group itself
// is created there if it does not exist yet), has a Zookeeper offset manager
// attached, and a goroutine running topicListConsumer for topics has been
// started. On failure nil is returned together with the error, and every
// connection opened along the way is closed again.
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {
	if name == "" {
		return nil, sarama.ConfigurationError("Empty consumergroup name")
	}
	if len(topics) == 0 {
		return nil, sarama.ConfigurationError("No topics provided")
	}
	if len(zookeeper) == 0 {
		return nil, errors.New("You need to provide at least one zookeeper node address!")
	}

	if config == nil {
		config = NewConfig()
	}
	config.ClientID = name

	// Validate configuration
	if err = config.Validate(); err != nil {
		return nil, err
	}

	var kz *kazoo.Kazoo
	if kz, err = kazoo.NewKazoo(zookeeper, config.Zookeeper); err != nil {
		return nil, err
	}

	brokers, err := kz.BrokerList()
	if err != nil {
		_ = kz.Close()
		return nil, err
	}

	group := kz.Consumergroup(name)

	if config.Offsets.ResetOffsets {
		if err = group.ResetOffsets(); err != nil {
			// cg is still nil at this point, so log through sarama's
			// package logger instead of calling a method on cg.
			sarama.Logger.Printf("FAILED to reset offsets of consumergroup: %s!\n", err)
			_ = kz.Close()
			return nil, err
		}
	}

	instance := group.NewInstance()

	var consumer sarama.Consumer
	if consumer, err = sarama.NewConsumer(brokers, config.Config); err != nil {
		_ = kz.Close()
		return nil, err
	}

	// release closes both connections on a failure path. Close errors are
	// deliberately ignored: the failure that triggered the cleanup is the
	// one reported to the caller.
	release := func() {
		_ = consumer.Close()
		_ = kz.Close()
	}

	cg = &ConsumerGroup{
		config:   config,
		consumer: consumer,

		kazoo:    kz,
		group:    group,
		instance: instance,

		messages: make(chan *sarama.ConsumerMessage, config.ChannelBufferSize),
		errors:   make(chan *sarama.ConsumerError, config.ChannelBufferSize),
		stopper:  make(chan struct{}),
	}

	// Register consumer group
	exists, err := cg.group.Exists()
	if err != nil {
		cg.Logf("FAILED to check for existence of consumergroup: %s!\n", err)
		release()
		return nil, err
	}
	if !exists {
		cg.Logf("Consumergroup `%s` does not yet exists, creating...\n", cg.group.Name)
		if err = cg.group.Create(); err != nil {
			cg.Logf("FAILED to create consumergroup in Zookeeper: %s!\n", err)
			release()
			return nil, err
		}
	}

	// Register itself with zookeeper
	if err = cg.instance.Register(topics); err != nil {
		cg.Logf("FAILED to register consumer instance: %s!\n", err)
		// Release the connections here too, as the other failure paths do.
		release()
		return nil, err
	}
	cg.Logf("Consumer instance registered (%s).", cg.instance.ID)

	offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
	cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)

	go cg.topicListConsumer(topics)

	return cg, nil
}
开发者ID:crask,项目名称:kafka,代码行数:99,代码来源:consumer_group.go
示例11: Load
// Load connects the consumer group to Zookeeper and Kafka and starts
// consuming.
//
// It opens a Zookeeper connection from cg.zookeeper, fetches the broker list,
// creates a sarama consumer and stores those handles on cg together with
// fresh messages, errors and stopper channels. It then creates the consumer
// group in Zookeeper if it does not exist yet, registers this instance for
// cg.topics, attaches a Zookeeper offset manager and starts
// topicListConsumer in its own goroutine.
//
// Every failure is returned to the caller. Failures after the consumer has
// been created are also reported through logger.Fatal, and the consumer and
// the Zookeeper connection are closed before returning.
//
// NOTE(review): logger.Fatal may terminate the process depending on the
// logger implementation, in which case the cleanup that follows it is never
// reached - confirm the intended behavior.
func (cg *ConsumerGroup) Load(logger zap.Logger) error {
	var kz *kazoo.Kazoo
	var err error
	if kz, err = kazoo.NewKazoo(cg.zookeeper, cg.config.Zookeeper); err != nil {
		return err
	}

	logger.Info("KAFKA: Getting broker list for replica",
		zap.Int("replicaId", cg.replicaId),
	)

	brokers, err := kz.BrokerList()
	if err != nil {
		_ = kz.Close()
		return err
	}

	group := kz.Consumergroup(cg.config.ClientID)
	instance := group.NewInstance()

	var consumer sarama.Consumer
	if consumer, err = sarama.NewConsumer(brokers, cg.config.Config); err != nil {
		_ = kz.Close()
		return err
	}

	// release closes both connections on a failure path. Close errors are
	// deliberately ignored: the failure that triggered the cleanup is the
	// one reported to the caller.
	release := func() {
		_ = consumer.Close()
		_ = kz.Close()
	}

	cg.kazoo = kz
	cg.group = group
	cg.instance = instance
	cg.messages = make(chan *sarama.ConsumerMessage, cg.config.ChannelBufferSize)
	cg.consumer = consumer
	cg.singleShutdown = sync.Once{}
	cg.errors = make(chan *sarama.ConsumerError, cg.config.ChannelBufferSize)
	cg.stopper = make(chan struct{})

	exists, err := cg.group.Exists()
	if err != nil {
		logger.Fatal("KAFKA: Replica failed to check existence of consumergroup",
			zap.Int("replicaId", cg.replicaId),
			zap.Error(err),
		)
		release()
		return err
	}
	if !exists {
		logger.Info("KAFKA: Consumergroup does not exist, creating it",
			zap.Int("replicaId", cg.replicaId),
			zap.String("consumerGroupName", cg.group.Name),
		)
		if err = cg.group.Create(); err != nil {
			logger.Fatal("KAFKA: Failed to create consumergroup in Zookeeper",
				zap.Int("replicaId", cg.replicaId),
				zap.Error(err),
			)
			release()
			return err
		}
	}

	if err = cg.instance.Register(cg.topics); err != nil {
		logger.Fatal("KAFKA: Failed to create consumer instance",
			zap.Int("replicaId", cg.replicaId),
			zap.Error(err),
		)
		// Release the connections here too, as the other failure paths do.
		release()
		return err
	}
	logger.Info("KAFKA: Consumer instance registered",
		zap.Int("replicaId", cg.replicaId),
	)

	offsetConfig := OffsetManagerConfig{
		CommitInterval:   cg.config.Offsets.CommitInterval,
		EnableAutoCommit: cg.config.EnableOffsetAutoCommit,
	}
	cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig, logger)

	go cg.topicListConsumer(cg.topics, logger)

	return nil
}
开发者ID:topfreegames,项目名称:kafka,代码行数:79,代码来源:consumer_group.go
注：本文中的github.com/Shopify/sarama.Consumer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论