本文整理汇总了Golang中github.com/funkygao/gafka/zk.ZkCluster类的典型用法代码示例。如果您正苦于以下问题:Golang ZkCluster类的具体用法?Golang ZkCluster怎么用?Golang ZkCluster使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ZkCluster类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: consumePartition
// consumePartition tails a single topic partition and forwards every message
// whose value satisfies this.predicate to msgCh, until the partition consumer
// reports an error or this.Stop fires.
func (this *WatchAppError) consumePartition(zkcluster *zk.ZkCluster, consumer sarama.Consumer,
	topic string, partitionId int32, offset int64, msgCh chan<- *sarama.ConsumerMessage, wg *sync.WaitGroup) {
	defer wg.Done()

	pc, err := consumer.ConsumePartition(topic, partitionId, offset)
	if err != nil {
		log.Error("%s %s/%d: offset=%d %v", zkcluster.Name(), topic, partitionId, offset, err)
		return
	}
	defer pc.Close()

	for {
		select {
		case <-this.Stop:
			return

		case err := <-pc.Errors():
			log.Critical("cluster[%s] %s/%d: %s", zkcluster.Name(), topic, partitionId, err)
			return

		case msg := <-pc.Messages():
			if msg != nil && this.predicate(msg.Value) {
				msgCh <- msg
			}
		}
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:28,代码来源:apperr.go
示例2: clusterSummary
// clusterSummary aggregates cluster-wide stats: broker/topic/partition counts,
// plus flat (retained, newest-oldest) and cum (cumulative newest) offsets
// summed over every writable partition.
func (this *Clusters) clusterSummary(zkcluster *zk.ZkCluster) (brokers, topics, partitions int, flat, cum int64) {
	brokers = len(zkcluster.Brokers())

	kfk, err := sarama.NewClient(zkcluster.BrokerList(), saramaConfig())
	if err != nil {
		this.Ui.Error(err.Error())
		return
	}
	defer kfk.Close()

	topicNames, _ := kfk.Topics()
	topics = len(topicNames)
	for _, topic := range topicNames {
		writable, _ := kfk.WritablePartitions(topic)
		partitions += len(writable)

		for _, pid := range writable {
			newest, _ := kfk.GetOffset(topic, pid, sarama.OffsetNewest)
			oldest, _ := kfk.GetOffset(topic, pid, sarama.OffsetOldest)
			flat += newest - oldest
			cum += newest
		}
	}
	return
}
开发者ID:funkygao,项目名称:gafka,代码行数:28,代码来源:clusters.go
示例3: consumeCluster
// consumeCluster launches a simpleConsumeTopic goroutine for every topic in the
// cluster matching topicPattern, all feeding into msgChan.
func (this *Peek) consumeCluster(zkcluster *zk.ZkCluster, topicPattern string,
	partitionId int, msgChan chan *sarama.ConsumerMessage) {
	brokers := zkcluster.BrokerList()
	if len(brokers) == 0 {
		return
	}

	kfk, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		this.Ui.Output(err.Error())
		return
	}
	//defer kfk.Close() // FIXME how to close it

	topics, err := kfk.Topics()
	if err != nil {
		this.Ui.Output(err.Error())
		return
	}

	for _, topic := range topics {
		if patternMatched(topic, topicPattern) {
			go this.simpleConsumeTopic(zkcluster, kfk, topic, int32(partitionId), msgChan)
		}
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:26,代码来源:peek.go
示例4: makePub
// makePub builds an async producer against target cluster c2, tuned for
// batched high-throughput mirroring: local-ack only, bounded retries,
// generous network timeouts, and optional gzip/snappy compression.
func (this *Mirror) makePub(c2 *zk.ZkCluster) (sarama.AsyncProducer, error) {
	conf := sarama.NewConfig()

	// metadata refresh/retry policy
	conf.Metadata.RefreshFrequency = time.Minute * 10
	conf.Metadata.Retry.Max = 3
	conf.Metadata.Retry.Backoff = time.Second * 3

	conf.ChannelBufferSize = 1000

	// batching and delivery guarantees
	conf.Producer.Return.Errors = true
	conf.Producer.Flush.Messages = 2000 // 2000 message in batch
	conf.Producer.Flush.Frequency = time.Second // flush interval
	conf.Producer.Flush.MaxMessages = 0 // unlimited
	conf.Producer.RequiredAcks = sarama.WaitForLocal
	conf.Producer.Retry.Backoff = time.Second * 4
	conf.Producer.Retry.Max = 3

	// network timeouts
	conf.Net.DialTimeout = time.Second * 30
	conf.Net.WriteTimeout = time.Second * 30
	conf.Net.ReadTimeout = time.Second * 30

	switch this.Compress {
	case "gzip":
		conf.Producer.Compression = sarama.CompressionGZIP
	case "snappy":
		conf.Producer.Compression = sarama.CompressionSnappy
	}

	return sarama.NewAsyncProducer(c2.BrokerList(), conf)
}
开发者ID:funkygao,项目名称:gafka,代码行数:28,代码来源:factory.go
示例5: resetTopicConfig
// resetTopicConfig removes the per-topic "retention.ms" override by shelling
// out to kafka-topics.sh, reverting the topic to the broker default retention,
// then echoes the topic's zk config path and the CLI output.
func (this *Topics) resetTopicConfig(zkcluster *zk.ZkCluster, topic string) {
	zkAddrs := zkcluster.ZkConnectAddr()
	key := "retention.ms"
	// NOTE(review): recent kafka-topics.sh spells this flag --delete-config;
	// --deleteConfig exists only in old Kafka releases — confirm against the
	// Kafka version installed under ctx.KafkaHome().
	cmd := pipestream.New(fmt.Sprintf("%s/bin/kafka-topics.sh", ctx.KafkaHome()),
		fmt.Sprintf("--zookeeper %s", zkAddrs),
		fmt.Sprintf("--alter"),
		fmt.Sprintf("--topic %s", topic),
		fmt.Sprintf("--deleteConfig %s", key),
	)
	// swallow panics on error, aborting the command
	err := cmd.Open()
	swallow(err)
	defer cmd.Close()

	// buffer all CLI output so the zk config path header prints first
	scanner := bufio.NewScanner(cmd.Reader())
	scanner.Split(bufio.ScanLines)
	output := make([]string, 0)
	for scanner.Scan() {
		output = append(output, scanner.Text())
	}
	swallow(scanner.Err())

	path := zkcluster.GetTopicConfigPath(topic)
	this.Ui.Info(path)

	for _, line := range output {
		this.Ui.Output(line)
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:29,代码来源:topics.go
示例6: consumePartition
// consumePartition streams messages from one partition into msgCh, stopping on
// this.quit, or after this.lastN messages when lastN is positive.
func (this *Peek) consumePartition(zkcluster *zk.ZkCluster, kfk sarama.Client, consumer sarama.Consumer,
	topic string, partitionId int32, msgCh chan *sarama.ConsumerMessage, offset int64) {
	pc, err := consumer.ConsumePartition(topic, partitionId, offset)
	if err != nil {
		this.Ui.Error(fmt.Sprintf("%s %s/%d: offset=%d %v", zkcluster.Name(), topic, partitionId, offset, err))
		return
	}
	defer pc.Close()

	var delivered int64
	for {
		select {
		case <-this.quit:
			return

		case msg := <-pc.Messages():
			msgCh <- msg

			delivered++
			if this.lastN > 0 && delivered >= this.lastN {
				return
			}
		}
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:25,代码来源:peek.go
示例7: clusterTopConsumers
// clusterTopConsumers periodically samples the cluster's total consumer group
// offsets and records them into the shared counter maps under the key
// "cluster:topic". Loops forever; intended to run as a goroutine.
func (this *Top) clusterTopConsumers(zkcluster *zk.ZkCluster) {
	for {
		total := zkcluster.TotalConsumerOffsets(this.topicPattern)

		topic := this.topicPattern
		if topic == "" {
			topic = "-all-"
		}
		key := zkcluster.Name() + ":" + topic

		this.mu.Lock()
		this.consumerCounters[key] = float64(total)
		this.mu.Unlock()

		if !this.dashboardGraph {
			this.mu.Lock()
			this.counters[key] = float64(total)
			this.mu.Unlock()
		}

		time.Sleep(this.topInterval)
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:26,代码来源:top.go
示例8: addTopic
// addTopic creates a kafka topic with the given replica and partition counts,
// echoing the creation output and the cluster's connect strings (numeric or
// named form depending on this.ipInNumber).
func (this *Topics) addTopic(zkcluster *zk.ZkCluster, topic string, replicas,
	partitions int) error {
	this.Ui.Info(fmt.Sprintf("creating kafka topic: %s", topic))

	ts := sla.DefaultSla()
	ts.Partitions = partitions
	ts.Replicas = replicas

	lines, err := zkcluster.AddTopic(topic, ts)
	if err != nil {
		return err
	}
	for _, line := range lines {
		this.Ui.Output(color.Yellow(line))
	}

	if this.ipInNumber {
		this.Ui.Output(fmt.Sprintf("\tzookeeper.connect: %s", zkcluster.ZkConnectAddr()))
		this.Ui.Output(fmt.Sprintf("\t broker.list: %s",
			strings.Join(zkcluster.BrokerList(), ",")))
	} else {
		this.Ui.Output(fmt.Sprintf("\tzookeeper.connect: %s", zkcluster.NamedZkConnectAddr()))
		this.Ui.Output(fmt.Sprintf("\t broker.list: %s",
			strings.Join(zkcluster.NamedBrokerList(), ",")))
	}

	return nil
}
开发者ID:funkygao,项目名称:gafka,代码行数:27,代码来源:topics.go
示例9: makePub
// makePub creates an async producer publishing to cluster c2, honoring the
// configured compression codec ("gzip" or "snappy"; anything else means none).
func (this *Mirror) makePub(c2 *zk.ZkCluster) (sarama.AsyncProducer, error) {
	// TODO setup batch size
	conf := sarama.NewConfig()
	if this.compress == "gzip" {
		conf.Producer.Compression = sarama.CompressionGZIP
	} else if this.compress == "snappy" {
		conf.Producer.Compression = sarama.CompressionSnappy
	}
	return sarama.NewAsyncProducer(c2.BrokerList(), conf)
}
开发者ID:chendx79,项目名称:gafka,代码行数:12,代码来源:mirror.go
示例10: makeSub
// makeSub joins a zookeeper-coordinated consumer group on source cluster c1
// covering the given topics.
func (this *Mirror) makeSub(c1 *zk.ZkCluster, group string, topics []string) (*consumergroup.ConsumerGroup, error) {
	conf := consumergroup.NewConfig()
	conf.Zookeeper.Chroot = c1.Chroot()
	conf.Offsets.CommitInterval = time.Second * 10
	conf.Offsets.ProcessingTimeout = time.Second
	conf.ChannelBufferSize = 0
	conf.Consumer.Return.Errors = true
	conf.Consumer.MaxProcessingTime = 100 * time.Millisecond // chan recv timeout

	return consumergroup.JoinConsumerGroup(group, topics, c1.ZkZone().ZkAddrList(), conf)
}
开发者ID:chendx79,项目名称:gafka,代码行数:12,代码来源:mirror.go
示例11: makeSub
// makeSub joins a consumer group on cluster c1 that starts from the oldest
// offset and disables one-to-one partition assignment.
func (this *Mirror) makeSub(c1 *zk.ZkCluster, group string, topics []string) (*consumergroup.ConsumerGroup, error) {
	conf := consumergroup.NewConfig()
	conf.Zookeeper.Chroot = c1.Chroot()
	conf.Offsets.CommitInterval = time.Second * 10
	conf.Offsets.ProcessingTimeout = time.Second
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
	conf.ChannelBufferSize = 256
	conf.Consumer.Return.Errors = true
	conf.OneToOne = false

	return consumergroup.JoinConsumerGroup(group, topics, c1.ZkZone().ZkAddrList(), conf)
}
开发者ID:funkygao,项目名称:gafka,代码行数:13,代码来源:factory.go
示例12: delTopic
// delTopic deletes a kafka topic from the cluster, echoing the CLI output.
func (this *Topics) delTopic(zkcluster *zk.ZkCluster, topic string) error {
	this.Ui.Info(fmt.Sprintf("deleting kafka topic: %s", topic))

	lines, err := zkcluster.DeleteTopic(topic)
	if err != nil {
		return err
	}

	for _, line := range lines {
		this.Ui.Output(color.Yellow(line))
	}
	return nil
}
开发者ID:funkygao,项目名称:gafka,代码行数:14,代码来源:topics.go
示例13: clusterTopProducers
// clusterTopProducers repeatedly samples, for one cluster, the cumulative
// latest offset of each topic leader, grouped by broker host, and pushes the
// host->offset map to this.hostOffsetCh. Each round blocks on the cluster's
// entry in this.signalsCh so the channel reader controls the sampling pace.
// Returns only if the sarama client cannot be created; swallow panics on
// any mid-loop error.
func (this *TopBroker) clusterTopProducers(zkcluster *zk.ZkCluster) {
	kfk, err := sarama.NewClient(zkcluster.BrokerList(), sarama.NewConfig())
	if err != nil {
		return
	}
	defer kfk.Close()

	for {
		hostOffsets := make(map[string]int64)

		topics, err := kfk.Topics()
		swallow(err)

		// wait until the consumer of hostOffsetCh requests the next sample
		<-this.signalsCh[zkcluster.Name()]

		for _, topic := range topics {
			if !patternMatched(topic, this.topic) {
				continue
			}

			partions, err := kfk.WritablePartitions(topic)
			swallow(err)
			for _, partitionID := range partions {
				leader, err := kfk.Leader(topic, partitionID)
				swallow(err)

				latestOffset, err := kfk.GetOffset(topic, partitionID,
					sarama.OffsetNewest)
				swallow(err)

				// attribute the partition's offset to the leader's host
				host, _, err := net.SplitHostPort(leader.Addr())
				swallow(err)
				if this.shortIp {
					host = shortIp(host)
				}

				if _, present := hostOffsets[host]; !present {
					hostOffsets[host] = 0
				}
				hostOffsets[host] += latestOffset
			}
		}

		this.hostOffsetCh <- hostOffsets

		// pick up leadership/partition changes before the next round
		kfk.RefreshMetadata(topics...)
	}
}
开发者ID:chendx79,项目名称:gafka,代码行数:48,代码来源:topbroker.go
示例14: printCluster
// printCluster prints the cluster name followed by its zookeeper child nodes,
// expanding any "brokers" node with its ids/ and topics/ sub-paths.
func (this *LsZk) printCluster(zkcluster *zk.ZkCluster) {
	this.Ui.Output(color.Green(zkcluster.Name()))

	children, err := zkcluster.ListChildren(this.recursive)
	if err != nil {
		this.Ui.Error(fmt.Sprintf("%s%s", strings.Repeat(" ", 4), err))
		return
	}

	indent := strings.Repeat(" ", 4)
	for _, child := range children {
		this.Ui.Output(fmt.Sprintf("%s%s", indent, child))
		if strings.HasSuffix(child, "brokers") {
			this.Ui.Output(fmt.Sprintf("%s%s/ids", indent, child))
			this.Ui.Output(fmt.Sprintf("%s%s/topics", indent, child))
		}
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:16,代码来源:lszk.go
示例15: makeMirror
// makeMirror wires source cluster c1 to target cluster c2: it builds a
// producer on c2 and a consumer group on c1, then runs this.pump between them.
// When c1's topic set changes the pump is stopped, the subscription rebuilt
// for the new topics, and pumping resumes. The loop exits on this.quit after
// the pump has signaled cleanup; swallow panics on setup errors.
func (this *Mirror) makeMirror(c1, c2 *zk.ZkCluster) {
	pub, err := this.makePub(c2)
	swallow(err)

	topics, topicsChanges, err := c1.WatchTopics()
	swallow(err)
	log.Printf("topics: %+v", topics)
	if len(topics) == 0 {
		log.Println("empty topics")
		return
	}

	// the group name encodes the mirror direction: source.target._mirror_
	group := fmt.Sprintf("%s.%s._mirror_", c1.Name(), c2.Name())
	sub, err := this.makeSub(c1, group, topics)
	swallow(err)

	// pumpStopper is used in both directions: send to ask the pump to stop,
	// receive to wait for its cleanup acknowledgment.
	pumpStopper := make(chan struct{})
	go this.pump(sub, pub, pumpStopper)
LOOP:
	for {
		select {
		case <-topicsChanges:
			log.Println("topics changed, stopping pump...")
			pumpStopper <- struct{}{} // stop pump
			<-pumpStopper // await pump cleanup

			// refresh c1 topics
			topics, err = c1.Topics()
			if err != nil {
				// TODO how to handle this err?
				log.Println(err)
			}
			log.Printf("topics: %+v", topics)
			sub, err = this.makeSub(c1, group, topics)
			if err != nil {
				// TODO how to handle this err?
				log.Println(err)
			}

			// restart the pump against the rebuilt subscription
			go this.pump(sub, pub, pumpStopper)
		case <-this.quit:
			log.Println("awaiting pump cleanup...")
			<-pumpStopper

			log.Printf("total transferred: %s %smsgs",
				gofmt.ByteSize(this.transferBytes),
				gofmt.Comma(this.transferN))
			break LOOP
		}
	}

	pub.Close()
}
开发者ID:chendx79,项目名称:gafka,代码行数:57,代码来源:mirror.go
示例16: configTopic
// configTopic overrides the topic's retention via an alter-topic call,
// converting retentionInMinute to SLA retention hours. Retentions under
// 10 minutes are rejected with a panic. On alter failure the process exits.
//
// Per-topic config keys recognized by Kafka include: segment.bytes,
// segment.ms, segment.index.bytes, flush.messages, flush.ms, retention.bytes,
// retention.ms, max.message.bytes, index.interval.bytes, delete.retention.ms,
// file.delete.delay.ms, min.cleanable.dirty.ratio, cleanup.policy.
//
// Equivalent CLI:
//   ./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic foobar --config max.message.bytes=10000101
// which lands in zk as:
//   {"version":1,"config":{"index.interval.bytes":"10000101","max.message.bytes":"10000101"}}
func (this *Topics) configTopic(zkcluster *zk.ZkCluster, topic string, retentionInMinute int) {
	if retentionInMinute < 10 {
		panic("less than 10 minutes?")
	}

	ts := sla.DefaultSla()
	ts.RetentionHours = float64(retentionInMinute) / 60

	output, err := zkcluster.AlterTopic(topic, ts)
	if err != nil {
		this.Ui.Error(fmt.Sprintf("%+v: %v", ts, err))
		os.Exit(1)
	}

	configPath := zkcluster.GetTopicConfigPath(topic)
	this.Ui.Info(configPath)
	for _, line := range output {
		this.Ui.Output(line)
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:40,代码来源:topics.go
示例17: clusterSummary
// clusterSummary builds one topicSummary per topic in the cluster, recording
// writable-partition count plus flat (retained) and cum (cumulative) message
// offsets. Returns nil if the sarama client cannot be created.
func (this *Topics) clusterSummary(zkcluster *zk.ZkCluster) []topicSummary {
	summaries := make([]topicSummary, 0, 10)

	kfk, err := sarama.NewClient(zkcluster.BrokerList(), saramaConfig())
	if err != nil {
		this.Ui.Error(err.Error())
		return nil
	}
	defer kfk.Close()

	topics, _ := kfk.Topics()
	for _, topic := range topics {
		var flat, cum int64
		writable, _ := kfk.WritablePartitions(topic)
		for _, pid := range writable {
			newest, _ := kfk.GetOffset(topic, pid, sarama.OffsetNewest)
			oldest, _ := kfk.GetOffset(topic, pid, sarama.OffsetOldest)
			flat += newest - oldest
			cum += newest
		}

		summaries = append(summaries, topicSummary{zkcluster.ZkZone().Name(), zkcluster.Name(), topic, len(writable), flat, cum})
	}

	return summaries
}
开发者ID:funkygao,项目名称:gafka,代码行数:27,代码来源:topics.go
示例18: displayGroupOffsets
// displayGroupOffsets collects (and, when echo is set, prints) the committed
// offsets of one consumer group, restricted to topics matching
// this.topicPattern and, when topic is non-empty, to that exact topic.
// Topics and partition ids are visited in sorted order.
func (this *Consumers) displayGroupOffsets(zkcluster *zk.ZkCluster, group, topic string, echo bool) []consumerGroupOffset {
	offsetMap := zkcluster.ConsumerOffsetsOfGroup(group)

	topicsSorted := make([]string, 0, len(offsetMap))
	for name := range offsetMap {
		topicsSorted = append(topicsSorted, name)
	}
	sort.Strings(topicsSorted)

	r := make([]consumerGroupOffset, 0)
	for _, name := range topicsSorted {
		if !patternMatched(name, this.topicPattern) || (topic != "" && name != topic) {
			continue
		}

		pidsSorted := make([]string, 0, len(offsetMap[name]))
		for pid := range offsetMap[name] {
			pidsSorted = append(pidsSorted, pid)
		}
		sort.Strings(pidsSorted)

		for _, pid := range pidsSorted {
			r = append(r, consumerGroupOffset{
				topic:       name,
				partitionId: pid,
				offset:      gofmt.Comma(offsetMap[name][pid]),
			})

			if echo {
				this.Ui.Output(fmt.Sprintf("\t\t%s/%s Offset:%s",
					name, pid, gofmt.Comma(offsetMap[name][pid])))
			}
		}
	}

	return r
}
开发者ID:funkygao,项目名称:gafka,代码行数:39,代码来源:consumers.go
示例19: displayTopicsOfCluster
func (this *Topics) displayTopicsOfCluster(zkcluster *zk.ZkCluster) {
echoBuffer := func(lines []string) {
for _, l := range lines {
this.Ui.Output(l)
}
}
linesInTopicMode := make([]string, 0)
if this.verbose {
linesInTopicMode = this.echoOrBuffer(zkcluster.Name(), linesInTopicMode)
}
// get all alive brokers within this cluster
brokers := zkcluster.Brokers()
if len(brokers) == 0 {
linesInTopicMode = this.echoOrBuffer(fmt.Sprintf("%4s%s", " ",
color.Red("%s empty brokers", zkcluster.Name())), linesInTopicMode)
echoBuffer(linesInTopicMode)
return
}
if this.verbose {
sortedBrokerIds := make([]string, 0, len(brokers))
for brokerId, _ := range brokers {
sortedBrokerIds = append(sortedBrokerIds, brokerId)
}
sort.Strings(sortedBrokerIds)
for _, brokerId := range sortedBrokerIds {
if this.ipInNumber {
linesInTopicMode = this.echoOrBuffer(fmt.Sprintf("%4s%s %s", " ",
color.Green(brokerId), brokers[brokerId]), linesInTopicMode)
} else {
linesInTopicMode = this.echoOrBuffer(fmt.Sprintf("%4s%s %s", " ",
color.Green(brokerId), brokers[brokerId].NamedString()), linesInTopicMode)
}
}
}
kfk, err := sarama.NewClient(zkcluster.BrokerList(), saramaConfig())
if err != nil {
if this.verbose {
linesInTopicMode = this.echoOrBuffer(color.Yellow("%5s%+v %s", " ",
zkcluster.BrokerList(), err.Error()), linesInTopicMode)
}
return
}
defer kfk.Close()
topics, err := kfk.Topics()
swallow(err)
if len(topics) == 0 {
if this.topicPattern == "" && this.verbose {
linesInTopicMode = this.echoOrBuffer(fmt.Sprintf("%5s%s", " ",
color.Magenta("no topics")), linesInTopicMode)
echoBuffer(linesInTopicMode)
}
return
}
sortedTopics := make([]string, 0, len(topics))
for _, t := range topics {
sortedTopics = append(sortedTopics, t)
}
sort.Strings(sortedTopics)
topicsCtime := zkcluster.TopicsCtime()
hasTopicMatched := false
for _, topic := range sortedTopics {
if !patternMatched(topic, this.topicPattern) {
continue
}
if this.since > 0 && time.Since(topicsCtime[topic]) > this.since {
continue
}
this.topicN++
hasTopicMatched = true
if this.verbose {
linesInTopicMode = this.echoOrBuffer(strings.Repeat(" ", 4)+color.Cyan(topic), linesInTopicMode)
}
// get partitions and check if some dead
alivePartitions, err := kfk.WritablePartitions(topic)
swallow(err)
partions, err := kfk.Partitions(topic)
swallow(err)
if len(alivePartitions) != len(partions) {
linesInTopicMode = this.echoOrBuffer(fmt.Sprintf("%30s %s %s P: %s/%+v",
zkcluster.Name(), color.Cyan("%-50s", topic), color.Red("partial dead"), color.Green("%+v", alivePartitions), partions), linesInTopicMode)
}
replicas, err := kfk.Replicas(topic, partions[0])
if err != nil {
this.Ui.Error(fmt.Sprintf("%s/%d %v", topic, partions[0], err))
}
//.........这里部分代码省略.........
开发者ID:funkygao,项目名称:gafka,代码行数:101,代码来源:topics.go
示例20: clusterTopProducers
// clusterTopProducers polls, once a second, the total message count of every
// matching topic in one cluster and records it into this.counters and
// this.partitions (keyed "cluster:topic"). On the first sighting of a topic
// it also records which brokers lead how many of its partitions into
// this.brokers. Returns when the broker list is empty, the client cannot be
// created, or the topic list can no longer be fetched.
func (this *Top) clusterTopProducers(zkcluster *zk.ZkCluster) {
	cluster := zkcluster.Name()
	brokerList := zkcluster.BrokerList()
	if len(brokerList) == 0 {
		return
	}
	kfk, err := sarama.NewClient(brokerList, sarama.NewConfig())
	if err != nil {
		return
	}
	defer kfk.Close()

	for {
		topics, err := kfk.Topics()
		if err != nil || len(topics) == 0 {
			return
		}

		for _, topic := range topics {
			if !patternMatched(topic, this.topicPattern) {
				continue
			}

			msgs := int64(0)
			alivePartitions, err := kfk.WritablePartitions(topic)
			swallow(err)

			for _, partitionID := range alivePartitions {
				latestOffset, err := kfk.GetOffset(topic, partitionID,
					sarama.OffsetNewest)
				if err != nil {
					// this broker is down
					continue
				}
				msgs += latestOffset
			}

			this.mu.Lock()
			if _, present := this.brokers[cluster+":"+topic]; !present {
				// calculate the broker leading partitions
				leadingPartitions := make(map[string]int) // broker:lead partitions n
				for _, pid := range alivePartitions {
					leader, err := kfk.Leader(topic, pid)
					swallow(err)
					leadingPartitions[leader.Addr()]++
				}

				brokers := make([]string, 0)
				for addr, n := range leadingPartitions {
					// FIX: restore the "%d@%s" verb, e.g. "3@10.1.1.1:9092"
					// (lead-partition count @ broker addr); the scraped source
					// carried a mangled "%[email protected]" in its place.
					brokers = append(brokers, fmt.Sprintf("%d@%s", n, addr))
				}

				this.brokers[cluster+":"+topic] = this.discardPortOfBrokerAddr(brokers)
			}
			this.counters[cluster+":"+topic] = float64(msgs)
			this.partitions[cluster+":"+topic] = len(alivePartitions)
			this.mu.Unlock()
		}

		time.Sleep(time.Second)
		kfk.RefreshMetadata(topics...)
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:66,代码来源:top.go
注:本文中的github.com/funkygao/gafka/zk.ZkCluster类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论