本文整理汇总了Golang中github.com/funkygao/golib/color.Green函数的典型用法代码示例。如果您正苦于以下问题：Golang Green函数的具体用法？Golang Green怎么用？Golang Green使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了Green函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: Run
// Run implements the "kguard" sub-command. It parses the -z (zone) and -l
// (long format) flags, fetches the kguard entries registered in the zone's
// zookeeper and prints the first entry, which this code treats as the leader.
// In long format the leader's version and stats are shown as well.
//
// It returns 2 when flag parsing fails, 1 when the kguard entries cannot be
// fetched or none is registered, and 0 on success.
func (this *Kguard) Run(args []string) (exitCode int) {
	cmdFlags := flag.NewFlagSet("kguard", flag.ContinueOnError)
	cmdFlags.Usage = func() { this.Ui.Output(this.Help()) }
	cmdFlags.StringVar(&this.zone, "z", ctx.ZkDefaultZone(), "")
	cmdFlags.BoolVar(&this.longFmt, "l", false, "")
	if err := cmdFlags.Parse(args); err != nil {
		return 2
	}

	zkzone := zk.NewZkZone(zk.DefaultConfig(this.zone, ctx.ZoneZkAddrs(this.zone)))
	kguards, err := zkzone.KguardInfos()
	if err != nil {
		this.Ui.Error(fmt.Sprintf("%s %v", zk.KguardLeaderPath, err.Error()))
		// A failed lookup must not be reported to the shell as success.
		return 1
	}
	if len(kguards) == 0 {
		// Indexing kguards[0] below would panic on an empty result.
		this.Ui.Error(fmt.Sprintf("%s no kguard found", zk.KguardLeaderPath))
		return 1
	}

	leader := kguards[0]
	this.Ui.Output(fmt.Sprintf("%s(out of %d candidates) up: %s",
		color.Green(leader.Host), leader.Candidates,
		gofmt.PrettySince(leader.Ctime)))

	if this.longFmt {
		this.showKguardVersion(leader.Host)
		this.showStats(leader.Host)
	}

	return 0
}
开发者ID:funkygao,项目名称:gafka,代码行数:28,代码来源:kguard.go
示例2: printLeader
// printLeader prints the zone name followed by every zookeeper server whose
// "mntr" output contains a line starting with "zk_server_state" and ending
// with "leader". When this.zkHost is set, only servers whose address starts
// with "<zkHost>:" are inspected. In watch mode the report is repeated every
// 5 seconds and the function never returns.
func (this *Zookeeper) printLeader(zkzone *zk.ZkZone) {
	// FIXME all zones will only show the 1st zone info because it blocks others
	for {
		this.Ui.Output(color.Blue(zkzone.Name()))

		for zkhost, lines := range zkzone.RunZkFourLetterCommand("mntr") {
			selected := this.zkHost == "" || strings.HasPrefix(zkhost, this.zkHost+":")
			if !selected {
				continue
			}

			for _, line := range strings.Split(lines, "\n") {
				isLeader := strings.HasPrefix(line, "zk_server_state") &&
					strings.HasSuffix(line, "leader")
				if !isLeader {
					continue
				}

				this.Ui.Output(color.Green("%28s", zkhost))
				break
			}
		}

		if !this.watchMode {
			return
		}
		time.Sleep(time.Second * 5)
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:26,代码来源:zookeeper.go
示例3: diagnose
// diagnose walks every cluster of the zone in sorted order and probes each
// registered broker by opening a sarama client against it and listing topics.
// Failures are logged at error level; successes are logged at info level
// unless problematicMode is set.
func (this *Ping) diagnose() {
	this.zkzone.ForSortedClusters(func(zkcluster *zk.ZkCluster) {
		for _, broker := range zkcluster.RegisteredInfo().Roster {
			log.Debug("ping %s", broker.Addr())

			kfk, err := sarama.NewClient([]string{broker.Addr()}, sarama.NewConfig())
			if err != nil {
				log.Error("%25s %30s %s", broker.Addr(), broker.NamedAddr(), color.Red(err.Error()))
				continue
			}

			// kafka didn't provide ping, so use Topics() as ping
			if _, err = kfk.Topics(); err != nil {
				log.Error("%25s %30s %s", broker.Addr(), broker.NamedAddr(), color.Red(err.Error()))
			} else if !this.problematicMode {
				log.Info("%25s %30s %s", broker.Addr(), broker.NamedAddr(), color.Green("ok"))
			}

			kfk.Close()
		}
	})
}
开发者ID:chendx79,项目名称:gafka,代码行数:26,代码来源:ping.go
示例4: displayZoneTop
// displayZoneTop prints one snapshot of every zookeeper server in zkzone: the
// zone name, a header line, and one row per server built from the output of
// the four-letter "stat" command, ordered by host:port.
//
// The received/sent columns are rendered as "<total>/<qps>". The qps part is
// the delta against the totals remembered from the previous call divided by
// the refresh interval in whole seconds; it stays 0 on the first call for a
// server and whenever the refresh interval is shorter than one second.
//
// In batch mode no colors are emitted and the zone name is printed together
// with the current time. The function panics if a server address cannot be
// split into host and port.
func (this *Zktop) displayZoneTop(zkzone *zk.ZkZone) {
	if this.batchMode {
		this.Ui.Output(fmt.Sprintf("%s %s", zkzone.Name(), bjtime.NowBj()))
	} else {
		this.Ui.Output(color.Green(zkzone.Name()))
	}

	header := "VER SERVER PORT M OUTST RECVD SENT CONNS ZNODES LAT(MIN/AVG/MAX)"
	this.Ui.Output(header)

	stats := zkzone.RunZkFourLetterCommand("stat")
	sortedHosts := make([]string, 0, len(stats))
	for hp := range stats {
		sortedHosts = append(sortedHosts, hp)
	}
	sort.Strings(sortedHosts)

	// Truncated to whole seconds; a sub-second interval yields 0 and must not
	// be used as a divisor (integer division by zero panics).
	intervalSecs := int(this.refreshInterval.Seconds())

	for _, hostPort := range sortedHosts {
		host, port, err := net.SplitHostPort(hostPort)
		if err != nil {
			panic(err)
		}

		stat := zk.ParseStatResult(stats[hostPort])
		if stat.Mode == "" {
			// An empty mode is rendered as "E".
			if this.batchMode {
				stat.Mode = "E"
			} else {
				stat.Mode = color.Red("E")
			}
		} else if stat.Mode == "L" && !this.batchMode {
			stat.Mode = color.Blue(stat.Mode)
		}

		var sentQps, recvQps int
		if lastRecv, present := this.lastRecvs[hostPort]; present && intervalSecs > 0 {
			// Conversion errors are deliberately ignored: an unparsable
			// counter counts as 0.
			r1, _ := strconv.Atoi(stat.Received)
			r0, _ := strconv.Atoi(lastRecv)
			recvQps = (r1 - r0) / intervalSecs

			s1, _ := strconv.Atoi(stat.Sent)
			s0, _ := strconv.Atoi(this.lastSents[hostPort])
			sentQps = (s1 - s0) / intervalSecs
		}

		this.Ui.Output(fmt.Sprintf("%-15s %-15s %5s %1s %6s %16s %16s %5s %7s %s",
			stat.Version,     // 15
			host,             // 15
			port,             // 5
			stat.Mode,        // 1
			stat.Outstanding, // 6
			fmt.Sprintf("%s/%d", stat.Received, recvQps), // 16
			fmt.Sprintf("%s/%d", stat.Sent, sentQps),     // 16
			stat.Connections, // 5
			stat.Znodes,      // 7
			stat.Latency,
		))

		// Remember the totals so the next call can compute deltas.
		this.lastRecvs[hostPort] = stat.Received
		this.lastSents[hostPort] = stat.Sent
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:60,代码来源:zktop.go
示例5: clusterBrokers
// clusterBrokers renders the brokers of one cluster as pipe-separated rows of
// the form "zone|cluster|brokerId|address|uptime".
//
// It returns nil when cluster does not match this.cluster, and a single
// "empty brokers" row when brokers is nil or empty.
//
// With staleOnly set, each broker is probed with a sarama client (4s
// read/write timeouts) and only brokers that cannot be reached are returned,
// in map iteration order, with the connection error appended to the uptime.
//
// Otherwise all brokers are returned sorted by broker id (string order). The
// uptime is highlighted in green when it is shorter than 7 days, and the
// address is numeric or named depending on ipInNumber.
func (this *Brokers) clusterBrokers(zone, cluster string, brokers map[string]*zk.BrokerZnode) []string {
	if !patternMatched(cluster, this.cluster) {
		return nil
	}

	// len of a nil map is 0, so one check covers both nil and empty.
	if len(brokers) == 0 {
		return []string{fmt.Sprintf("%s|%s|%s|%s|%s",
			zone, cluster, " ", color.Red("empty brokers"), " ")}
	}

	lines := make([]string, 0, len(brokers))
	if this.staleOnly {
		// try each broker's aliveness
		for brokerId, broker := range brokers {
			cf := sarama.NewConfig()
			cf.Net.ReadTimeout = time.Second * 4
			cf.Net.WriteTimeout = time.Second * 4
			kfk, err := sarama.NewClient([]string{broker.Addr()}, cf)
			if err != nil {
				lines = append(lines, fmt.Sprintf("%s|%s|%s|%s|%s",
					zone, cluster,
					brokerId, broker.Addr(),
					fmt.Sprintf("%s: %v", gofmt.PrettySince(broker.Uptime()), err)))
			} else {
				kfk.Close()
			}
		}

		return lines
	}

	// sort by broker id
	sortedBrokerIds := make([]string, 0, len(brokers))
	for brokerId := range brokers {
		sortedBrokerIds = append(sortedBrokerIds, brokerId)
	}
	sort.Strings(sortedBrokerIds)

	for _, brokerId := range sortedBrokerIds {
		b := brokers[brokerId]

		uptime := gofmt.PrettySince(b.Uptime())
		if time.Since(b.Uptime()) < time.Hour*24*7 {
			// Highlight brokers that (re)started within the last week.
			uptime = color.Green(uptime)
		}

		addr := b.NamedAddr()
		if this.ipInNumber {
			addr = b.Addr()
		}

		// Emit the (possibly highlighted) uptime computed above; the previous
		// code recomputed the plain value and dropped the highlight.
		lines = append(lines, fmt.Sprintf("%s|%s|%s|%s|%s",
			zone, cluster,
			brokerId, addr,
			uptime))
	}

	return lines
}
开发者ID:chendx79,项目名称:gafka,代码行数:59,代码来源:brokers.go
示例6: drawDashboard
// drawDashboard runs the endless refresh loop of the broker QPS dashboard.
// The terminal size is sampled once up front through termui. Every interval
// the counters are collected and reset, one bar per broker is rendered (at
// most terminal height minus 2 rows) and a summary line with the running
// maxima is printed. Rounds whose max QPS is below 1 only print the host
// names. It panics if a negative QPS value is encountered.
func (this *TopBroker) drawDashboard() {
	// termui is only used to learn the terminal dimensions.
	termui.Init()
	width := termui.TermWidth()
	height := termui.TermHeight()
	termui.Close()

	maxWidth := width - 23
	visibleRows := height - 2

	var totalMaxQps, totalMaxBrokerQps float64
	for {
		time.Sleep(this.interval)

		this.startAll()
		this.collectAll()

		datas, maxQps, totalQps := this.showAndResetCounters()
		if maxQps < 1 {
			// draw empty lines
			for _, row := range datas {
				this.Ui.Output(fmt.Sprintf("%20s", row.host))
			}
			continue
		}

		// Track the cumulative maxima across rounds.
		if maxQps > totalMaxBrokerQps {
			totalMaxBrokerQps = maxQps
		}
		if totalQps > totalMaxQps {
			totalMaxQps = totalQps
		}

		refreshScreen()

		for idx, row := range datas {
			if idx >= visibleRows {
				break
			}
			if row.qps < 0 {
				panic("negative qps")
			}

			this.renderQpsRow(row.host, row.qps, maxQps, maxWidth)
		}

		summary := fmt.Sprintf("%20s brokers:%d total:%s cum max[broker:%.1f total:%.1f]",
			"-SUMMARY-",
			len(datas), color.Green("%.1f", totalQps), totalMaxBrokerQps, totalMaxQps)
		this.Ui.Output(summary)
	}
}
开发者ID:chendx79,项目名称:gafka,代码行数:51,代码来源:topbroker.go
示例7: NewPubStore
// NewPubStore builds a pubStore with an empty pool map and an open shutdown
// channel, recording the given pool capacity, idle timeout, compression and
// dry-run settings together with the local hostname. When debug is true the
// global sarama logger is redirected to stdout with a green "[Sarama]" prefix.
func NewPubStore(poolCapcity int, idleTimeout time.Duration, compress bool,
	debug bool, dryRun bool) *pubStore {
	if debug {
		sarama.Logger = l.New(os.Stdout, color.Green("[Sarama]"), l.LstdFlags|l.Lshortfile)
	}

	store := new(pubStore)
	store.hostname = ctx.Hostname()
	store.compress = compress
	store.idleTimeout = idleTimeout
	store.pubPoolsCapcity = poolCapcity
	store.pubPools = make(map[string]*pubPool)
	store.dryRun = dryRun
	store.shutdownCh = make(chan struct{})
	return store
}
开发者ID:funkygao,项目名称:gafka,代码行数:16,代码来源:pubstore.go
示例8: renderQpsRow
// renderQpsRow prints one dashboard row as "<host> [<bar><qps>]". The bar is
// made of green "|" characters proportional to qps/maxQps (in whole percent)
// of maxWidth, padded with spaces so that the formatted qps value ends at
// column maxWidth inside the brackets.
func (this *TopBroker) renderQpsRow(host string, qps, maxQps float64, maxWidth int) {
	label := fmt.Sprintf("%.1f", qps)
	filled := int(qps*100/maxQps) * maxWidth / 100

	var (
		bar           string
		colorOverhead int
	)
	for n := filled - len(label); n > 0; n-- {
		bar += color.Green("|")
		colorOverhead += 9 // color.Green will add extra 9 chars
	}

	// Pad using the visible width, i.e. without the color escape bytes.
	for col := len(bar) - colorOverhead; col < maxWidth-len(label); col++ {
		bar += " "
	}

	this.Ui.Output(fmt.Sprintf("%20s [%s]", host, bar+label))
}
开发者ID:chendx79,项目名称:gafka,代码行数:16,代码来源:topbroker.go
示例9: printCluster
// printCluster prints the cluster name in green followed by its children,
// each indented by four spaces. For a child ending in "brokers" the "/ids"
// and "/topics" sub-paths are printed too. A listing error is reported with
// the same indentation and ends the output for this cluster.
func (this *LsZk) printCluster(zkcluster *zk.ZkCluster) {
	indent := strings.Repeat(" ", 4)

	this.Ui.Output(color.Green(zkcluster.Name()))

	children, err := zkcluster.ListChildren(this.recursive)
	if err != nil {
		this.Ui.Error(fmt.Sprintf("%s%s", indent, err))
		return
	}

	for _, child := range children {
		this.Ui.Output(indent + child)
		if !strings.HasSuffix(child, "brokers") {
			continue
		}

		this.Ui.Output(indent + child + "/ids")
		this.Ui.Output(indent + child + "/topics")
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:16,代码来源:lszk.go
示例10: verifyBrokers
// verifyBrokers compares, for every cluster of the zone, the live brokers
// with the registered roster. A broker matches when both its id and address
// are equal. Live brokers missing from the roster are printed in green as the
// command that would register them; roster entries with no live counterpart
// are printed in red as dead.
func (this *Clusters) verifyBrokers(zkzone *zk.ZkZone) {
	this.Ui.Output(zkzone.Name())

	indent := strings.Repeat(" ", 4)
	zkzone.ForSortedBrokers(func(cluster string, liveBrokers map[string]*zk.BrokerZnode) {
		roster := zkzone.NewCluster(cluster).RegisteredInfo().Roster

		// pass 1: live brokers that are not in the roster
		for _, live := range liveBrokers {
			registered := false
			for _, reg := range roster {
				if strconv.Itoa(reg.Id) == live.Id && live.Addr() == reg.Addr() {
					registered = true
					break
				}
			}
			if registered {
				continue
			}

			// should manually register the broker
			this.Ui.Output(indent +
				color.Green("+ gk clusters -z %s -s -c %s -addbroker %s:%s",
					zkzone.Name(), cluster, live.Id, live.Addr()))
		}

		// pass 2: roster entries that are not alive
		for _, reg := range roster {
			alive := false
			for _, live := range liveBrokers {
				if strconv.Itoa(reg.Id) == live.Id && live.Addr() == reg.Addr() {
					alive = true
					break
				}
			}
			if alive {
				continue
			}

			// the broker is dead
			this.Ui.Output(indent +
				color.Red("cluster[%s] broker[%d] %s is dead", cluster, reg.Id, reg.Addr()))
		}
	})
}
开发者ID:funkygao,项目名称:gafka,代码行数:45,代码来源:clusters.go
示例11: printZkStats
// printZkStats prints the zone name followed by the raw output of the
// configured four-letter command (this.flw) for every zookeeper server, each
// preceded by the server address in green. When this.zkHost is set, only
// servers whose address starts with "<zkHost>:" are shown. In watch mode the
// report is repeated every 5 seconds and the function never returns.
func (this *Zookeeper) printZkStats(zkzone *zk.ZkZone) {
	for {
		this.Ui.Output(color.Blue(zkzone.Name()))

		for zkhost, lines := range zkzone.RunZkFourLetterCommand(this.flw) {
			selected := this.zkHost == "" || strings.HasPrefix(zkhost, this.zkHost+":")
			if selected {
				this.Ui.Output(fmt.Sprintf("%s\n%s", color.Green("%28s", zkhost), lines))
			}
		}

		if !this.watchMode {
			return
		}
		time.Sleep(time.Second * 5)
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:19,代码来源:zookeeper.go
示例12: showChildrenRecursively
// showChildrenRecursively walks the znode tree below path depth-first, with
// the children of each node visited in sorted order. Znodes matching
// this.likePattern are printed: yellow when stat.EphemeralOwner > 0, green
// otherwise. Non-empty data of matching znodes is printed as a string, and in
// verbose mode the stat and the raw bytes are printed before it.
//
// A failure to list children silently ends that branch; a failure to read a
// znode is passed to must.
func (this *Get) showChildrenRecursively(conn *zk.Conn, path string) {
	children, _, err := conn.Children(path)
	if err != nil {
		return
	}
	sort.Strings(children)

	// Avoid a double slash when joining children of the root.
	if path == "/" {
		path = ""
	}

	indent := strings.Repeat(" ", 3)
	for _, child := range children {
		znode := fmt.Sprintf("%s/%s", path, child)

		// display znode content
		data, stat, err := conn.Get(znode)
		must(err)

		matched := patternMatched(znode, this.likePattern)
		if matched {
			if stat.EphemeralOwner > 0 {
				this.Ui.Output(color.Yellow(znode))
			} else {
				this.Ui.Output(color.Green(znode))
			}
		}

		if matched && len(data) > 0 {
			if this.verbose {
				this.Ui.Output(fmt.Sprintf("%s %#v", indent, stat))
				this.Ui.Output(fmt.Sprintf("%s %v", indent, data))
			}
			this.Ui.Output(fmt.Sprintf("%s %s", indent, string(data)))
		}

		this.showChildrenRecursively(conn, znode)
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:41,代码来源:get.go
示例13: discoverClusters
// discoverClusters prints the zone name and then the state of every cluster
// path discovered under "/": paths not yet known to the zone are printed in
// green with a "+++" suffix, known ones are printed plainly. Afterwards every
// known cluster whose (space-trimmed) path was not discovered is printed in
// red with a "---" suffix. A discovery error is reported and ends the output.
func (this *Discover) discoverClusters(zkzone *zk.ZkZone) {
	this.Ui.Output(zkzone.Name())

	registered := zkzone.Clusters()
	knownPaths := make(map[string]struct{}, len(registered))
	for _, p := range registered {
		knownPaths[p] = struct{}{}
	}

	discovered, err := zkzone.DiscoverClusters("/")
	if err != nil {
		this.Ui.Error(zkzone.Name() + ": " + err.Error())
		return
	}

	indent := strings.Repeat(" ", 4)

	// print each cluster state: new, normal
	for _, zkpath := range discovered {
		if _, known := knownPaths[zkpath]; known {
			this.Ui.Output(indent + zkpath)
		} else {
			this.Ui.Output(indent + color.Green("%s +++",
				zkpath))
		}
	}

	// find the offline clusters
	for name, path := range registered {
		path = strings.TrimSpace(path)

		online := false
		for _, p := range discovered {
			if strings.TrimSpace(p) == path {
				online = true
				break
			}
		}
		if online {
			continue
		}

		this.Ui.Output(indent + color.Red("%s: %s ---", name, path))
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:41,代码来源:discover.go
示例14: printConsumersByHost
// printConsumersByHost prints the zone name and then, for every consumer host
// in sorted order, the per-cluster, per-topic sum of subscription counts of
// the consumers running on that host. Only clusters whose name matches
// clusterPattern are taken into account.
func (this *Consumers) printConsumersByHost(zkzone *zk.ZkZone, clusterPattern string) {
	// host: {cluster: {topic: count}}
	outputs := make(map[string]map[string]map[string]int)

	this.Ui.Output(color.Blue(zkzone.Name()))

	zkzone.ForSortedClusters(func(zkcluster *zk.ZkCluster) {
		if !patternMatched(zkcluster.Name(), clusterPattern) {
			return
		}

		for _, group := range zkcluster.ConsumerGroups() {
			for _, c := range group {
				byCluster, ok := outputs[c.Host()]
				if !ok {
					byCluster = make(map[string]map[string]int)
					outputs[c.Host()] = byCluster
				}

				byTopic, ok := byCluster[zkcluster.Name()]
				if !ok {
					byTopic = make(map[string]int)
					byCluster[zkcluster.Name()] = byTopic
				}

				for topic, count := range c.Subscription {
					byTopic[topic] += count
				}
			}
		}
	})

	hosts := make([]string, 0, len(outputs))
	for host := range outputs {
		hosts = append(hosts, host)
	}
	sort.Strings(hosts)

	for _, host := range hosts {
		this.Ui.Output(fmt.Sprintf("%s %+v", color.Green("%22s", host), outputs[host]))
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:39,代码来源:consumers.go
示例15: verifyPub
// verifyPub renders a table to stdout comparing, for every topic that has a
// kafka topic name, the two offsets returned by pubOffsetDiff for the kafka
// topic and its PubSub counterpart. The difference (second minus first) is
// yellow when the first offset is 0 and the second is not, green when the
// absolute difference is below 20, and red otherwise. The last column flags
// topics listed in problemeticTopics. Topics whose kafka topic is unknown are
// reported with a warning and skipped.
func (this *Verify) verifyPub() {
	table := tablewriter.NewWriter(os.Stdout)
	table.SetHeader([]string{"Kafka", "Stock", "PubSub", "Stock", "Diff", "?"})

	for _, t := range this.topics {
		if t.KafkaTopicName == "" {
			continue
		}

		kafkaCluster := this.kafkaTopics[t.KafkaTopicName]
		if kafkaCluster == "" {
			this.Ui.Warn(fmt.Sprintf("invalid kafka topic: %s", t.KafkaTopicName))
			continue
		}

		psubTopic := manager.Default.KafkaTopic(t.AppId, t.TopicName, "v1")
		offsets := this.pubOffsetDiff(t.KafkaTopicName, kafkaCluster,
			psubTopic, this.cluster)

		var diff string
		switch {
		case offsets[0] == 0 && offsets[1] != 0:
			diff = color.Yellow("%d", offsets[1]-offsets[0])
		case math.Abs(float64(offsets[0]-offsets[1])) < 20:
			diff = color.Green("%d", offsets[1]-offsets[0])
		default:
			diff = color.Red("%d", offsets[1]-offsets[0])
		}

		problem := "N"
		if _, flagged := this.problemeticTopics[t.KafkaTopicName]; flagged {
			problem = color.Yellow("Y")
		}

		row := []string{
			t.KafkaTopicName, fmt.Sprintf("%d", offsets[0]),
			t.TopicName, fmt.Sprintf("%d", offsets[1]),
			diff, problem,
		}
		table.Append(row)
	}

	table.Render()
}
开发者ID:funkygao,项目名称:gafka,代码行数:37,代码来源:verify.go
示例16: debug
// debug prints a formatted debug line to stdout when the cluster's log level
// is LogLevelDebug or lower. The line has the form
//
//	<node id> [h:m:s.micros] <file>:<line>(<func>): <message>
//
// where file, line and func describe the caller of debug. The node id is
// colored according to c.color ("red", "blue", "yellow" or "green"; any other
// value leaves it uncolored) and the function name is always printed in red.
// Output is serialized through debugLock.
func (c *Cluster) debug(format string, v ...interface{}) {
	if c.logLevel > LogLevelDebug {
		return
	}

	pc, file, line, ok := runtime.Caller(1)
	if !ok {
		file = "<?>"
		line = 0
	} else if i := strings.LastIndex(file, "/"); i >= 0 {
		// Keep only the base name of the source file.
		file = file[i+1:]
	}

	// Only the last path element of the fully qualified function name is shown.
	fn := runtime.FuncForPC(pc).Name()
	fnparts := strings.Split(fn, "/")

	t := time.Now()
	hour, minute, sec := t.Clock()
	microsec := t.Nanosecond() / 1e3

	debugLock.Lock()
	defer debugLock.Unlock()

	nodePrefix := c.self.ID.String()
	switch c.color {
	case "red":
		nodePrefix = color.Red(c.self.ID.String())
	case "blue":
		nodePrefix = color.Blue(c.self.ID.String())
	case "yellow":
		nodePrefix = color.Yellow(c.self.ID.String())
	case "green":
		nodePrefix = color.Green(c.self.ID.String())
	}

	// The prefix is passed as an argument rather than spliced into the format
	// string, so a '%' in it cannot be interpreted as a verb. Microseconds
	// are zero-padded to 6 digits so the fraction of a second reads correctly.
	fmt.Printf("%s [%d:%d:%d.%06d] %s:%d(%s): %s\n",
		nodePrefix,
		hour, minute, sec, microsec,
		file, line, color.Red(fnparts[len(fnparts)-1]),
		fmt.Sprintf(format, v...))
}
开发者ID:funkygao,项目名称:pastry,代码行数:36,代码来源:cluster.go
示例17: Run
// Run implements the "host" sub-command. It parses the -z (zone) and -ip
// (host) flags and then calls diagnose every 5 seconds, printing a green
// separator line after each round.
//
// It returns 1 when flag parsing fails and 2 when the argument validation
// rejects args. Once the diagnose loop has started it never returns.
func (this *Host) Run(args []string) (exitCode int) {
	cmdFlags := flag.NewFlagSet("host", flag.ContinueOnError)
	cmdFlags.Usage = func() { this.Ui.Output(this.Help()) }
	cmdFlags.StringVar(&this.zone, "z", "", "")
	cmdFlags.StringVar(&this.host, "ip", "", "")
	if err := cmdFlags.Parse(args); err != nil {
		return 1
	}

	// The host flag is registered as "-ip". Requiring "-h" could never be
	// satisfied: "-h" is not a defined flag, so Parse above fails with
	// flag.ErrHelp whenever it is passed.
	// NOTE(review): assumes require checks that the named options are
	// present in args; confirm against validateArgs.
	if validateArgs(this, this.Ui).
		require("-z", "-ip").
		invalid(args) {
		return 2
	}

	for {
		this.diagnose()
		this.Ui.Output(color.Green("%s", strings.Repeat("=", 40)))
		time.Sleep(time.Second * 5)
	}
}
开发者ID:funkygao,项目名称:gafka,代码行数:24,代码来源:host.go
示例18: printClusters
//.........这里部分代码省略.........
for _, topic := range topics {
partitions, err := kfk.Partitions(topic)
if err != nil {
ci.err = err.Error()
clusters = append(clusters, ci)
continue
}
partitionN += len(partitions)
}
clusters = append(clusters, clusterInfo{
name: zkcluster.Name(),
nickname: info.Nickname,
path: zkcluster.Chroot(),
topicN: len(topics),
partitionN: partitionN,
retention: info.Retention,
public: info.Public,
replicas: info.Replicas,
priority: info.Priority,
brokerInfos: info.Roster,
})
})
this.Ui.Output(fmt.Sprintf("%s: %d", zkzone.Name(), len(clusters)))
if this.verbose {
// 2 loop: 1. print the err clusters 2. print the good clusters
for _, c := range clusters {
if c.err == "" {
continue
}
this.Ui.Output(fmt.Sprintf("%30s: %s %s", c.name, c.path,
color.Red(c.err)))
}
// loop2
for _, c := range clusters {
if c.err != "" {
continue
}
this.Ui.Output(fmt.Sprintf("%30s: %s",
c.name, c.path))
brokers := []string{}
for _, broker := range c.brokerInfos {
if this.ipInNumber {
brokers = append(brokers, fmt.Sprintf("%d/%s:%d", broker.Id, broker.Host, broker.Port))
} else {
brokers = append(brokers, fmt.Sprintf("%d/%s", broker.Id, broker.NamedAddr()))
}
}
if len(brokers) > 0 {
sort.Strings(brokers)
this.Ui.Info(color.Green("%31s %s", " ", strings.Join(brokers, ", ")))
}
this.Ui.Output(strings.Repeat(" ", 4) +
color.Green("nick:%s public:%v topics:%d partitions:%d replicas:%d retention:%dh",
c.nickname, c.public,
c.topicN, c.partitionN, c.replicas, c.retention))
}
return
}
// not verbose mode
hostsWithoutDnsRecords := make([]string, 0)
for _, c := range clusters {
this.Ui.Output(fmt.Sprintf("%30s: %s", c.name, c.path))
brokers := []string{}
for _, broker := range c.brokerInfos {
if this.ipInNumber {
brokers = append(brokers, fmt.Sprintf("%d/%s:%d", broker.Id, broker.Host, broker.Port))
} else {
brokers = append(brokers, fmt.Sprintf("%d/%s", broker.Id, broker.NamedAddr()))
}
if broker.Addr() == broker.NamedAddr() {
hostsWithoutDnsRecords = append(hostsWithoutDnsRecords, fmt.Sprintf("%s:%s", c.name, broker.Addr()))
}
}
if len(brokers) > 0 {
sort.Strings(brokers)
this.Ui.Info(color.Green("%31s %s", " ", strings.Join(brokers, ", ")))
} else {
this.Ui.Warn(fmt.Sprintf("%31s no live registered brokers", " "))
}
}
if len(hostsWithoutDnsRecords) > 0 {
this.Ui.Warn("brokers without dns record:")
for _, broker := range hostsWithoutDnsRecords {
parts := strings.SplitN(broker, ":", 2)
this.Ui.Output(fmt.Sprintf("%30s: %s", parts[0], color.Yellow(parts[1])))
}
}
}
开发者ID:funkygao,项目名称:gafka,代码行数:101,代码来源:clusters.go
示例19: consumeSample
// consumeSample returns the source of a sample Java Kafka consumer as text.
// The "zookeeper.connect" and "group.id" property names are highlighted in
// cyan, the "threads" parameter (both occurrences) in green and the three
// warning comments about graceful shutdown in red.
func (*Sample) consumeSample() string {
	// The template is emitted verbatim; each %s is a placeholder for one of
	// the colored fragments below, in order of appearance.
	const javaTemplate = `
public class KafkaConsumer {
private final ConsumerConnector consumer;
private KafkaConsumer() {
Properties props = new Properties();
props.put("%s", "zk2181a.wdds.zk.com:2181,zk2181b.wdds.zk.com:2181,zk2181c.wdds.zk.com:2181/kafka");
props.put("%s", "group1");
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "60000"); // 1m
//props.put("auto.offset.reset", "smallest"); // largest | smallest
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig config = new ConsumerConfig(props);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}
public void shutdown() {
if (consumer != null) {
consumer.shutdown();
}
}
void consume(String topic, int %s) {
// %s
// %s
// %s
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.shutdown();
}
});
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, %s);
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
ConsumerIterator<String, String> it = stream.iterator();
while (it.hasNext()) {
// consumer.commitOffsets(); // manually commit offsets
System.out.println(it.next().message());
}
}
public static void main(String[] args) {
new KafkaConsumer().consume();
}
}
`

	zkConnectKey := color.Cyan("zookeeper.connect")
	groupIdKey := color.Cyan("group.id")
	threadsParam := color.Green("threads")
	important := color.Red("VERY important!")
	graceful := color.Red("graceful shutdown the consumer group to commit consumed offset")
	noDup := color.Red("avoid consuming duplicated message when restarting the same consumer group")
	threadsArg := color.Green("threads")

	return fmt.Sprintf(javaTemplate,
		zkConnectKey,
		groupIdKey,
		threadsParam,
		important,
		graceful,
		noDup,
		threadsArg)
}
开发者ID:chendx79,项目名称:gafka,代码行数:64,代码来源:sample.go
示例20: Run
// Run implements the interactive "console" sub-command. It parses the -z
// (zone) flag, connects to the zone's zookeeper, sets up a liner prompt with
// tab completion and persistent history, and then executes each entered line
// through runCommand until the prompt fails or the user types "bye", "q",
// "quit" or "exit".
//
// Completion works in two modes: when the line already holds a command and a
// non-blank argument, children of the current working znode whose name starts
// with the argument are offered; otherwise registered and builtin commands
// starting with the (lower-cased) line are offered.
//
// It returns 1 when flag parsing fails and 0 otherwise. It panics if the
// zookeeper connection cannot be established.
func (this *Console) Run(args []string) (exitCode int) {
	cmdFlags := flag.NewFlagSet("console", flag.ContinueOnError)
	cmdFlags.Usage = func() { this.Ui.Output(this.Help()) }
	cmdFlags.StringVar(&this.zone, "z", ctx.ZkDefaultZone(), "")
	if err := cmdFlags.Parse(args); err != nil {
		return 1
	}

	this.builtinCmds = []string{"help", "history", "ls", "cat", "pwd", "cd"}
	this.cwd = "/"
	this.zkzone = gzk.NewZkZone(gzk.DefaultConfig(this.zone, ctx.ZoneZkAddrs(this.zone)))
	if err := this.zkzone.Connect(); err != nil {
		panic(err)
	}
	defer this.zkzone.Close()

	this.Line = liner.NewLiner()
	this.Line.SetCtrlCAborts(true)
	this.Line.SetCompleter(func(line string) (c []string) {
		p := strings.SplitN(line, " ", 2)
		if len(p) == 2 && strings.TrimSpace(p[1]) != "" {
			// Complete the argument with children of the current znode.
			children, _, err := this.zkzone.Conn().Children(this.cwd)
			if err != nil {
				this.Ui.Error(err.Error())
				return
			}

			for _, child := range children {
				if strings.HasPrefix(child, p[1]) {
					c = append(c, fmt.Sprintf("%s %s", p[0], child))
				}
			}

			return
		}

		// Complete the command name. Only prefix matches are offered: the
		// former unconditional append of all builtin commands produced
		// duplicate and non-matching candidates and has been dropped.
		prefix := strings.ToLower(line)
		for cmd := range this.Cmds {
			if strings.HasPrefix(cmd, prefix) {
				c = append(c, cmd)
			}
		}
		for _, cmd := range this.builtinCmds {
			if strings.HasPrefix(cmd, prefix) {
				c = append(c, cmd)
			}
		}

		return
	})
	defer this.Line.Close()

	// Load the history of previous sessions, if any. A missing or unreadable
	// history file is not an error.
	if usr, err := user.Current(); err == nil {
		this.historyFile = filepath.Join(usr.HomeDir, fmt.Sprintf(".%s_history", this.Cmd))
		if f, e := os.Open(this.historyFile); e == nil {
			this.Line.ReadHistory(f)
			f.Close()
		}
	}

	for {
		this.refreshPrompt()
		line, err := this.Line.Prompt(color.Green("%s> ", this.prompt))
		if err != nil {
			break
		}

		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}

		if line == "bye" || line == "q" || line == "quit" || line == "exit" {
			break
		}

		this.runCommand(line)

		// write out the history (best effort: the file is rewritten after
		// every command and failures are ignored)
		if len(this.historyFile) > 0 {
			this.Line.AppendHistory(line)
			if f, e := os.Create(this.historyFile); e == nil {
				this.Line.WriteHistory(f)
				f.Close()
			}
		}
	}

	return
}
开发者ID:chendx79,项目名称:gafka,代码行数:89,代码来源:console.go
注：本文中的github.com/funkygao/golib/color.Green函数示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论