本文整理汇总了Golang中github.com/cockroachdb/cockroach/proto.RaftNodeID函数的典型用法代码示例。如果您正苦于以下问题:Golang RaftNodeID函数的具体用法?Golang RaftNodeID怎么用?Golang RaftNodeID使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了RaftNodeID函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: maybeSendLeaderEvent
// maybeSendLeaderEvent inspects a raft.Ready for leadership changes. When the
// committed term has moved forward and a leader is known, it emits an
// EventLeaderElection to the application and re-proposes every pending
// proposal of the group.
func (s *state) maybeSendLeaderEvent(groupID proto.RaftID, g *group, ready *raft.Ready) {
	// A SoftState always carries the latest leader; remember it unconditionally.
	if soft := ready.SoftState; soft != nil {
		g.leader = proto.RaftNodeID(soft.Lead)
	}

	// The last committed entry in this batch (if any) defines the newest term.
	latestTerm := g.committedTerm
	if n := len(ready.CommittedEntries); n > 0 {
		latestTerm = ready.CommittedEntries[n-1].Term
	}

	// Nothing to report unless the term advanced and the leader is known.
	if latestTerm == g.committedTerm || g.leader == 0 {
		return
	}

	g.committedTerm = latestTerm
	election := &EventLeaderElection{
		GroupID: groupID,
		NodeID:  proto.RaftNodeID(g.leader),
		Term:    g.committedTerm,
	}
	s.sendEvent(election)

	// Re-submit all pending proposals.
	for _, pendingProp := range g.pending {
		s.propose(pendingProp)
	}
}
开发者ID:huaxling,项目名称:cockroach,代码行数:28,代码来源:multiraft.go
示例2: Send
// Send looks up the RPC client for the message's recipient and issues the
// request asynchronously. An error is returned only if the client cannot be
// obtained or the call has already completed by the time it is first checked;
// otherwise a later failure is only logged.
func (lt *localRPCTransport) Send(req *RaftMessageRequest) error {
	client, err := lt.getClient(proto.RaftNodeID(req.Message.To))
	if err != nil {
		return err
	}
	call := client.Go(raftMessageName, req, &RaftMessageResponse{}, nil)

	// If the call already finished (e.g. it failed synchronously), report its
	// result directly.
	select {
	case <-call.Done:
		return call.Error
	default:
	}

	// Otherwise fire-and-forget: wait in the background for either completion
	// or transport shutdown, and log a failure if there was one.
	go func() {
		select {
		case <-lt.closed:
			return
		case <-call.Done:
		}
		if callErr := call.Error; callErr != nil {
			log.Errorf("sending rpc failed: %s", callErr)
		}
	}()
	return nil
}
开发者ID:Hellblazer,项目名称:cockroach,代码行数:25,代码来源:transport.go
示例3: start
// start runs a stopper-managed worker that takes messages from lt.messages,
// publishes each one on lt.Events, waits for it to be acknowledged, and then
// hands it to the listener registered for the recipient (if any). The worker
// exits when the stopper signals or a listener reports ErrStopped.
func (lt *localInterceptableTransport) start() {
	lt.stopper.RunWorker(func() {
		for {
			select {
			case <-lt.stopper.ShouldStop():
				return

			case msg := <-lt.messages:
				acked := make(chan struct{})
				// The following channel ops are not protected by a select with
				// ShouldStop since leaving things partially complete here could
				// prevent other components from shutting down cleanly.
				lt.Events <- &interceptMessage{
					args: msg,
					ack:  acked,
				}
				<-acked

				lt.mu.Lock()
				listener, registered := lt.listeners[proto.RaftNodeID(msg.Message.To)]
				lt.mu.Unlock()
				if !registered {
					continue
				}

				switch err := listener.RaftMessage(msg, nil); {
				case err == ErrStopped:
					return
				case err != nil:
					log.Fatal(err)
				}
			}
		}
	})
}
开发者ID:huaxling,项目名称:cockroach,代码行数:34,代码来源:transport_test.go
示例4: handleMessage
// handleMessage publishes msg on lt.Events, blocks until it is acknowledged,
// and then delivers it to the listener registered for the recipient. Messages
// for unregistered recipients are dropped; any delivery error other than
// ErrStopped is fatal.
func (lt *localInterceptableTransport) handleMessage(msg *RaftMessageRequest) {
	acked := make(chan struct{})

	// The following channel ops are not protected by a select with
	// ShouldStop since we are running under a StartTask and leaving
	// things partially complete here could prevent other components
	// from shutting down cleanly.
	lt.Events <- &interceptMessage{
		args: msg,
		ack:  acked,
	}
	<-acked

	lt.mu.Lock()
	listener, registered := lt.listeners[proto.RaftNodeID(msg.Message.To)]
	lt.mu.Unlock()
	if !registered {
		return
	}

	switch err := listener.RaftMessage(msg, nil); {
	case err == ErrStopped:
		return
	case err != nil:
		log.Fatal(err)
	}
}
开发者ID:Gardenya,项目名称:cockroach,代码行数:25,代码来源:transport_test.go
示例5: RaftMessage
// RaftMessage decodes an incoming proto.RaftMessageRequest and forwards it to
// the server registered for the destination node. The outcome is reported via
// callback: a decode failure or an unknown destination yields a nil response
// and an error; otherwise an empty response is returned together with the
// server's error (if any).
func (t *rpcTransport) RaftMessage(args gogoproto.Message, callback func(gogoproto.Message, error)) {
	wireReq := args.(*proto.RaftMessageRequest)

	// Convert from the proto representation to multiraft's internal format.
	internalReq := &multiraft.RaftMessageRequest{GroupID: wireReq.GroupID}
	if err := internalReq.Message.Unmarshal(wireReq.Msg); err != nil {
		callback(nil, err)
		return
	}

	// Only hold the lock for the map lookup itself.
	t.mu.Lock()
	target, registered := t.servers[proto.RaftNodeID(internalReq.Message.To)]
	t.mu.Unlock()
	if !registered {
		callback(nil, util.Errorf("Unable to proxy message to node: %d", internalReq.Message.To))
		return
	}

	// Raft responses are empty, so nothing needs to be converted back from
	// multiraft's internal struct to the external proto. The callback does not
	// even depend on the message having been processed; the async handler
	// mechanism is (ab)used so that this synchronous handler runs in the RPC
	// server's goroutine, which preserves the order of incoming messages.
	deliverErr := target.RaftMessage(internalReq, &multiraft.RaftMessageResponse{})
	callback(&proto.RaftMessageResponse{}, deliverErr)
}
开发者ID:knorwood,项目名称:cockroach,代码行数:29,代码来源:raft_transport.go
示例6: newNotLeaderError
// newNotLeaderError builds a NotLeaderError. When the given lease has a
// holder (non-nil lease with a non-zero RaftNodeID), the error is populated
// with this store's replica and the lease holder's replica; otherwise an
// empty NotLeaderError is returned.
func (r *Range) newNotLeaderError(l *proto.Lease) error {
	notLeader := &proto.NotLeaderError{}
	if l == nil || l.RaftNodeID == 0 {
		return notLeader
	}

	_, notLeader.Replica = r.Desc().FindReplica(r.rm.StoreID())
	_, holderStoreID := proto.DecodeRaftNodeID(proto.RaftNodeID(l.RaftNodeID))
	_, notLeader.Leader = r.Desc().FindReplica(holderStoreID)
	return notLeader
}
开发者ID:huaxling,项目名称:cockroach,代码行数:11,代码来源:range.go
示例7: newNotLeaderError
// newNotLeaderError builds a NotLeaderError. When the given lease has a
// holder (non-nil lease with a non-zero RaftNodeID), the error is populated
// with the replica on the store decoded from originNode and the lease
// holder's replica; otherwise an empty NotLeaderError is returned.
func (r *Replica) newNotLeaderError(l *proto.Lease, originNode proto.RaftNodeID) error {
	notLeader := &proto.NotLeaderError{}
	if l == nil || l.RaftNodeID == 0 {
		return notLeader
	}

	_, requesterStoreID := proto.DecodeRaftNodeID(originNode)
	_, notLeader.Replica = r.Desc().FindReplica(requesterStoreID)

	_, holderStoreID := proto.DecodeRaftNodeID(proto.RaftNodeID(l.RaftNodeID))
	_, notLeader.Leader = r.Desc().FindReplica(holderStoreID)
	return notLeader
}
开发者ID:ErikGrimes,项目名称:cockroach,代码行数:12,代码来源:replica.go
示例8: fanoutHeartbeat
// fanoutHeartbeat expands a coalesced heartbeat into one raft Step per group
// that is shared with the sending node and that currently considers the sender
// its leader. A single coalesced heartbeat response is sent back if at least
// one group was stepped, or if the sender is unknown to this node.
func (s *state) fanoutHeartbeat(req *RaftMessageRequest) {
	sender := proto.RaftNodeID(req.Message.From)

	// The same coalesced response is used on every path that answers.
	heartbeatResp := raftpb.Message{
		From: uint64(s.nodeID),
		To:   req.Message.From,
		Type: raftpb.MsgHeartbeatResp,
	}

	originNode, known := s.nodes[sender]
	if !known {
		// When a leader considers a follower to be down, it doesn't begin recovery
		// until the follower has successfully responded to a heartbeat. If we get a
		// heartbeat from a node we don't know, it must think we are a follower of
		// some group, so we need to respond so it can activate the recovery process.
		log.Warningf("node %v: not fanning out heartbeat from unknown node %v (but responding anyway)",
			s.nodeID, sender)
		s.sendMessage(noGroup, heartbeatResp)
		return
	}

	fanned := 0
	for groupID := range originNode.groupIDs {
		// Only propagate to groups that believe the sender is their leader, and
		// never for a heartbeat that claims to come from this node itself.
		groupLeader := s.groups[groupID].leader
		if groupLeader != sender || sender == s.nodeID {
			if log.V(8) {
				log.Infof("node %v: not fanning out heartbeat to %v, msg is from %d and leader is %d",
					s.nodeID, req.Message.To, sender, groupLeader)
			}
			continue
		}

		stepErr := s.multiNode.Step(context.Background(), uint64(groupID), req.Message)
		if stepErr != nil && log.V(4) {
			log.Infof("node %v: coalesced heartbeat step to group %v failed for message %s", s.nodeID, groupID,
				raft.DescribeMessage(req.Message, s.EntryFormatter))
		}
		// The group counts as fanned out even when the step failed.
		fanned++
	}

	if fanned > 0 {
		s.sendMessage(noGroup, heartbeatResp)
	}
	if log.V(7) {
		log.Infof("node %v: received coalesced heartbeat from node %v; "+
			"fanned out to %d followers in %d overlapping groups",
			s.nodeID, sender, fanned, len(originNode.groupIDs))
	}
}
开发者ID:huaxling,项目名称:cockroach,代码行数:55,代码来源:multiraft.go
示例9: RaftMessage
// RaftMessage decodes the incoming proto request into multiraft's internal
// format and forwards it to the server registered for the destination node.
// It returns the decode error, the server's result, or an error if no server
// is registered for that node.
func (t *transportRPCServer) RaftMessage(protoReq *proto.RaftMessageRequest,
	resp *proto.RaftMessageResponse) error {
	// Convert from proto to internal formats.
	internalReq := &multiraft.RaftMessageRequest{GroupID: protoReq.GroupID}
	if err := internalReq.Message.Unmarshal(protoReq.Msg); err != nil {
		return err
	}

	// Hold the lock only for the lookup.
	t.mu.Lock()
	target, registered := t.servers[proto.RaftNodeID(internalReq.Message.To)]
	t.mu.Unlock()

	if !registered {
		return util.Errorf("Unable to proxy message to node: %d", internalReq.Message.To)
	}
	return target.RaftMessage(internalReq, &multiraft.RaftMessageResponse{})
}
开发者ID:Hellblazer,项目名称:cockroach,代码行数:19,代码来源:raft_transport.go
示例10: Send
// Send enqueues req on the outgoing queue of the recipient named in the
// request. The queue (and the goroutine draining it) is created on first use.
// Send never blocks: if the queue is full an error is returned instead.
func (t *rpcTransport) Send(req *multiraft.RaftMessageRequest) error {
	recipient := proto.RaftNodeID(req.Message.To)

	t.mu.Lock()
	queue, exists := t.queues[recipient]
	if !exists {
		// First message for this node: create its queue and start a processor.
		queue = make(chan *multiraft.RaftMessageRequest, raftSendBufferSize)
		t.queues[recipient] = queue
		go t.processQueue(recipient)
	}
	t.mu.Unlock()

	select {
	case queue <- req:
		return nil
	default:
		return util.Errorf("queue for node %d is full", req.Message.To)
	}
}
开发者ID:knorwood,项目名称:cockroach,代码行数:19,代码来源:raft_transport.go
示例11: processRaftCommand
// processRaftCommand unpacks a committed raft command into its request and a
// reply, applies it to the state machine via applyRaftCommand(), and returns
// the resulting error. If the command was proposed locally (it is found in
// pendingCmds), the error is also delivered on that command's done channel.
// A zero index is a fatal error.
func (r *Range) processRaftCommand(idKey cmdIDKey, index uint64, raftCmd proto.InternalRaftCommand) error {
	if index == 0 {
		log.Fatalc(r.context(), "processRaftCommand requires a non-zero index")
	}

	// Claim (and remove) the pending entry for this command, if there is one.
	r.Lock()
	pending := r.pendingCmds[idKey]
	delete(r.pendingCmds, idKey)
	r.Unlock()
	proposedHere := pending != nil

	args := raftCmd.Cmd.GetValue().(proto.Request)
	var (
		reply proto.Response
		ctx   context.Context
	)
	if proposedHere {
		// We initiated this command, so use the caller-supplied reply.
		reply = pending.Reply
		ctx = pending.ctx
	} else {
		// This command originated elsewhere so we must create a new reply buffer.
		reply = args.CreateReply()
		// TODO(tschottdorf): consider the Trace situation here.
		ctx = r.context()
	}

	finishEpoch := tracer.FromCtx(ctx).Epoch(fmt.Sprintf("applying %s", args.Method()))
	// applyRaftCommand will return "expected" errors, but may also indicate
	// replica corruption (as of now, signaled by a replicaCorruptionError).
	// Its result is fed through maybeSetCorrupt to act when that happens.
	applyErr := r.applyRaftCommand(ctx, index, proto.RaftNodeID(raftCmd.OriginNodeID), args, reply)
	err := r.maybeSetCorrupt(applyErr)
	finishEpoch()

	switch {
	case proposedHere:
		pending.done <- err
	case err != nil && log.V(1):
		log.Errorc(r.context(), "error executing raft command %s: %s", args.Method(), err)
	}
	return err
}
开发者ID:jusongchen,项目名称:cockroach,代码行数:45,代码来源:range.go
示例12: sendMessage
// sendMessage hands a raft message to the transport on behalf of the given
// group. Coalesced heartbeats address nodes rather than groups and pass the
// noGroup constant as groupID. Unknown recipients are registered first, and
// send failures / snapshot outcomes are reported back to raft for real groups.
func (s *state) sendMessage(groupID proto.RaftID, msg raftpb.Message) {
	if log.V(6) {
		log.Infof("node %v sending message %.200s to %v", s.nodeID,
			raft.DescribeMessage(msg, s.EntryFormatter), msg.To)
	}

	recipient := proto.RaftNodeID(msg.To)
	hasGroup := groupID != noGroup

	// Lazily register recipients we have not seen before.
	if _, known := s.nodes[recipient]; !known {
		if log.V(4) {
			log.Infof("node %v: connecting to new node %v", s.nodeID, recipient)
		}
		var addErr error
		if hasGroup {
			addErr = s.addNode(recipient, groupID)
		} else {
			addErr = s.addNode(recipient)
		}
		if addErr != nil {
			log.Errorf("node %v: error adding group %v to node %v: %v",
				s.nodeID, groupID, recipient, addErr)
		}
	}

	sendErr := s.Transport.Send(&RaftMessageRequest{groupID, msg})
	snapStatus := raft.SnapshotFinish
	if sendErr != nil {
		log.Warningf("node %v failed to send message to %v: %s", s.nodeID, recipient, sendErr)
		if hasGroup {
			s.multiNode.ReportUnreachable(msg.To, uint64(groupID))
		}
		snapStatus = raft.SnapshotFailure
	}

	// TODO(bdarnell): add an ack for snapshots and don't report status until
	// ack, error, or timeout.
	if hasGroup && msg.Type == raftpb.MsgSnap {
		s.multiNode.ReportSnapshot(msg.To, uint64(groupID), snapStatus)
	}
}
开发者ID:huaxling,项目名称:cockroach,代码行数:40,代码来源:multiraft.go
示例13: newTestCluster
// newTestCluster builds and starts a test cluster of `size` MultiRaft nodes
// with IDs 1..size. Every node gets its own manual ticker, blockable
// in-memory storage and event demultiplexer. If transport is nil a local RPC
// transport is created; in either case the transport is registered as a
// closer on the stopper.
func newTestCluster(transport Transport, size int, stopper *stop.Stopper, t *testing.T) *testCluster {
	if transport == nil {
		transport = NewLocalRPCTransport(stopper)
	}
	stopper.AddCloser(transport)

	tc := &testCluster{
		t:         t,
		transport: transport,
		groups:    map[proto.RangeID][]int{},
	}

	for nodeID := 1; nodeID <= size; nodeID++ {
		manualTicker := newManualTicker()
		blockable := &BlockableStorage{storage: NewMemoryStorage()}
		cfg := &Config{
			Transport:              transport,
			Storage:                blockable,
			Ticker:                 manualTicker,
			ElectionTimeoutTicks:   2,
			HeartbeatIntervalTicks: 1,
			TickInterval:           time.Hour, // not in use
		}

		mr, err := NewMultiRaft(proto.RaftNodeID(nodeID), cfg, stopper)
		if err != nil {
			t.Fatal(err)
		}
		nodeState := newState(mr)
		events := newEventDemux(nodeState.Events)
		events.start(stopper)

		tc.nodes = append(tc.nodes, nodeState)
		tc.tickers = append(tc.tickers, manualTicker)
		tc.events = append(tc.events, events)
		tc.storages = append(tc.storages, blockable)
	}

	tc.start()
	return tc
}
开发者ID:kangxinrong,项目名称:cockroach,代码行数:37,代码来源:multiraft_test.go
示例14: removeGroup
// removeGroup tears down the raft group named by op.groupID and reports the
// outcome on op.ch. Exactly one value is sent on op.ch per call.
//
// Removal is idempotent: removing an unknown group succeeds immediately.
// Otherwise all pending proposals are cancelled with ErrGroupDeleted, the
// group is removed from the underlying multiNode, the group is unregistered
// from each node listed in its stored ConfState, and the group's entries in
// readyGroups (if non-nil) and s.groups are deleted.
func (s *state) removeGroup(op *removeGroupOp, readyGroups map[uint64]raft.Ready) {
	// Group creation is lazy and idempotent; so is removal.
	g, ok := s.groups[op.groupID]
	if !ok {
		op.ch <- nil
		return
	}
	if log.V(3) {
		log.Infof("node %v removing group %v", s.nodeID, op.groupID)
	}

	// Cancel commands which are still in transit.
	for _, prop := range g.pending {
		s.removePending(g, prop, ErrGroupDeleted)
	}

	if err := s.multiNode.RemoveGroup(uint64(op.groupID)); err != nil {
		op.ch <- err
		return
	}

	// From here on the raft group is gone, so the local bookkeeping is always
	// cleaned up. A failure to load the ConfState is remembered and reported
	// once at the end, instead of being sent immediately and then followed by
	// a second (nil) send on op.ch.
	gs := s.Storage.GroupStorage(op.groupID)
	_, cs, err := gs.InitialState()
	if err == nil {
		for _, nodeID := range cs.Nodes {
			// Skip members that were never registered locally rather than
			// calling a method on a missing map entry.
			if n, known := s.nodes[proto.RaftNodeID(nodeID)]; known {
				n.unregisterGroup(op.groupID)
			}
		}
	}

	// Delete any entries for this group in readyGroups.
	if readyGroups != nil {
		delete(readyGroups, uint64(op.groupID))
	}
	delete(s.groups, op.groupID)

	// err is nil on success.
	op.ch <- err
}
开发者ID:huaxling,项目名称:cockroach,代码行数:36,代码来源:multiraft.go
示例15: handleWriteResponse
// handleWriteResponse updates the state machine and sends messages for a raft
// Ready batch once the batch has been written to storage.
//
// For every group in readyGroups that still exists and is marked as writing,
// it processes the committed entries, syncs the group/node mapping from any
// snapshot, emits leader events and sends the outgoing messages. Individual
// heartbeats and heartbeat responses are dropped here rather than sent.
// Groups that were recreated since the write started are removed from
// readyGroups so that they are not passed on to Advance.
func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uint64]raft.Ready) {
	if log.V(6) {
		log.Infof("node %v got write response: %#v", s.nodeID, *response)
	}
	// Everything has been written to disk; now we can apply updates to the state machine
	// and send outgoing messages.
	for groupID, ready := range readyGroups {
		raftGroupID := proto.RaftID(groupID)
		g, ok := s.groups[raftGroupID]
		if !ok {
			if log.V(4) {
				log.Infof("dropping stale write to group %v", groupID)
			}
			continue
		} else if !g.writing {
			if log.V(4) {
				log.Infof("dropping stale write to reincarnation of group %v", groupID)
			}
			delete(readyGroups, groupID) // they must not make it to Advance.
			continue
		}
		g.writing = false

		// Process committed entries.
		for _, entry := range ready.CommittedEntries {
			commandID := s.processCommittedEntry(raftGroupID, g, entry)
			// TODO(bdarnell): the command is now committed, but not applied until the
			// application consumes EventCommandCommitted. Is returning via the channel
			// at this point useful or do we need to wait for the command to be
			// applied too?
			// This could be done with a Callback as in EventMembershipChangeCommitted
			// or perhaps we should move away from a channel to a callback-based system.
			s.removePending(g, g.pending[commandID], nil /* err */)
		}

		if !raft.IsEmptySnap(ready.Snapshot) {
			// Sync the group/node mapping with the information contained in the snapshot.
			for _, nodeID := range ready.Snapshot.Metadata.ConfState.Nodes {
				// TODO(bdarnell): if we had any information that predated this snapshot
				// we must remove those nodes.
				if err := s.addNode(proto.RaftNodeID(nodeID), raftGroupID); err != nil {
					// Include the underlying error so the failure can be diagnosed.
					log.Errorf("node %v: error adding node %v: %v", s.nodeID, nodeID, err)
				}
			}
		}

		// Process SoftState and leader changes.
		s.maybeSendLeaderEvent(raftGroupID, g, &ready)

		// Send all messages.
		for _, msg := range ready.Messages {
			switch msg.Type {
			case raftpb.MsgHeartbeat:
				if log.V(8) {
					log.Infof("node %v dropped individual heartbeat to node %v",
						s.nodeID, msg.To)
				}
			case raftpb.MsgHeartbeatResp:
				if log.V(8) {
					log.Infof("node %v dropped individual heartbeat response to node %v",
						s.nodeID, msg.To)
				}
			default:
				s.sendMessage(raftGroupID, msg)
			}
		}
	}
}
开发者ID:huaxling,项目名称:cockroach,代码行数:69,代码来源:multiraft.go
示例16: processCommittedEntry
// processCommittedEntry notifies the application about a committed raft entry.
// Normal entries with data produce an EventCommandCommitted; configuration
// changes produce an EventMembershipChangeCommitted whose Callback queues the
// actual application of the change on s.callbackChan.
// It returns the commandID, or an empty string if the entry was not a command.
func (s *state) processCommittedEntry(groupID proto.RaftID, g *group, entry raftpb.Entry) string {
	var commandID string

	switch entry.Type {
	case raftpb.EntryNormal:
		// etcd raft occasionally adds a nil entry (e.g. upon election); ignore these.
		if entry.Data == nil {
			break
		}
		var command []byte
		commandID, command = decodeCommand(entry.Data)
		s.sendEvent(&EventCommandCommitted{
			GroupID:   groupID,
			CommandID: commandID,
			Command:   command,
			Index:     entry.Index,
		})

	case raftpb.EntryConfChange:
		cc := raftpb.ConfChange{}
		if err := cc.Unmarshal(entry.Data); err != nil {
			log.Fatalf("invalid ConfChange data: %s", err)
		}
		var payload []byte
		if len(cc.Context) > 0 {
			commandID, payload = decodeCommand(cc.Context)
		}

		// onDecision is invoked by the application with nil to accept the
		// change or with an error to abort it.
		onDecision := func(err error) {
			applyChange := func() {
				if err != nil {
					log.Warningf("aborting configuration change: %s", err)
					s.multiNode.ApplyConfChange(uint64(groupID),
						raftpb.ConfChange{})
				} else {
					if log.V(3) {
						log.Infof("node %v applying configuration change %v", s.nodeID, cc)
					}
					// TODO(bdarnell): dedupe by keeping a record of recently-applied commandIDs
					switch cc.Type {
					case raftpb.ConfChangeAddNode:
						err = s.addNode(proto.RaftNodeID(cc.NodeID), proto.RaftID(groupID))
					case raftpb.ConfChangeRemoveNode:
						// TODO(bdarnell): support removing nodes; fix double-application of initial entries
					case raftpb.ConfChangeUpdateNode:
						// Updates don't concern multiraft, they are simply passed through.
					}
					if err != nil {
						log.Errorf("error applying configuration change %v: %s", cc, err)
					}
					s.multiNode.ApplyConfChange(uint64(groupID), cc)
				}

				// Re-submit all pending proposals, in case any of them were config changes
				// that were dropped due to the one-at-a-time rule. This is a little
				// redundant since most pending proposals won't benefit from this but
				// config changes should be rare enough (and the size of the pending queue
				// small enough) that it doesn't really matter.
				for _, pendingProp := range g.pending {
					s.propose(pendingProp)
				}
			}

			// Hand the work over unless the stopper signals first.
			select {
			case s.callbackChan <- applyChange:
			case <-s.stopper.ShouldStop():
			}
		}

		s.sendEvent(&EventMembershipChangeCommitted{
			GroupID:    groupID,
			CommandID:  commandID,
			Index:      entry.Index,
			NodeID:     proto.RaftNodeID(cc.NodeID),
			ChangeType: cc.Type,
			Payload:    payload,
			Callback:   onDecision,
		})
	}
	return commandID
}
开发者ID:huaxling,项目名称:cockroach,代码行数:76,代码来源:multiraft.go
示例17: createGroup
// createGroup instantiates the raft group identified by groupID if it does not
// exist yet; calling it for an existing group is a no-op. The group is created
// from its stored initial state, every node in the stored ConfState is
// registered for the group, and if this node is the group's only member it
// immediately campaigns for leadership.
func (s *state) createGroup(groupID proto.RaftID) error {
	if _, exists := s.groups[groupID]; exists {
		return nil
	}
	if log.V(3) {
		log.Infof("node %v creating group %v", s.nodeID, groupID)
	}

	groupStorage := s.Storage.GroupStorage(groupID)
	_, confState, err := groupStorage.InitialState()
	if err != nil {
		return err
	}

	// Without a state machine the applied index stays at zero.
	var applied uint64
	if s.StateMachine != nil {
		if applied, err = s.StateMachine.AppliedIndex(groupID); err != nil {
			return err
		}
	}

	cfg := &raft.Config{
		Applied:       applied,
		ElectionTick:  s.ElectionTimeoutTicks,
		HeartbeatTick: s.HeartbeatIntervalTicks,
		Storage:       groupStorage,
		// TODO(bdarnell): make these configurable; evaluate defaults.
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}
	if err := s.multiNode.CreateGroup(uint64(groupID), cfg, nil); err != nil {
		return err
	}

	s.groups[groupID] = &group{
		pending: map[string]*proposal{},
	}
	for _, member := range confState.Nodes {
		if err := s.addNode(proto.RaftNodeID(member), groupID); err != nil {
			return err
		}
	}

	// Automatically campaign and elect a leader for this group if there's
	// exactly one known node for this group.
	//
	// A grey area for this being correct happens in the case when we're
	// currently in the progress of adding a second node to the group,
	// with the change committed but not applied.
	// Upon restarting, the node would immediately elect itself and only
	// then apply the config change, where really it should be applying
	// first and then waiting for the majority (which would now require
	// two votes, not only its own).
	// However, in that special case, the second node has no chance to
	// be elected master while this node restarts (as it's aware of the
	// configuration and knows it needs two votes), so the worst that
	// could happen is both nodes ending up in candidate state, timing
	// out and then voting again. This is expected to be an extremely
	// rare event.
	soleMember := len(confState.Nodes) == 1 &&
		s.MultiRaft.nodeID == proto.RaftNodeID(confState.Nodes[0])
	if !soleMember {
		return nil
	}
	return s.multiNode.Campaign(context.Background(), uint64(groupID))
}
开发者ID:huaxling,项目名称:cockroach,代码行数:65,代码来源:multiraft.go
示例18: start
func (s *state) start() {
s.stopper.RunWorker(func() {
defer func() {
if log.V(6) {
log.Infof("node %v: stopping", s.nodeID)
}
s.stop()
}()
if log.V(1) {
log.Infof("node %v starting", s.nodeID)
}
s.writeTask.start(s.stopper)
// These maps form a kind of state machine: We don't want to read from the
// ready channel until the groups we got from the last read have made their
// way through the rest of the pipeline.
var readyGroups map[uint64]raft.Ready
var writingGroups map[uint64]raft.Ready
// Counts up to heartbeat interval and is then reset.
ticks := 0
for {
// raftReady signals that the Raft state machine has pending
// work. That work is supplied over the raftReady channel as a map
// from group ID to raft.Ready struct.
var raftReady <-chan map[uint64]raft.Ready
// writeReady is set to the write task's ready channel, which
// receives when the write task is prepared to persist ready data
// from the Raft state machine.
// The writeReady mechanism is currently disabled as we are testing
// performing all writes synchronously.
// TODO(bdarnell): either reinstate writeReady or rip it out completely.
//var writeReady chan struct{}
// The order of operations in this loop structure is as follows:
// start by setting raftReady to the multiNode's Ready()
// channel. Once a new raftReady has been consumed from the
// channel, set writeReady to the write task's ready channel and
// set raftReady back to nil. This advances our read-from-raft /
// write-to-storage state machine to the next step: wait for the
// write task to be ready to persist the new data.
if readyGroups != nil {
//writeReady = s.writeTask.ready
} else if writingGroups == nil {
raftReady = s.multiNode.Ready()
}
if log.V(8) {
log.Infof("node %v: selecting", s.nodeID)
}
select {
case <-s.stopper.ShouldStop():
return
case req := <-s.reqChan:
if log.V(5) {
log.Infof("node %v: group %v got message %.200s", s.nodeID, req.GroupID,
raft.DescribeMessage(req.Message, s.EntryFormatter))
}
switch req.Message.Type {
case raftpb.MsgHeartbeat:
s.fanoutHeartbeat(req)
case raftpb.MsgHeartbeatResp:
s.fanoutHeartbeatResponse(proto.RaftNodeID(req.Message.From))
default:
// We only want to lazily create the group if it's not heartbeat-related;
// our heartbeats are coalesced and contain a dummy GroupID.
// TODO(tschottdorf) still shouldn't hurt to move this part outside,
// but suddenly tests will start failing. Should investigate.
if _, ok := s.groups[req.GroupID]; !ok {
log.Infof("node %v: got message for unknown group %d; creating it", s.nodeID, req.GroupID)
if err := s.createGroup(req.GroupID); err != nil {
log.Warningf("Error creating group %d: %s", req.GroupID, err)
break
}
}
if err := s.multiNode.Step(context.Background(), uint64(req.GroupID), req.Message); err != nil {
if log.V(4) {
log.Infof("node %v: multinode step to group %v failed for message %.200s", s.nodeID, req.GroupID,
raft.DescribeMessage(req.Message, s.EntryFormatter))
}
}
}
case op := <-s.createGroupChan:
if log.V(6) {
log.Infof("node %v: got op %#v", s.nodeID, op)
}
op.ch <- s.createGroup(op.groupID)
case op := <-s.removeGroupChan:
if log.V(6) {
log.Infof("node %v: got op %#v", s.nodeID, op)
}
s.removeGroup(op, readyGroups)
case prop := <-s.proposalChan:
s.propose(prop)
case readyGroups = <-raftReady:
// readyGroups are saved in a local variable until they can be sent to
// the write task (and then the real work happens after the write is
//.........这里部分代码省略.........
开发者ID:huaxling,项目名称:cockroach,代码行数:101,代码来源:multiraft.go
示例19: processCommittedEntry
// processCommittedEntry tells the application that a command was committed.
// Normal entries carrying data are reported as EventCommandCommitted;
// configuration changes are reported as EventMembershipChangeCommitted, whose
// Callback queues the application (or abort) of the change on s.callbackChan.
// Returns the commandID, or an empty string if the given entry was not a command.
func (s *state) processCommittedEntry(groupID proto.RangeID, g *group, entry raftpb.Entry) string {
var commandID string
switch entry.Type {
case raftpb.EntryNormal:
// etcd raft occasionally adds a nil entry (e.g. upon election); ignore these.
if entry.Data != nil {
var command []byte
commandID, command = decodeCommand(entry.Data)
s.sendEvent(&EventCommandCommitted{
GroupID: groupID,
CommandID: commandID,
Command: command,
Index: entry.Index,
})
}
case raftpb.EntryConfChange:
cc := raftpb.ConfChange{}
if err := cc.Unmarshal(entry.Data); err != nil {
log.Fatalf("invalid ConfChange data: %s", err)
}
// The ConfChange context, when present, encodes the command ID and payload.
var payload []byte
if len(cc.Context) > 0 {
commandID, payload = decodeCommand(cc.Context)
}
// Set before the event is sent; cleared again inside the queued callback
// below once the change has been applied or aborted.
g.waitForCallback = true
s.sendEvent(&EventMembershipChangeCommitted{
GroupID: groupID,
CommandID: commandID,
Index: entry.Index,
NodeID: proto.RaftNodeID(cc.NodeID),
ChangeType: cc.Type,
Payload: payload,
// Callback receives nil to apply the change or an error to abort it.
Callback: func(err error) {
// Queue the work on callbackChan, giving up if the stopper signals first.
select {
case s.callbackChan <- func() {
if err == nil {
if log.V(3) {
log.Infof("node %v applying configuration change %v", s.nodeID, cc)
}
// TODO(bdarnell): dedupe by keeping a record of recently-applied commandIDs
switch cc.Type {
case raftpb.ConfChangeAddNode:
err = s.addNode(proto.RaftNodeID(cc.NodeID), g)
case raftpb.ConfChangeRemoveNode:
err = s.removeNode(proto.RaftNodeID(cc.NodeID), g)
case raftpb.ConfChangeUpdateNode:
// Updates don't concern multiraft, they are simply passed through.
}
// A bookkeeping failure is only logged; the change is still applied to raft.
if err != nil {
log.Errorf("error applying configuration change %v: %s", cc, err)
}
s.multiNode.ApplyConfChange(uint64(groupID), cc)
} else {
// Aborted: an empty ConfChange is applied in place of the real one.
log.Warningf("aborting configuration change: %s", err)
s.multiNode.ApplyConfChange(uint64(groupID),
raftpb.ConfChange{})
}
// Re-submit all pending proposals that were held
// while the config change was pending
g.waitForCallback = false
for _, prop := range g.pending {
s.propose(prop)
}
}:
case <-s.stopper.ShouldStop():
}
},
})
}
return commandID
}
开发者ID:harryyeh,项目名称:cockroach,代码行数:75,代码来源:multiraft.go
注:本文中的github.com/cockroachdb/cockroach/proto.RaftNodeID函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论