本文整理汇总了Golang中github.com/youtube/vitess/go/vt/proto/topodata.TabletType类的典型用法代码示例。如果您正苦于以下问题:Golang TabletType类的具体用法?Golang TabletType怎么用?Golang TabletType使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TabletType类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: Execute
// Execute executes a non-streaming query by routing based on the values in the query.
func (vtg *VTGate) Execute(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error) {
startTime := time.Now()
statsKey := []string{"Execute", "Any", strings.ToLower(tabletType.String())}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return nil, errTooManyInFlight
}
qr, err := vtg.router.Execute(ctx, sql, bindVariables, keyspace, tabletType, session, notInTransaction)
if err == nil {
vtg.rowsReturned.Add(statsKey, int64(len(qr.Rows)))
return qr, nil
}
query := map[string]interface{}{
"Sql": sql,
"BindVariables": bindVariables,
"Keyspace": keyspace,
"TabletType": strings.ToLower(tabletType.String()),
"Session": session,
"NotInTransaction": notInTransaction,
}
handleExecuteError(err, statsKey, query, vtg.logExecute)
return nil, err
}
开发者ID:aaijazi,项目名称:vitess,代码行数:29,代码来源:vtgate.go
示例2: ExecuteEntityIds
// ExecuteEntityIds excutes a non-streaming query based on given KeyspaceId map.
func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, entityColumnName string, entityKeyspaceIDs []*vtgatepb.ExecuteEntityIdsRequest_EntityId, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error) {
startTime := time.Now()
statsKey := []string{"ExecuteEntityIds", keyspace, strings.ToLower(tabletType.String())}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return nil, errTooManyInFlight
}
sql = sqlannotation.AddFilteredReplicationUnfriendlyIfDML(sql)
qr, err := vtg.resolver.ExecuteEntityIds(ctx, sql, bindVariables, keyspace, entityColumnName, entityKeyspaceIDs, tabletType, session, notInTransaction)
if err == nil {
vtg.rowsReturned.Add(statsKey, int64(len(qr.Rows)))
return qr, nil
}
query := map[string]interface{}{
"Sql": sql,
"BindVariables": bindVariables,
"Keyspace": keyspace,
"EntityColumnName": entityColumnName,
"EntityKeyspaceIDs": entityKeyspaceIDs,
"TabletType": strings.ToLower(tabletType.String()),
"Session": session,
"NotInTransaction": notInTransaction,
}
handleExecuteError(err, statsKey, query, vtg.logExecuteEntityIds)
return nil, err
}
开发者ID:aaijazi,项目名称:vitess,代码行数:33,代码来源:vtgate.go
示例3: StreamExecute
// StreamExecute executes a streaming query by routing based on the values in the query.
func (vtg *VTGate) StreamExecute(ctx context.Context, sql string, bindVariables map[string]interface{}, tabletType topodatapb.TabletType, sendReply func(*sqltypes.Result) error) error {
startTime := time.Now()
statsKey := []string{"StreamExecute", "Any", strings.ToLower(tabletType.String())}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return errTooManyInFlight
}
var rowCount int64
err := vtg.router.StreamExecute(
ctx,
sql,
bindVariables,
tabletType,
func(reply *sqltypes.Result) error {
rowCount += int64(len(reply.Rows))
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows)))
return sendReply(reply)
})
if err != nil {
normalErrors.Add(statsKey, 1)
query := map[string]interface{}{
"Sql": sql,
"BindVariables": bindVariables,
"TabletType": strings.ToLower(tabletType.String()),
}
logError(err, query, vtg.logStreamExecute)
}
return formatError(err)
}
开发者ID:littleyang,项目名称:vitess,代码行数:35,代码来源:vtgate.go
示例4: ExecuteKeyRanges
// ExecuteKeyRanges executes a non-streaming query based on the specified keyranges.
func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyRanges []*pb.KeyRange, tabletType pb.TabletType, session *proto.Session, notInTransaction bool, reply *proto.QueryResult) error {
startTime := time.Now()
statsKey := []string{"ExecuteKeyRanges", keyspace, strings.ToLower(tabletType.String())}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return errTooManyInFlight
}
sql = sqlannotation.AddFilteredReplicationUnfriendlyIfDML(sql)
qr, err := vtg.resolver.ExecuteKeyRanges(ctx, sql, bindVariables, keyspace, keyRanges, tabletType, session, notInTransaction)
if err == nil {
reply.Result = qr
vtg.rowsReturned.Add(statsKey, int64(len(qr.Rows)))
} else {
query := map[string]interface{}{
"Sql": sql,
"BindVariables": bindVariables,
"Keyspace": keyspace,
"KeyRanges": keyRanges,
"TabletType": strings.ToLower(tabletType.String()),
"Session": session,
"NotInTransaction": notInTransaction,
}
reply.Error = handleExecuteError(err, statsKey, query, vtg.logExecuteKeyRanges).Error()
reply.Err = vterrors.RPCErrFromVtError(err)
}
reply.Session = session
return nil
}
开发者ID:hadoop835,项目名称:vitess,代码行数:34,代码来源:vtgate.go
示例5: getKeyspaceShards
func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType pb.TabletType) (string, *topo.SrvKeyspace, []topo.ShardReference, error) {
srvKeyspace, err := topoServ.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
return "", nil, nil, vterrors.NewVitessError(
vtrpc.ErrorCode_INTERNAL_ERROR, err,
"keyspace %v fetch error: %v", keyspace, err,
)
}
// check if the keyspace has been redirected for this tabletType.
tt := topo.ProtoToTabletType(tabletType)
if servedFrom, ok := srvKeyspace.ServedFrom[tt]; ok {
keyspace = servedFrom
srvKeyspace, err = topoServ.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
return "", nil, nil, vterrors.NewVitessError(
vtrpc.ErrorCode_INTERNAL_ERROR, err,
"keyspace %v fetch error: %v", keyspace, err,
)
}
}
partition, ok := srvKeyspace.Partitions[tt]
if !ok {
return "", nil, nil, vterrors.NewVitessError(
vtrpc.ErrorCode_INTERNAL_ERROR, err,
"No partition found for tabletType %v in keyspace %v", strings.ToLower(tabletType.String()), keyspace,
)
}
return keyspace, srvKeyspace, partition.ShardReferences, nil
}
开发者ID:ruiaylin,项目名称:vitess,代码行数:31,代码来源:topo_utils.go
示例6: startAction
func (stc *ScatterConn) startAction(ctx context.Context, name, keyspace, shard string, tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, allErrors *concurrency.AllErrorRecorder) (time.Time, []string, int64, error) {
statsKey := []string{name, keyspace, shard, strings.ToLower(tabletType.String())}
startTime := time.Now()
transactionID, err := stc.updateSession(ctx, keyspace, shard, tabletType, session, notInTransaction)
return startTime, statsKey, transactionID, err
}
开发者ID:aaijazi,项目名称:vitess,代码行数:7,代码来源:scatter_conn.go
示例7: getKeyspaceShards
func getKeyspaceShards(ctx context.Context, topoServ SrvTopoServer, cell, keyspace string, tabletType pb.TabletType) (string, *pb.SrvKeyspace, []*pb.ShardReference, error) {
srvKeyspace, err := topoServ.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
return "", nil, nil, vterrors.NewVitessError(
vtrpc.ErrorCode_INTERNAL_ERROR, err,
"keyspace %v fetch error: %v", keyspace, err,
)
}
// check if the keyspace has been redirected for this tabletType.
for _, sf := range srvKeyspace.ServedFrom {
if sf.TabletType == tabletType {
keyspace = sf.Keyspace
srvKeyspace, err = topoServ.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
return "", nil, nil, vterrors.NewVitessError(
vtrpc.ErrorCode_INTERNAL_ERROR, err,
"keyspace %v fetch error: %v", keyspace, err,
)
}
}
}
partition := topoproto.SrvKeyspaceGetPartition(srvKeyspace, tabletType)
if partition == nil {
return "", nil, nil, vterrors.NewVitessError(
vtrpc.ErrorCode_INTERNAL_ERROR, err,
"No partition found for tabletType %v in keyspace %v", strings.ToLower(tabletType.String()), keyspace,
)
}
return keyspace, srvKeyspace, partition.ShardReferences, nil
}
开发者ID:hadmagic,项目名称:vitess,代码行数:32,代码来源:topo_utils.go
示例8: Execute
// Execute executes a non-streaming query by routing based on the values in the query.
func (vtg *VTGate) Execute(ctx context.Context, sql string, bindVariables map[string]interface{}, tabletType pb.TabletType, session *proto.Session, notInTransaction bool, reply *proto.QueryResult) error {
startTime := time.Now()
statsKey := []string{"Execute", "Any", strings.ToLower(tabletType.String())}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return errTooManyInFlight
}
qr, err := vtg.router.Execute(ctx, sql, bindVariables, tabletType, session, notInTransaction)
if err == nil {
reply.Result = qr
vtg.rowsReturned.Add(statsKey, int64(len(qr.Rows)))
} else {
query := map[string]interface{}{
"Sql": sql,
"BindVariables": bindVariables,
"TabletType": strings.ToLower(tabletType.String()),
"Session": session,
"NotInTransaction": notInTransaction,
}
reply.Error = handleExecuteError(err, statsKey, query, vtg.logExecute)
}
reply.Session = session
return nil
}
开发者ID:yinyousong,项目名称:vitess,代码行数:29,代码来源:vtgate.go
示例9: ExecuteEntityIds
// ExecuteEntityIds excutes a non-streaming query based on given KeyspaceId map.
func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, entityColumnName string, entityKeyspaceIDs []*pbg.ExecuteEntityIdsRequest_EntityId, tabletType pb.TabletType, session *proto.Session, notInTransaction bool, reply *proto.QueryResult) error {
startTime := time.Now()
statsKey := []string{"ExecuteEntityIds", keyspace, strings.ToLower(tabletType.String())}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return errTooManyInFlight
}
qr, err := vtg.resolver.ExecuteEntityIds(ctx, sql, bindVariables, keyspace, entityColumnName, entityKeyspaceIDs, tabletType, session, notInTransaction)
if err == nil {
reply.Result = qr
vtg.rowsReturned.Add(statsKey, int64(len(qr.Rows)))
} else {
query := map[string]interface{}{
"Sql": sql,
"BindVariables": bindVariables,
"Keyspace": keyspace,
"EntityColumnName": entityColumnName,
"EntityKeyspaceIDs": entityKeyspaceIDs,
"TabletType": strings.ToLower(tabletType.String()),
"Session": session,
"NotInTransaction": notInTransaction,
}
reply.Error = handleExecuteError(err, statsKey, query, vtg.logExecuteEntityIds).Error()
reply.Err = rpcErrFromVtGateError(err)
}
reply.Session = session
return nil
}
开发者ID:yangzhongj,项目名称:vitess,代码行数:33,代码来源:vtgate.go
示例10: multiGo
// multiGo performs the requested 'action' on the specified shards in parallel.
// For each shard, if the requested
// session is in a transaction, it opens a new transactions on the connection,
// and updates the Session with the transaction id. If the session already
// contains a transaction id for the shard, it reuses it.
// If there are any unrecoverable errors during a transaction, multiGo
// rolls back the transaction for all shards.
// The action function must match the shardActionFunc signature.
// This function has similarities with StreamExecute. A change there will likely
// require a change here also.
func (stc *ScatterConn) multiGo(
ctx context.Context,
name string,
keyspace string,
shards []string,
tabletType topodatapb.TabletType,
session *SafeSession,
notInTransaction bool,
action shardActionFunc,
) (rResults <-chan interface{}, allErrors *concurrency.AllErrorRecorder) {
allErrors = new(concurrency.AllErrorRecorder)
results := make(chan interface{}, len(shards))
var wg sync.WaitGroup
for shard := range unique(shards) {
wg.Add(1)
go func(shard string) {
statsKey := []string{name, keyspace, shard, strings.ToLower(tabletType.String())}
defer wg.Done()
startTime := time.Now()
defer stc.timings.Record(statsKey, startTime)
transactionID, err := stc.updateSession(ctx, keyspace, shard, tabletType, session, notInTransaction)
if err != nil {
allErrors.RecordError(err)
stc.tabletCallErrorCount.Add(statsKey, 1)
return
}
err = action(shard, transactionID, results)
if err != nil {
allErrors.RecordError(err)
// Don't increment the error counter for duplicate keys, as those errors
// are caused by client queries and are not VTGate's fault.
// TODO(aaijazi): get rid of this string parsing, and handle all cases of invalid input
if !strings.Contains(err.Error(), errDupKey) && !strings.Contains(err.Error(), errOutOfRange) {
stc.tabletCallErrorCount.Add(statsKey, 1)
}
return
}
}(shard)
}
go func() {
wg.Wait()
// If we want to rollback, we have to do it before closing results
// so that the session is updated to be not InTransaction.
if allErrors.HasErrors() {
if session.InTransaction() {
errstr := allErrors.Error().Error()
// We cannot recover from these errors
// TODO(aaijazi): get rid of this string parsing. Might want a function that searches
// through a deeply nested error chain a particular error.
if strings.Contains(errstr, "tx_pool_full") || strings.Contains(errstr, "not_in_tx") {
stc.Rollback(ctx, session)
}
}
}
close(results)
}()
return results, allErrors
}
开发者ID:littleyang,项目名称:vitess,代码行数:69,代码来源:scatter_conn.go
示例11: UpdateEndPoints
// UpdateEndPoints is a high level wrapper for TopoServer.UpdateEndPoints.
// It generates trace spans.
func UpdateEndPoints(ctx context.Context, ts Server, cell, keyspace, shard string, tabletType pb.TabletType, addrs *pb.EndPoints, existingVersion int64) error {
span := trace.NewSpanFromContext(ctx)
span.StartClient("TopoServer.UpdateEndPoints")
span.Annotate("cell", cell)
span.Annotate("keyspace", keyspace)
span.Annotate("shard", shard)
span.Annotate("tablet_type", strings.ToLower(tabletType.String()))
defer span.Finish()
return ts.UpdateEndPoints(ctx, cell, keyspace, shard, tabletType, addrs, existingVersion)
}
开发者ID:richarwu,项目名称:vitess,代码行数:13,代码来源:serving_graph.go
示例12: getConnection
func (sg *shardGateway) getConnection(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType) *ShardConn {
sg.mu.Lock()
defer sg.mu.Unlock()
key := fmt.Sprintf("%s.%s.%s", keyspace, shard, strings.ToLower(tabletType.String()))
sdc, ok := sg.shardConns[key]
if !ok {
sdc = NewShardConn(ctx, sg.toposerv, sg.cell, keyspace, shard, tabletType, sg.retryDelay, sg.retryCount, sg.connTimeoutTotal, sg.connTimeoutPerConn, sg.connLife, sg.connTimings)
sg.shardConns[key] = sdc
}
return sdc
}
开发者ID:tjyang,项目名称:vitess,代码行数:12,代码来源:shardgateway.go
示例13: VtctldSrvType
// VtctldSrvType returns the tablet type, possibly linked to the
// EndPoints page in vtctld.
func VtctldSrvType(cell, keyspace, shard string, tabletType pb.TabletType) template.HTML {
strTabletType := strings.ToLower(tabletType.String())
if !topo.IsInServingGraph(tabletType) {
return template.HTML(strTabletType)
}
return MakeVtctldRedirect(strTabletType, map[string]string{
"type": "srv_type",
"cell": cell,
"keyspace": keyspace,
"shard": shard,
"tablet_type": strTabletType,
})
}
开发者ID:richarwu,项目名称:vitess,代码行数:15,代码来源:status.go
示例14: StreamExecuteShards
// StreamExecuteShards executes a streaming query on the specified shards.
func (vtg *VTGate) StreamExecuteShards(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, shards []string, tabletType pb.TabletType, sendReply func(*proto.QueryResult) error) error {
startTime := time.Now()
statsKey := []string{"StreamExecuteShards", keyspace, strings.ToLower(tabletType.String())}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return errTooManyInFlight
}
var rowCount int64
err := vtg.resolver.StreamExecute(
ctx,
sql,
bindVariables,
keyspace,
tabletType,
func(keyspace string) (string, []string, error) {
return keyspace, shards, nil
},
func(mreply *mproto.QueryResult) error {
reply := new(proto.QueryResult)
reply.Result = mreply
rowCount += int64(len(mreply.Rows))
vtg.rowsReturned.Add(statsKey, int64(len(mreply.Rows)))
// Note we don't populate reply.Session here,
// as it may change incrementaly as responses are sent.
return sendReply(reply)
})
if err != nil {
normalErrors.Add(statsKey, 1)
query := map[string]interface{}{
"Sql": sql,
"BindVariables": bindVariables,
"Keyspace": keyspace,
"Shards": shards,
"TabletType": strings.ToLower(tabletType.String()),
}
logError(err, query, vtg.logStreamExecuteShards)
}
return formatError(err)
}
开发者ID:yangzhongj,项目名称:vitess,代码行数:45,代码来源:vtgate.go
示例15: WrapError
// WrapError returns ShardConnError which preserves the original error code if possible,
// adds the connection context
// and adds a bit to determine whether the keyspace/shard needs to be
// re-resolved for a potential sharding event.
func WrapError(in error, keyspace, shard string, tabletType pbt.TabletType, endPoint *pbt.EndPoint, inTransaction bool) (wrapped error) {
if in == nil {
return nil
}
shardIdentifier := fmt.Sprintf("%s.%s.%s, %+v", keyspace, shard, strings.ToLower(tabletType.String()), endPoint)
code := tabletconn.ERR_NORMAL
serverError, ok := in.(*tabletconn.ServerError)
if ok {
code = serverError.Code
}
shardConnErr := &ShardConnError{
Code: code,
ShardIdentifier: shardIdentifier,
InTransaction: inTransaction,
Err: in,
EndPointCode: vterrors.RecoverVtErrorCode(in),
}
return shardConnErr
}
开发者ID:strogo,项目名称:vitess,代码行数:24,代码来源:discoverygateway.go
示例16: ExecuteBatchShards
// ExecuteBatchShards executes a group of queries on the specified shards.
func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, queries []proto.BoundShardQuery, tabletType pb.TabletType, asTransaction bool, session *proto.Session, reply *proto.QueryResultList) error {
startTime := time.Now()
statsKey := []string{"ExecuteBatchShards", "", ""}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return errTooManyInFlight
}
annotateBoundShardQueriesAsUnfriendly(queries)
qrs, err := vtg.resolver.ExecuteBatch(
ctx,
tabletType,
asTransaction,
session,
func() (*scatterBatchRequest, error) {
return boundShardQueriesToScatterBatchRequest(queries), nil
})
if err == nil {
reply.List = qrs.List
var rowCount int64
for _, qr := range qrs.List {
rowCount += int64(len(qr.Rows))
}
vtg.rowsReturned.Add(statsKey, rowCount)
} else {
query := map[string]interface{}{
"Queries": queries,
"TabletType": strings.ToLower(tabletType.String()),
"AsTransaction": asTransaction,
"Session": session,
}
reply.Error = handleExecuteError(err, statsKey, query, vtg.logExecuteBatchShards).Error()
reply.Err = vterrors.RPCErrFromVtError(err)
}
reply.Session = session
return nil
}
开发者ID:hadoop835,项目名称:vitess,代码行数:42,代码来源:vtgate.go
示例17: getStatsAggregator
func (dg *discoveryGateway) getStatsAggregator(keyspace, shard string, tabletType topodatapb.TabletType) *TabletStatusAggregator {
key := fmt.Sprintf("%v/%v/%v", keyspace, shard, tabletType.String())
// get existing aggregator
dg.mu.RLock()
aggr, ok := dg.statusAggregators[key]
dg.mu.RUnlock()
if ok {
return aggr
}
// create a new one, but check again before the creation
dg.mu.Lock()
defer dg.mu.Unlock()
aggr, ok = dg.statusAggregators[key]
if ok {
return aggr
}
aggr = NewTabletStatusAggregator(keyspace, shard, tabletType, key)
dg.statusAggregators[key] = aggr
return aggr
}
开发者ID:xujianhai,项目名称:vitess,代码行数:21,代码来源:discoverygateway.go
示例18: ExecuteBatchShards
// ExecuteBatchShards executes a group of queries on the specified shards.
func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, queries []*vtgatepb.BoundShardQuery, tabletType topodatapb.TabletType, asTransaction bool, session *vtgatepb.Session) ([]sqltypes.Result, error) {
startTime := time.Now()
statsKey := []string{"ExecuteBatchShards", "", ""}
defer vtg.timings.Record(statsKey, startTime)
x := vtg.inFlight.Add(1)
defer vtg.inFlight.Add(-1)
if 0 < vtg.maxInFlight && vtg.maxInFlight < x {
return nil, errTooManyInFlight
}
annotateBoundShardQueriesAsUnfriendly(queries)
qrs, err := vtg.resolver.ExecuteBatch(
ctx,
tabletType,
asTransaction,
session,
func() (*scatterBatchRequest, error) {
return boundShardQueriesToScatterBatchRequest(queries)
})
if err == nil {
var rowCount int64
for _, qr := range qrs {
rowCount += int64(len(qr.Rows))
}
vtg.rowsReturned.Add(statsKey, rowCount)
return qrs, nil
}
query := map[string]interface{}{
"Queries": queries,
"TabletType": strings.ToLower(tabletType.String()),
"AsTransaction": asTransaction,
"Session": session,
}
handleExecuteError(err, statsKey, query, vtg.logExecuteBatchShards)
return nil, err
}
开发者ID:aaijazi,项目名称:vitess,代码行数:40,代码来源:vtgate.go
示例19: ExecuteBatch
// ExecuteBatch executes a batch of non-streaming queries on the specified shards.
func (stc *ScatterConn) ExecuteBatch(
ctx context.Context,
batchRequest *scatterBatchRequest,
tabletType topodatapb.TabletType,
asTransaction bool,
session *SafeSession) (qrs []sqltypes.Result, err error) {
allErrors := new(concurrency.AllErrorRecorder)
results := make([]sqltypes.Result, batchRequest.Length)
var resMutex sync.Mutex
var wg sync.WaitGroup
for _, req := range batchRequest.Requests {
wg.Add(1)
go func(req *shardBatchRequest) {
statsKey := []string{"ExecuteBatch", req.Keyspace, req.Shard, strings.ToLower(tabletType.String())}
defer wg.Done()
startTime := time.Now()
defer stc.timings.Record(statsKey, startTime)
transactionID, err := stc.updateSession(ctx, req.Keyspace, req.Shard, tabletType, session, false)
if err != nil {
allErrors.RecordError(err)
stc.tabletCallErrorCount.Add(statsKey, 1)
return
}
innerqrs, err := stc.gateway.ExecuteBatch(ctx, req.Keyspace, req.Shard, tabletType, req.Queries, asTransaction, transactionID)
if err != nil {
allErrors.RecordError(err)
// Don't increment the error counter for duplicate keys, as those errors
// are caused by client queries and are not VTGate's fault.
// TODO(aaijazi): get rid of this string parsing, and handle all cases of invalid input
if !strings.Contains(err.Error(), errDupKey) && !strings.Contains(err.Error(), errOutOfRange) {
stc.tabletCallErrorCount.Add(statsKey, 1)
}
return
}
// Encapsulate in a function for safe mutex operation.
func() {
resMutex.Lock()
defer resMutex.Unlock()
for i, result := range innerqrs {
appendResult(&results[req.ResultIndexes[i]], &result)
}
}()
}(req)
}
wg.Wait()
// If we want to rollback, we have to do it before closing results
// so that the session is updated to be not InTransaction.
if allErrors.HasErrors() {
if session.InTransaction() {
errstr := allErrors.Error().Error()
// We cannot recover from these errors
// TODO(aaijazi): get rid of this string parsing
if strings.Contains(errstr, "tx_pool_full") || strings.Contains(errstr, "not_in_tx") {
stc.Rollback(ctx, session)
}
}
return nil, allErrors.AggrError(stc.aggregateErrors)
}
return results, nil
}
开发者ID:littleyang,项目名称:vitess,代码行数:65,代码来源:scatter_conn.go
示例20: GetEndPoints
// GetEndPoints return all endpoints for the given cell, keyspace, shard, and tablet type.
func (server *ResilientSrvTopoServer) GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType pb.TabletType) (result *pb.EndPoints, version int64, err error) {
shard = strings.ToLower(shard)
key := []string{cell, keyspace, shard, strings.ToLower(tabletType.String())}
server.counts.Add(queryCategory, 1)
server.endPointCounters.queries.Add(key, 1)
// find the entry in the cache, add it if not there
keyStr := strings.Join(key, ".")
server.mutex.Lock()
entry, ok := server.endPointsCache[keyStr]
if !ok {
entry = &endPointsEntry{
cell: cell,
keyspace: keyspace,
shard: shard,
tabletType: tabletType,
}
server.endPointsCache[keyStr] = entry
}
server.mutex.Unlock()
// Lock the entry, and do everything holding the lock. This
// means two concurrent requests will only issue one
// underlying query.
entry.mutex.Lock()
defer entry.mutex.Unlock()
// Whether the query was serviced with remote endpoints.
remote := false
// Record some stats regardless of cache status.
defer func() {
if remote {
server.endPointCounters.remoteQueries.Add(key, 1)
}
if err != nil {
server.endPointCounters.errors.Add(key, 1)
return
}
if result == nil || len(result.Entries) == 0 {
server.endPointCounters.emptyResults.Add(key, 1)
return
}
server.endPointCounters.numberReturned.Add(key, int64(len(result.Entries)))
// We either serve all healthy endpoints or all degraded endpoints, so the first entry is representative.
if !endPointIsHealthy(result.Entries[0]) {
server.endPointCounters.degradedResults.Add(key, 1)
return
}
}()
// If the entry is fresh enough, return it
if time.Now().Sub(entry.insertionTime) < server.cacheTTL {
server.endPointCounters.cacheHits.Add(key, 1)
remote = entry.remote
return entry.value, -1, entry.lastError
}
// not in cache or too old, get the real value
newCtx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
defer cancel()
result, _, err = server.topoServer.GetEndPoints(newCtx, cell, keyspace, shard, tabletType)
// get remote endpoints for master if enabled
if err != nil && server.enableRemoteMaster && tabletType == pb.TabletType_MASTER {
remote = true
server.counts.Add(remoteQueryCategory, 1)
server.endPointCounters.remoteLookups.Add(key, 1)
var ss *pb.SrvShard
ss, err = server.topoServer.GetSrvShard(newCtx, cell, keyspace, shard)
if err != nil {
server.counts.Add(remoteErrorCategory, 1)
server.endPointCounters.remoteLookupErrors.Add(key, 1)
log.Errorf("GetEndPoints(%v, %v, %v, %v, %v) failed to get SrvShard for remote master: %v",
newCtx, cell, keyspace, shard, tabletType, err)
} else {
if ss.MasterCell != "" && ss.MasterCell != cell {
result, _, err = server.topoServer.GetEndPoints(newCtx, ss.MasterCell, keyspace, shard, tabletType)
}
}
}
if err != nil {
server.endPointCounters.lookupErrors.Add(key, 1)
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetEndPoints(%v, %v, %v, %v, %v) failed: %v (no cached value, caching and returning error)", newCtx, cell, keyspace, shard, tabletType, err)
} else {
server.counts.Add(cachedCategory, 1)
server.endPointCounters.staleCacheFallbacks.Add(key, 1)
log.Warningf("GetEndPoints(%v, %v, %v, %v, %v) failed: %v (returning cached value: %v %v)", newCtx, cell, keyspace, shard, tabletType, err, entry.value, entry.lastError)
return entry.value, -1, entry.lastError
}
}
// save the value we got and the current time in the cache
entry.insertionTime = time.Now()
entry.originalValue = result
entry.value = filterUnhealthyServers(result)
//.........这里部分代码省略.........
开发者ID:richarwu,项目名称:vitess,代码行数:101,代码来源:srv_topo_server.go
注:本文中的github.com/youtube/vitess/go/vt/proto/topodata.TabletType类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论