本文整理汇总了Golang中github.com/uber-go/zap.Logger类的典型用法代码示例。如果您正苦于以下问题:Golang Logger类的具体用法?Golang Logger怎么用?Golang Logger使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Logger类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: commitOffset
func (zom *zookeeperOffsetManager) commitOffset(topic string, partition int32, tracker *partitionOffsetTracker, logger zap.Logger) error {
err := tracker.commit(func(offset int64) error {
if offset >= 0 {
return zom.cg.group.CommitOffset(topic, partition, offset+1)
} else {
return nil
}
})
if err != nil {
logger.Warn("ZOOKEEPER: FAILED to commit offset",
zap.Int64("highestProcessedOffset", tracker.highestProcessedOffset),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
)
} else if zom.config.VerboseLogging {
logger.Debug("ZOOKEEPER: Committed offset",
zap.Int64("lastCommittedOffset", tracker.lastCommittedOffset),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
)
}
return err
}
开发者ID:topfreegames,项目名称:kafka,代码行数:25,代码来源:offset_manager.go
示例2: Init
func (w *hostWorkers) Init(db *content.DBrw, logger zap.Logger, baseHosts []string, cnt int) error {
hostMng := &hostsManager{}
err := hostMng.Init(db, baseHosts)
if err != nil {
return err
}
hosts := hostMng.GetHosts()
w.workers = make([]*hostWorker, 0)
cntPerHost := cnt / len(hosts)
if cntPerHost < 1 {
cntPerHost = 1
}
for hostName, hostID := range hosts {
worker := &hostWorker{Request: &request{hostMng: hostMng}}
worker.Request.Init(logger.With(zap.String("host", hostName)))
worker.Tasks, err = db.GetNewURLs(hostID, cntPerHost)
if err != nil {
return err
}
w.workers = append(w.workers, worker)
}
return nil
}
开发者ID:ReanGD,项目名称:go-web-search,代码行数:25,代码来源:host_workers.go
示例3: FinalizePartition
func (zom *zookeeperOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration, replicaId int, logger zap.Logger) error {
zom.l.RLock()
tracker := zom.offsets[topic][partition]
zom.l.RUnlock()
if lastOffset >= 0 {
if lastOffset-tracker.highestProcessedOffset > 0 {
logger.Info("ZOOKEEPER: Finalizing partition. Waiting before processing remaining messages",
zap.Int("replicaId", replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
zap.Int64("lastProcessedOffset", tracker.highestProcessedOffset),
zap.Duration("waitingTimeToProcessMoreMessages", timeout/time.Second),
zap.Int64("numMessagesToProcess", lastOffset-tracker.highestProcessedOffset),
)
if !tracker.waitForOffset(lastOffset, timeout) {
return fmt.Errorf("REP %d - TIMEOUT waiting for offset %d. Last committed offset: %d", replicaId, lastOffset, tracker.lastCommittedOffset)
}
}
if err := zom.commitOffset(topic, partition, tracker, logger); err != nil {
return fmt.Errorf("REP %d - FAILED to commit offset %d to Zookeeper. Last committed offset: %d", replicaId, tracker.highestProcessedOffset, tracker.lastCommittedOffset)
}
}
zom.l.Lock()
delete(zom.offsets[topic], partition)
zom.l.Unlock()
return nil
}
开发者ID:topfreegames,项目名称:kafka,代码行数:31,代码来源:offset_manager.go
示例4: WrongURLsToLog
// WrongURLsToLog - write to log add wrong URLs
func (h *HTMLMetadata) WrongURLsToLog(logger zap.Logger) {
for url, error := range h.wrongURLs {
logger.Warn("Error parse URL",
zap.String("err_url", url),
zap.String("details", error),
)
}
}
开发者ID:ReanGD,项目名称:go-web-search,代码行数:9,代码来源:html_metadata.go
示例5: Stop
//Stop gracefully closes the connection - it waits for the sending queue to clear, and then shuts down.
func (conn *Connection) Stop(logger zap.Logger) chan bool {
logger.Info("APNS: Shutting down one of the connections",
zap.Int("connectionId", conn.id),
)
conn.stopping <- true
return conn.stopped
//Thought: Don't necessarily need a channel here. Could signal finishing by closing errors?
}
开发者ID:topfreegames,项目名称:apns-go,代码行数:9,代码来源:connection.go
示例6: LogError
// LogError - write error to zap log
func LogError(logger zap.Logger, err error) {
werr, ok := err.(*ErrorEx)
if !ok {
logger.Error(err.Error())
} else {
logger.Log(werr.Level, werr.Error(), werr.Fields...)
}
}
开发者ID:ReanGD,项目名称:go-web-search,代码行数:9,代码来源:errors.go
示例7: Start
//Start initiates a connection to APNS and asnchronously sends notifications which have been queued.
func (conn *Connection) Start(logger zap.Logger) error {
//Connect to APNS. The reason this is here as well as in sender is that this probably catches any unavoidable errors in a synchronous fashion, while in sender it can reconnect after temporary errors (which should work most of the time.)
err := conn.connect(logger)
if err != nil {
logger.Fatal("APNS: Failed to connect",
zap.Int("connectionId", conn.id),
zap.Error(err),
)
return err
}
//Start sender goroutine
sent := make(chan PushNotification, 10000)
go conn.sender(conn.queue, sent, logger)
//Start limbo goroutine
go conn.limbo(sent, conn.responses, conn.errors, conn.queue, logger)
return nil
}
开发者ID:topfreegames,项目名称:apns-go,代码行数:18,代码来源:connection.go
示例8: reload
func (cg *ConsumerGroup) reload(logger zap.Logger) error {
cg.reloadMutex.Lock()
defer cg.reloadMutex.Unlock()
cg.singleReload.Do(func() {
logger.Info("KAFKA: Closing down old connections for replica",
zap.Int("replicaId", cg.replicaId),
)
err := cg.Close(logger)
if err != nil {
logger.Error("KAFKA: Failed to close consumergroup for replica",
zap.Int("replicaId", cg.replicaId),
zap.Error(err),
)
}
cg.Load(logger)
})
return nil
}
开发者ID:topfreegames,项目名称:kafka,代码行数:18,代码来源:consumer_group.go
示例9: topicListConsumer
func (cg *ConsumerGroup) topicListConsumer(topics []string, logger zap.Logger) {
for {
select {
case <-cg.stopper:
return
default:
}
consumers, consumerChanges, err := cg.group.WatchInstances()
if err != nil {
logger.Fatal("KAFKA: FAILED to get list of registered consumer instances for replica",
zap.Int("replicaId", cg.replicaId),
zap.Error(err),
)
return
}
cg.consumers = consumers
logger.Info("KAFKA: Got currently registered consumers for replica",
zap.Int("replicaId", cg.replicaId),
zap.Int("numRegisteredConsumers", len(cg.consumers)),
)
stopper := make(chan struct{})
for _, topic := range topics {
cg.wg.Add(1)
go cg.topicConsumer(topic, cg.messages, cg.errors, stopper, logger)
}
select {
case <-cg.stopper:
close(stopper)
return
case event := <-consumerChanges:
if event.Err == zk.ErrSessionExpired || event.Err == zk.ErrConnectionClosed {
logger.Info("KAFKA: Session was expired, reloading consumer for replica",
zap.Int("replicaId", cg.replicaId),
)
go cg.reload(logger)
<-cg.stopper
close(stopper)
return
} else {
logger.Info("KAFKA: Triggering rebalance due to consumer list change in replica",
zap.Int("replicaId", cg.replicaId),
)
close(stopper)
cg.wg.Wait()
}
}
}
}
开发者ID:topfreegames,项目名称:kafka,代码行数:54,代码来源:consumer_group.go
示例10: spinUntilReconnect
func (c *Connection) spinUntilReconnect(logger zap.Logger) {
var backoff = time.Duration(100)
for {
logger.Info("APNS: Connection lost. Reconnecting",
zap.Int("connectionId", c.id),
)
err := c.connect(logger)
if err != nil {
//Exponential backoff up to a limit
logger.Info("APNS: Error connecting to server",
zap.Int("connectionId", c.id),
zap.Error(err),
)
backoff = backoff * 2
if backoff > maxBackoff {
backoff = maxBackoff
}
time.Sleep(backoff)
} else {
backoff = 100
logger.Info("APNS: New connection established",
zap.Int("connectionId", c.id),
)
break
}
}
}
开发者ID:topfreegames,项目名称:apns-go,代码行数:27,代码来源:connection.go
示例11: reader
func (conn *Connection) reader(responses chan<- Response, logger zap.Logger) {
buffer := make([]byte, 6)
for {
n, err := conn.conn.Read(buffer)
if err != nil && n < 6 {
logger.Info("APNS: Connection error before reading complete response",
zap.Int("connectionId", conn.id),
zap.Int("n", n),
zap.Error(err),
)
conn.shouldReconnect <- true
return
} else if err != nil {
logger.Info("APNS: Connection error before reading complete response",
zap.Int("connectionId", conn.id),
zap.Error(err),
)
}
command := uint8(buffer[0])
if command != 8 {
logger.Info("APNS: Something went wrong in a connection - Command should have been 8 but it had other value instead",
zap.Int("connectionId", conn.id),
zap.Object("commandValue", command),
)
}
resp := newResponse()
resp.Identifier = binary.BigEndian.Uint32(buffer[2:6])
resp.Status = uint8(buffer[1])
responses <- resp
conn.shouldReconnect <- true
return
}
}
开发者ID:topfreegames,项目名称:apns-go,代码行数:33,代码来源:connection.go
示例12: GetBatchOfMessages
func (cg *ConsumerGroup) GetBatchOfMessages(batchSize int, logger zap.Logger) []string {
offsets := make(map[string]map[int32]int64)
groupOfMessages := []string{}
if batchSize == 0 {
return groupOfMessages
}
counter := 0
for {
if counter == batchSize {
break
}
cg.reloadMutex.Lock()
select {
case message := <-cg.messages:
if offsets[message.Topic] == nil {
offsets[message.Topic] = make(map[int32]int64)
}
if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
logger.Error("KAFKA: Unexpected offset for message topic and partition",
zap.String("messageTopic", message.Topic),
zap.Int64("messagePartition", int64(message.Partition)),
zap.Int64("offsetExpected", offsets[message.Topic][message.Partition]+1),
zap.Int64("offsetFound", message.Offset),
zap.Int64("offsetDifference", message.Offset-offsets[message.Topic][message.Partition]+1),
)
continue
}
groupOfMessages = append(groupOfMessages, string(message.Value))
offsets[message.Topic][message.Partition] = message.Offset
cg.CommitUpto(message)
counter += 1
cg.reloadMutex.Unlock()
default:
cg.reloadMutex.Unlock()
continue
}
}
return groupOfMessages
}
开发者ID:topfreegames,项目名称:kafka,代码行数:39,代码来源:consumer_group.go
示例13: connect
func (conn *Connection) connect(logger zap.Logger) error {
if conn.conn != nil {
conn.conn.Close()
}
if conn.connAux != nil {
conn.connAux.Close()
}
var cert tls.Certificate
var err error
if len(conn.CertificateBase64) == 0 && len(conn.KeyBase64) == 0 {
// The user did not specify raw block contents, so check the filesystem.
cert, err = tls.LoadX509KeyPair(conn.CertificateFile, conn.KeyFile)
} else {
// The user provided the raw block contents, so use that.
cert, err = tls.X509KeyPair([]byte(conn.CertificateBase64), []byte(conn.KeyBase64))
}
if err != nil {
logger.Fatal("APNS: Failed to obtain certificate",
zap.Error(err),
)
return err
}
conf := &tls.Config{
Certificates: []tls.Certificate{cert},
ServerName: strings.Split(conn.Gateway, ":")[0],
}
connAux, err := net.Dial("tcp", conn.Gateway)
if err != nil {
logger.Fatal("APNS: Failed while dialing gateway",
zap.String("gateway", conn.Gateway),
zap.Error(err),
)
return err
}
tlsConn := tls.Client(connAux, conf)
err = tlsConn.Handshake()
if err != nil {
logger.Fatal("APNS: Failed while handshaking",
zap.Error(err),
)
_ = tlsConn.Close()
return err
}
conn.conn = tlsConn
conn.connAux = connAux
//Start reader goroutine
go conn.reader(conn.responses, logger)
return nil
}
开发者ID:topfreegames,项目名称:apns-go,代码行数:53,代码来源:connection.go
示例14: Close
func (cg *ConsumerGroup) Close(logger zap.Logger) error {
shutdownError := AlreadyClosing
cg.singleShutdown.Do(func() {
defer cg.kazoo.Close()
shutdownError = nil
close(cg.stopper)
cg.wg.Wait()
if err := cg.offsetManager.Close(logger); err != nil {
logger.Error("KAFKA: FAILED closing the offset manager for replica!",
zap.Int("replicaId", cg.replicaId),
zap.Error(err),
)
}
if shutdownError = cg.instance.Deregister(); shutdownError != nil {
logger.Warn("KAFKA: Replica FAILED deregistering consumer instance",
zap.Int("replicaId", cg.replicaId),
zap.Error(shutdownError),
)
} else {
logger.Info("KAFKA: Replica deregistered consumer instance",
zap.Int("replicaId", cg.replicaId),
zap.String("instanceId", cg.instance.ID),
)
}
if shutdownError = cg.consumer.Close(); shutdownError != nil {
logger.Error("Replica FAILED closing the Sarama client",
zap.Int("replicaId", cg.replicaId),
zap.Error(shutdownError),
)
}
close(cg.messages)
close(cg.errors)
cg.instance = nil
})
return shutdownError
}
开发者ID:topfreegames,项目名称:kafka,代码行数:37,代码来源:consumer_group.go
示例15: partitionConsumer
// Consumes a partition
func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, wg *sync.WaitGroup, stopper <-chan struct{}, logger zap.Logger) {
defer wg.Done()
select {
case <-stopper:
return
default:
}
for maxRetries, tries := 3, 0; tries < maxRetries; tries++ {
if err := cg.instance.ClaimPartition(topic, partition); err == nil {
break
} else if err == kazoo.ErrPartitionClaimedByOther && tries+1 < maxRetries {
time.Sleep(1 * time.Second)
} else {
logger.Warn("KAFKA: Replica FAILED to claim partition",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
zap.Error(err),
)
return
}
}
defer cg.instance.ReleasePartition(topic, partition)
nextOffset, err := cg.offsetManager.InitializePartition(topic, partition)
if err != nil {
logger.Error("KAFKA: Replica FAILED to determine initial offset",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
zap.Error(err),
)
return
}
if nextOffset >= 0 {
logger.Info("KAFKA: Replica partition consumer starting at offset",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
zap.Int64("nextOffset", nextOffset),
)
} else {
nextOffset = cg.config.Offsets.Initial
if nextOffset == sarama.OffsetOldest {
logger.Info("KAFKA: Replica partition consumer starting at the oldest available offset",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
)
} else if nextOffset == sarama.OffsetNewest {
logger.Info("KAFKA: Replica partition consumer listening for new messages only",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
)
}
}
consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
if err == sarama.ErrOffsetOutOfRange {
logger.Warn("KAFKA: Replica partition consumer offset out of Range",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
)
// if the offset is out of range, simplistically decide whether to use OffsetNewest or OffsetOldest
// if the configuration specified offsetOldest, then switch to the oldest available offset, else
// switch to the newest available offset.
if cg.config.Offsets.Initial == sarama.OffsetOldest {
nextOffset = sarama.OffsetOldest
logger.Info("KAFKA: Replica partition consumer offset reset to oldest available offset",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
)
} else {
nextOffset = sarama.OffsetNewest
logger.Info("KAFKA: Replica partition consumer offset reset to newest available offset",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
)
}
// retry the consumePartition with the adjusted offset
consumer, err = cg.consumer.ConsumePartition(topic, partition, nextOffset)
}
if err != nil {
logger.Fatal("KAFKA: Replica FAILED to start partition consumer",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int64("partition", int64(partition)),
zap.Error(err),
)
return
}
//.........这里部分代码省略.........
开发者ID:topfreegames,项目名称:kafka,代码行数:101,代码来源:consumer_group.go
示例16: topicConsumer
func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, stopper <-chan struct{}, logger zap.Logger) {
defer cg.wg.Done()
select {
case <-stopper:
return
default:
}
logger.Info("KAFKA: Replica started consumer for topic",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
)
// Fetch a list of partition IDs
partitions, err := cg.kazoo.Topic(topic).Partitions()
if err != nil {
logger.Fatal("KAFKA: Replica FAILED to get list of partitions for topic",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Error(err),
)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
}
return
}
partitionLeaders, err := retrievePartitionLeaders(partitions)
if err != nil {
logger.Fatal("KAFKA: Replica FAILED to get leaders of partitions for topic",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Error(err),
)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
}
return
}
dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitionLeaders)
myPartitions := dividedPartitions[cg.instance.ID]
logger.Info("KAFKA: Replica is claiming partitions",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
zap.Int("claimedPartitions", len(myPartitions)),
zap.Int("numPartitionLeaders", len(partitionLeaders)),
)
// Consume all the assigned partitions
var wg sync.WaitGroup
myPartitionsStr := ""
for _, pid := range myPartitions {
myPartitionsStr += fmt.Sprintf("%d ", pid.ID)
wg.Add(1)
go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper, logger)
}
logger.Info("KAFKA: Retrieved replica's partitions",
zap.Int("replicaId", cg.replicaId),
zap.String("myPartitions", myPartitionsStr),
)
wg.Wait()
logger.Info("KAFKA: Replica stopped consumer of a topic",
zap.Int("replicaId", cg.replicaId),
zap.String("topic", topic),
)
}
开发者ID:topfreegames,项目名称:kafka,代码行数:71,代码来源:consumer_group.go
示例17: limbo
func (conn *Connection) limbo(sent <-chan PushNotification, responses chan Response, errors chan BadPushNotification, queue chan PushNotification, logger zap.Logger) {
stopping := false
limbo := make([]timedPushNotification, 0, SentBufferSize)
ticker := time.NewTicker(1 * time.Second)
for {
select {
case pn, ok := <-sent:
limbo = append(limbo, pn.timed())
stopping = false
if !ok {
logger.Info("APNS: Connection limbo - sent is closed, so sender is done. So am I, then",
zap.Int("connectionId", conn.id),
)
close(errors)
conn.stopped <- true
return
}
case <-conn.senderFinished:
//senderFinished means the sender thinks it's done.
//However, sender might not be - limbo could resend some, if there are any left here.
//So we just take note of this until limbo is empty too.
stopping = true
case resp, ok := <-responses:
if !ok {
//If the responses channel is closed,
//that means we're shutting down the connection.
}
for i, pn := range limbo {
if pn.Identifier == resp.Identifier {
if resp.Status != 10 {
//It was an error, we should report this on the error channel
bad := BadPushNotification{PushNotification: pn.PushNotification, Status: resp.Status}
errors <- bad
}
if len(limbo) > i {
toRequeue := len(limbo) - (i + 1)
if toRequeue > 0 {
conn.requeue(limbo[i+1:])
//We resent some notifications: that means we should wait for sender to tell us it's done, again.
stopping = false
}
}
}
}
limbo = make([]timedPushNotification, 0, SentBufferSize)
case <-ticker.C:
flushed := false
for i := range limbo {
if limbo[i].After(time.Now().Add(-TimeoutSeconds * time.Second)) {
if i > 0 {
newLimbo := make([]timedPushNotification, len(limbo[i:]), SentBufferSize)
copy(newLimbo, limbo[i:])
limbo = newLimbo
flushed = true
break
}
}
}
if !flushed {
limbo = make([]timedPushNotification, 0, SentBufferSize)
}
if stopping && len(limbo) == 0 {
//sender() is finished and so is limbo - so the connection is done.
logger.Info("APNS: Connection limbo - I've flushed all my notifications. Tell sender I'm done",
zap.Int("connectionId", conn.id),
)
conn.ackFinished <- true
}
}
}
}
开发者ID:topfreegames,项目名称:apns-go,代码行数:71,代码来源:connection.go
示例18: sender
func (conn *Connection) sender(queue <-chan PushNotification, sent chan PushNotification, logger zap.Logger) {
i := 0
stopping := false
defer conn.conn.Close()
defer conn.connAux.Close()
logger.Info("APNS: Starting sender for connection",
zap.Int("connectionId", conn.id),
)
for {
select {
case pn, ok := <-conn.queue:
if !ok {
logger.Info("APNS: Connection not okay; queue closed",
zap.Int("connectionId", conn.id),
)
//That means the Connection is stopped
//close sent?
return
}
//This means we saw a response; connection is over.
select {
case <-conn.shouldReconnect:
conn.conn.Close()
conn.conn = nil
conn.connAux.Close()
conn.connAux = nil
conn.spinUntilReconnect(logger)
default:
}
//Then send the push notification
pn.Priority = 10
payload, err := pn.ToBytes()
if err != nil {
logger.Info("APNS: Connection Error",
zap.Int("connectionId", conn.id),
zap.Error(err),
)
//Should report this on the bad notifications channel probably
} else {
if conn.conn == nil {
conn.spinUntilReconnect(logger)
}
_, err = conn.conn.Write(payload)
if err != nil {
logger.Info("APNS: Error writing payload",
zap.Error(err),
)
go func() {
conn.shouldReconnect <- true
}()
//Disconnect?
} else {
i++
sent <- pn
if stopping && len(queue) == 0 {
conn.senderFinished <- true
}
}
}
case <-conn.stopping:
logger.Info("APNS: Connection sender - Got a stop message",
zap.Int("connectionId", conn.id),
)
stopping = true
if len(queue) == 0 {
logger.Info("APNS: Connection sender - Stopping because ran out of things to send. Let's see if limbo is empty",
zap.Int("connectionId", conn.id),
)
conn.senderFinished <- true
}
case <-conn.ackFinished:
logger.Info("APNS: Connection sender - limbo is empty",
zap.Int("connectionId", conn.id),
)
if len(queue) == 0 {
logger.Info("APNS: Connection sender - limbo is empty and so am I",
zap.Int("connectionId", conn.id),
)
close(sent)
return
}
}
}
}
开发者ID:topfreegames,项目名称:apns-go,代码行数:84,代码来源:connection.go
示例19: Load
func (cg *ConsumerGroup) Load(logger zap.Logger) error {
var kz *kazoo.Kazoo
var err error
if kz, err = kazoo.NewKazoo(cg.zookeeper, cg.config.Zookeeper); err != nil {
return err
}
logger.Info("KAFKA: Getting broker list for replica",
zap.Int("replicaId", cg.replicaId),
)
brokers, err := kz.BrokerList()
if err != nil {
kz.Close()
return err
}
group := kz.Consumergroup(cg.config.ClientID)
instance := group.NewInstance()
var consumer sarama.Consumer
if consumer, err = sarama.NewConsumer(brokers, cg.config.Config); err != nil {
kz.Close()
return err
}
cg.kazoo = kz
cg.group = group
cg.instance = instance
cg.messages = make(chan *sarama.ConsumerMessage, cg.config.ChannelBufferSize)
cg.consumer = consumer
cg.singleShutdown = sync.Once{}
cg.errors = make(chan *sarama.ConsumerError, cg.config.ChannelBufferSize)
cg.stopper = make(chan struct{})
if exists, err := cg.group.Exists(); err != nil {
logger.Fatal("KAFKA: Replica failed to check existence of consumergroup",
zap.Int("replicaId", cg.replicaId),
zap.Error(err),
)
consumer.Close()
kz.Close()
return err
} else if !exists {
logger.Info("KAFKA: Consumergroup does not exist, creating it",
zap.Int("replicaId", cg.replicaId),
zap.String("consumerGroupName", cg.group.Name),
)
if err := cg.group.Create(); err != nil {
logger.Fatal("KAFKA: Failed to create consumergroup in Zookeeper",
zap.Int("replicaId", cg.replicaId),
zap.Error(err),
)
consumer.Close()
kz.Close()
return err
}
}
if err := cg.instance.Register(cg.topics); err != nil {
logger.Fatal("KAFKA: Failed to create consumer instance",
zap.Int("replicaId", cg.replicaId),
zap.Error(err),
)
return err
} else {
logger.Info("KAFKA: Consumer instance registered",
zap.Int("replicaId", cg.replicaId),
)
}
offsetConfig := OffsetManagerConfig{
CommitInterval: cg.config.Offsets.CommitInterval,
EnableAutoCommit: cg.config.EnableOffsetAutoCommit,
}
cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig, logger)
go cg.topicListConsumer(cg.topics, logger)
return nil
}
开发者ID:topfreegames,项目名称:kafka,代码行数:79,代码来源:consumer_group.go
示例20: Start
func (conn_pool *ConnectionPool) Start(logger zap.Logger) {
logger.Info("APNS: Starting pool of connections")
go conn_pool.sendLoop()
}
开发者ID:topfreegames,项目名称:apns-go,代码行数:4,代码来源:connection_pool.go
注:本文中的github.com/uber-go/zap.Logger类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论