This article collects typical usage examples of the Golang function github.com/youtube/vitess/go/sync2.NewSemaphore. If you have been wondering what exactly NewSemaphore does, how to call it, or where to find working examples, the curated code samples below should help.
The following shows 20 code examples of the NewSemaphore function, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Golang code examples.
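
Before diving into the examples, a quick orientation: sync2.NewSemaphore(count, timeout) returns a counting semaphore that allows at most count concurrent holders, and callers bracket their work with Acquire/Release, exactly as the snippets below do. Here is a minimal, self-contained sketch of that pattern; the task count, the limit of 3, and the printed message are made up purely for illustration.

package main

import (
	"fmt"
	"sync"

	"github.com/youtube/vitess/go/sync2"
)

func main() {
	// Allow at most 3 goroutines into the critical section at a time.
	// A timeout of 0 means Acquire waits as long as necessary for a slot.
	sema := sync2.NewSemaphore(3, 0)

	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sema.Acquire()
			defer sema.Release()
			fmt.Println("working on task", i) // placeholder for real work
		}(i)
	}
	wg.Wait()
}
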
Example 1: NewTabletServer
// NewTabletServer creates an instance of TabletServer. Only one instance
// of TabletServer can be created per process.
func NewTabletServer(config Config) *TabletServer {
tsv := &TabletServer{
config: config,
QueryTimeout: sync2.NewAtomicDuration(time.Duration(config.QueryTimeout * 1e9)),
BeginTimeout: sync2.NewAtomicDuration(time.Duration(config.TxPoolTimeout * 1e9)),
checkMySQLThrottler: sync2.NewSemaphore(1, 0),
streamHealthMap: make(map[int]chan<- *querypb.StreamHealthResponse),
sessionID: Rand(),
history: history.New(10),
}
tsv.qe = NewQueryEngine(tsv, config)
tsv.invalidator = NewRowcacheInvalidator(config.StatsPrefix, tsv, tsv.qe, config.EnablePublishStats)
if config.EnablePublishStats {
stats.Publish(config.StatsPrefix+"TabletState", stats.IntFunc(func() int64 {
tsv.mu.Lock()
state := tsv.state
tsv.mu.Unlock()
return state
}))
stats.Publish(config.StatsPrefix+"QueryTimeout", stats.DurationFunc(tsv.QueryTimeout.Get))
stats.Publish(config.StatsPrefix+"BeginTimeout", stats.DurationFunc(tsv.BeginTimeout.Get))
stats.Publish(config.StatsPrefix+"TabletStateName", stats.StringFunc(tsv.GetState))
}
return tsv
}
Author: aaijazi, Project: vitess, Lines of code: 27, Source file: tabletserver.go
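
The checkMySQLThrottler field above is created with a count of 1, which turns the semaphore into a mutual-exclusion guard: only one MySQL health check can be in flight per TabletServer at any time. Below is a hedged sketch of that idea in isolation; the package name and the checkMySQL body are stand-ins, not the real TabletServer code.

package tabletserverexample

import "github.com/youtube/vitess/go/sync2"

// Sized 1, so at most one health check can run at a time; a timeout of 0
// means Acquire blocks until the current holder calls Release.
var checkMySQLThrottler = sync2.NewSemaphore(1, 0)

func checkMySQL() {
	checkMySQLThrottler.Acquire()
	defer checkMySQLThrottler.Release()
	// probe MySQL here; concurrent callers queue behind the semaphore
}
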
Example 2: NewQueryEngine
func NewQueryEngine(config Config) *QueryEngine {
qe := &QueryEngine{}
qe.cachePool = NewCachePool("CachePool", config.RowCache, time.Duration(config.QueryTimeout*1e9), time.Duration(config.IdleTimeout*1e9))
qe.schemaInfo = NewSchemaInfo(config.QueryCacheSize, time.Duration(config.SchemaReloadTime*1e9), time.Duration(config.IdleTimeout*1e9))
qe.connPool = NewConnectionPool("ConnPool", config.PoolSize, time.Duration(config.IdleTimeout*1e9))
qe.streamConnPool = NewConnectionPool("StreamConnPool", config.StreamPoolSize, time.Duration(config.IdleTimeout*1e9))
qe.streamTokens = sync2.NewSemaphore(config.StreamExecThrottle, time.Duration(config.StreamWaitTimeout*1e9))
qe.reservedPool = NewReservedPool("ReservedPool")
qe.txPool = NewConnectionPool("TxPool", config.TransactionCap, time.Duration(config.IdleTimeout*1e9)) // connections in pool has to be > transactionCap
qe.activeTxPool = NewActiveTxPool("ActiveTxPool", time.Duration(config.TransactionTimeout*1e9))
qe.activePool = NewActivePool("ActivePool", time.Duration(config.QueryTimeout*1e9), time.Duration(config.IdleTimeout*1e9))
qe.consolidator = NewConsolidator()
qe.spotCheckFreq = sync2.AtomicInt64(config.SpotCheckRatio * SPOT_CHECK_MULTIPLIER)
qe.maxResultSize = sync2.AtomicInt64(config.MaxResultSize)
qe.streamBufferSize = sync2.AtomicInt64(config.StreamBufferSize)
stats.Publish("MaxResultSize", stats.IntFunc(qe.maxResultSize.Get))
stats.Publish("StreamBufferSize", stats.IntFunc(qe.streamBufferSize.Get))
queryStats = stats.NewTimings("Queries")
stats.NewRates("QPS", queryStats, 15, 60e9)
waitStats = stats.NewTimings("Waits")
killStats = stats.NewCounters("Kills")
errorStats = stats.NewCounters("Errors")
resultStats = stats.NewHistogram("Results", resultBuckets)
stats.Publish("SpotCheckRatio", stats.FloatFunc(func() float64 {
return float64(qe.spotCheckFreq.Get()) / SPOT_CHECK_MULTIPLIER
}))
spotCheckCount = stats.NewInt("SpotCheckCount")
return qe
}
Author: CERN-Stage-3, Project: vitess, Lines of code: 29, Source file: query_engine.go
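
Unlike most of the other call sites, streamTokens above is built with a non-zero timeout (StreamWaitTimeout), so a caller that cannot get a slot in time gives up instead of waiting forever. The sketch below shows how such a token would typically be consumed; it assumes Acquire reports false when the wait times out (the example itself never checks the return value), and the limits and error text are invented.

package queryengineexample

import (
	"errors"
	"time"

	"github.com/youtube/vitess/go/sync2"
)

// Allow 10 concurrent streaming queries; give up after waiting 5 seconds.
var streamTokens = sync2.NewSemaphore(10, 5*time.Second)

func streamQuery(run func() error) error {
	if !streamTokens.Acquire() {
		return errors.New("timed out waiting for a streaming slot")
	}
	defer streamTokens.Release()
	return run() // the actual streaming query
}
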
Example 3: init
func init() {
// The zookeeper C module logs quite a bit of useful information,
// but much of it does not come back in the error API. To aid
// debugging, enable the log to stderr for warnings.
//zookeeper.SetLogLevel(zookeeper.LOG_WARN)
maxConcurrency := 64
x := os.Getenv("ZK_CLIENT_MAX_CONCURRENCY")
if x != "" {
var err error
maxConcurrency, err = strconv.Atoi(x)
if err != nil {
log.Infof("invalid ZK_CLIENT_MAX_CONCURRENCY: %v", err)
}
}
sem = sync2.NewSemaphore(maxConcurrency, 0)
}
Author: CowLeo, Project: vitess, Lines of code: 18, Source file: zkconn.go
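
The package-level sem initialized above later caps how many Zookeeper requests the process issues at once, with the limit overridable through the ZK_CLIENT_MAX_CONCURRENCY environment variable. A hedged sketch of how callers in the same package would use it follows; the zkRequest helper and the fixed default of 64 shown here are illustrative, not part of zkconn.go.

package zkexample

import "github.com/youtube/vitess/go/sync2"

// In the real code this is assigned in init() from the environment,
// as shown in the example above; 64 is just the default.
var sem = sync2.NewSemaphore(64, 0)

// zkRequest is a hypothetical helper: every Zookeeper call is bracketed by
// the shared semaphore, so the process never has more than the configured
// number of requests outstanding at once.
func zkRequest(do func() error) error {
	sem.Acquire()
	defer sem.Release()
	return do()
}
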
Example 4: NewTabletServer
// NewTabletServer creates an instance of TabletServer. Only one instance
// of TabletServer can be created per process.
func NewTabletServer(config Config) *TabletServer {
tsv := &TabletServer{
config: config,
checkMySQLThrottler: sync2.NewSemaphore(1, 0),
streamHealthMap: make(map[int]chan<- *pb.StreamHealthResponse),
sessionID: Rand(),
}
tsv.qe = NewQueryEngine(tsv, config)
tsv.invalidator = NewRowcacheInvalidator(config.StatsPrefix, tsv, tsv.qe, config.EnablePublishStats)
if config.EnablePublishStats {
stats.Publish(config.StatsPrefix+"TabletState", stats.IntFunc(func() int64 {
tsv.mu.Lock()
state := tsv.state
tsv.mu.Unlock()
return state
}))
stats.Publish(config.StatsPrefix+"TabletStateName", stats.StringFunc(tsv.GetState))
}
return tsv
}
Author: e4x, Project: vitess, Lines of code: 22, Source file: tabletserver.go
Example 5: clone
// clone phase:
// - copy the data from source tablets to destination masters (with replication on)
// Assumes that the schema has already been created on each destination tablet
// (probably from vtctl's CopySchemaShard)
func (vscw *VerticalSplitCloneWorker) clone(ctx context.Context) error {
vscw.setState(WorkerStateCloneOffline)
start := time.Now()
defer func() {
statsStateDurationsNs.Set(string(WorkerStateCloneOffline), time.Now().Sub(start).Nanoseconds())
}()
// get source schema
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sourceSchemaDefinition, err := vscw.wr.GetSchema(shortCtx, vscw.sourceAlias, vscw.tables, nil, true)
cancel()
if err != nil {
return fmt.Errorf("cannot get schema from source %v: %v", topoproto.TabletAliasString(vscw.sourceAlias), err)
}
if len(sourceSchemaDefinition.TableDefinitions) == 0 {
return fmt.Errorf("no tables matching the table filter")
}
vscw.wr.Logger().Infof("Source tablet has %v tables to copy", len(sourceSchemaDefinition.TableDefinitions))
vscw.tableStatusList.initialize(sourceSchemaDefinition)
// In parallel, setup the channels to send SQL data chunks to
// for each destination tablet.
//
// mu protects firstError
mu := sync.Mutex{}
var firstError error
ctx, cancelCopy := context.WithCancel(ctx)
processError := func(format string, args ...interface{}) {
vscw.wr.Logger().Errorf(format, args...)
mu.Lock()
if firstError == nil {
firstError = fmt.Errorf(format, args...)
cancelCopy()
}
mu.Unlock()
}
destinationWaitGroup := sync.WaitGroup{}
// we create one channel for the destination tablet. It
// is sized to have a buffer of a maximum of
// destinationWriterCount * 2 items, to hopefully
// always have data. We then have
// destinationWriterCount go routines reading from it.
insertChannel := make(chan string, vscw.destinationWriterCount*2)
// Set up the throttler for the destination shard.
keyspaceAndShard := topoproto.KeyspaceShardString(vscw.destinationKeyspace, vscw.destinationShard)
destinationThrottler, err := throttler.NewThrottler(
keyspaceAndShard, "transactions", vscw.destinationWriterCount, vscw.maxTPS, throttler.ReplicationLagModuleDisabled)
if err != nil {
return fmt.Errorf("cannot instantiate throttler: %v", err)
}
for j := 0; j < vscw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func(threadID int) {
defer destinationWaitGroup.Done()
defer destinationThrottler.ThreadFinished(threadID)
executor := newExecutor(vscw.wr, vscw.tsc, destinationThrottler, vscw.destinationKeyspace, vscw.destinationShard, threadID)
if err := executor.fetchLoop(ctx, insertChannel); err != nil {
processError("executer.FetchLoop failed: %v", err)
}
}(j)
}
// Now for each table, read data chunks and send them to insertChannel
sourceWaitGroup := sync.WaitGroup{}
sema := sync2.NewSemaphore(vscw.sourceReaderCount, 0)
dbName := vscw.destinationDbNames[topoproto.KeyspaceShardString(vscw.destinationKeyspace, vscw.destinationShard)]
for tableIndex, td := range sourceSchemaDefinition.TableDefinitions {
if td.Type == tmutils.TableView {
continue
}
chunks, err := generateChunks(ctx, vscw.wr, vscw.sourceTablet, td, vscw.minTableSizeForSplit, vscw.sourceReaderCount)
if err != nil {
return err
}
vscw.tableStatusList.setThreadCount(tableIndex, len(chunks)-1)
for _, c := range chunks {
sourceWaitGroup.Add(1)
go func(td *tabletmanagerdatapb.TableDefinition, tableIndex int, chunk chunk) {
defer sourceWaitGroup.Done()
sema.Acquire()
defer sema.Release()
vscw.tableStatusList.threadStarted(tableIndex)
// Start streaming from the source tablet.
rr, err := NewRestartableResultReader(ctx, vscw.wr.Logger(), vscw.wr.TopoServer(), vscw.sourceAlias, td, chunk)
if err != nil {
processError("NewRestartableResultReader failed: %v", err)
return
//......... (portion of code omitted) .........
Author: yuer2008, Project: vitess, Lines of code: 101, Source file: vertical_split_clone.go
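
Stripped of the worker-specific details, example 5 uses a very common Vitess idiom: start one goroutine per data chunk, let a semaphore of size sourceReaderCount decide how many of them may stream at the same time, and use a WaitGroup to know when they are all finished. Here is a condensed, self-contained sketch of just that skeleton; the chunk type and the processChunk callback are placeholders.

package cloneexample

import (
	"sync"

	"github.com/youtube/vitess/go/sync2"
)

// chunk is a placeholder for the worker's real chunk type.
type chunk struct{ start, end int }

func copyChunks(chunks []chunk, sourceReaderCount int, processChunk func(chunk)) {
	sema := sync2.NewSemaphore(sourceReaderCount, 0)
	wg := sync.WaitGroup{}
	for _, c := range chunks {
		wg.Add(1)
		go func(c chunk) {
			defer wg.Done()
			sema.Acquire() // blocks until one of the sourceReaderCount slots frees up
			defer sema.Release()
			processChunk(c) // e.g. stream rows from the source tablet into insertChannel
		}(c)
	}
	wg.Wait()
}
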
Example 6: copy
//......... (portion of code omitted) .........
firstError = fmt.Errorf(format, args...)
cancelCopy()
}
mu.Unlock()
}
insertChannels := make([]chan string, len(scw.destinationShards))
destinationWaitGroup := sync.WaitGroup{}
for shardIndex, si := range scw.destinationShards {
// we create one channel per destination tablet. It
// is sized to have a buffer of a maximum of
// destinationWriterCount * 2 items, to hopefully
// always have data. We then have
// destinationWriterCount go routines reading from it.
insertChannels[shardIndex] = make(chan string, scw.destinationWriterCount*2)
go func(shardName string, insertChannel chan string) {
for j := 0; j < scw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func() {
defer destinationWaitGroup.Done()
if err := executeFetchLoop(ctx, scw.wr, scw, shardName, insertChannel); err != nil {
processError("executeFetchLoop failed: %v", err)
}
}()
}
}(si.ShardName(), insertChannels[shardIndex])
}
// Now for each table, read data chunks and send them to all
// insertChannels
sourceWaitGroup := sync.WaitGroup{}
for shardIndex := range scw.sourceShards {
sema := sync2.NewSemaphore(scw.sourceReaderCount, 0)
for tableIndex, td := range sourceSchemaDefinition.TableDefinitions {
if td.Type == myproto.TableView {
continue
}
rowSplitter := NewRowSplitter(scw.destinationShards, key.ProtoToKeyspaceIdType(scw.keyspaceInfo.ShardingColumnType), columnIndexes[tableIndex])
chunks, err := FindChunks(ctx, scw.wr, scw.sourceTablets[shardIndex], td, scw.minTableSizeForSplit, scw.sourceReaderCount)
if err != nil {
return err
}
scw.tableStatus[tableIndex].setThreadCount(len(chunks) - 1)
for chunkIndex := 0; chunkIndex < len(chunks)-1; chunkIndex++ {
sourceWaitGroup.Add(1)
go func(td *myproto.TableDefinition, tableIndex, chunkIndex int) {
defer sourceWaitGroup.Done()
sema.Acquire()
defer sema.Release()
scw.tableStatus[tableIndex].threadStarted()
// build the query, and start the streaming
selectSQL := buildSQLFromChunks(scw.wr, td, chunks, chunkIndex, scw.sourceAliases[shardIndex].String())
qrr, err := NewQueryResultReaderForTablet(ctx, scw.wr.TopoServer(), scw.sourceAliases[shardIndex], selectSQL)
if err != nil {
processError("NewQueryResultReaderForTablet failed: %v", err)
return
}
defer qrr.Close()
Author: strogo, Project: vitess, Lines of code: 66, Source file: split_clone.go
Example 7: restoreFiles
// restoreFiles will copy all the files from the BackupStorage to the
// right place
func restoreFiles(cnf *Mycnf, bh backupstorage.BackupHandle, fes []FileEntry, restoreConcurrency int) error {
sema := sync2.NewSemaphore(restoreConcurrency, 0)
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}
for i, fe := range fes {
wg.Add(1)
go func(i int, fe FileEntry) {
defer wg.Done()
// wait until we are ready to go, skip if we already
// encountered an error
sema.Acquire()
defer sema.Release()
if rec.HasErrors() {
return
}
// open the source file for reading
name := fmt.Sprintf("%v", i)
source, err := bh.ReadFile(name)
if err != nil {
rec.RecordError(err)
return
}
defer source.Close()
// open the destination file for writing
dstFile, err := fe.open(cnf, false)
if err != nil {
rec.RecordError(err)
return
}
defer func() { rec.RecordError(dstFile.Close()) }()
// create a buffering output
dst := bufio.NewWriterSize(dstFile, 2*1024*1024)
// create hash to write the compressed data to
hasher := newHasher()
// create a Tee: we split the input into the hasher
// and into the gunziper
tee := io.TeeReader(source, hasher)
// create the uncompresser
gz, err := cgzip.NewReader(tee)
if err != nil {
rec.RecordError(err)
return
}
defer func() { rec.RecordError(gz.Close()) }()
// copy the data. Will also write to the hasher
if _, err = io.Copy(dst, gz); err != nil {
rec.RecordError(err)
return
}
// check the hash
hash := hasher.HashString()
if hash != fe.Hash {
rec.RecordError(fmt.Errorf("hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash))
return
}
// flush the buffer
rec.RecordError(dst.Flush())
}(i, fe)
}
wg.Wait()
return rec.Error()
}
Author: jmptrader, Project: vitess, Lines of code: 74, Source file: backup.go
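
restoreFiles combines the semaphore with concurrency.AllErrorRecorder: every file gets its own goroutine, the semaphore keeps only restoreConcurrency of them doing I/O at once, and a goroutine that wins a slot after an error has already been recorded simply returns. The same skeleton is reduced to its moving parts below; the copyFile callback stands in for the read/uncompress/verify logic above, and the concurrency import path is assumed from the Vitess tree layout.

package restoreexample

import (
	"sync"

	"github.com/youtube/vitess/go/sync2"
	"github.com/youtube/vitess/go/vt/concurrency" // import path assumed
)

func restoreAll(files []string, restoreConcurrency int, copyFile func(string) error) error {
	sema := sync2.NewSemaphore(restoreConcurrency, 0)
	rec := concurrency.AllErrorRecorder{}
	wg := sync.WaitGroup{}
	for _, f := range files {
		wg.Add(1)
		go func(f string) {
			defer wg.Done()
			sema.Acquire()
			defer sema.Release()
			if rec.HasErrors() {
				return // an earlier file already failed; skip the remaining work
			}
			if err := copyFile(f); err != nil {
				rec.RecordError(err)
			}
		}(f)
	}
	wg.Wait()
	return rec.Error()
}
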
Example 8: backupFiles
func backupFiles(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, fes []FileEntry, replicationPosition replication.Position, backupConcurrency int) (err error) {
sema := sync2.NewSemaphore(backupConcurrency, 0)
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}
for i, fe := range fes {
wg.Add(1)
go func(i int, fe FileEntry) {
defer wg.Done()
// wait until we are ready to go, skip if we already
// encountered an error
sema.Acquire()
defer sema.Release()
if rec.HasErrors() {
return
}
// open the source file for reading
source, err := fe.open(mysqld.Cnf(), true)
if err != nil {
rec.RecordError(err)
return
}
defer source.Close()
// open the destination file for writing, and a buffer
name := fmt.Sprintf("%v", i)
wc, err := bh.AddFile(name)
if err != nil {
rec.RecordError(fmt.Errorf("cannot add file: %v", err))
return
}
defer func() { rec.RecordError(wc.Close()) }()
dst := bufio.NewWriterSize(wc, 2*1024*1024)
// create the hasher and the tee on top
hasher := newHasher()
tee := io.MultiWriter(dst, hasher)
// create the gzip compression filter
gzip, err := cgzip.NewWriterLevel(tee, cgzip.Z_BEST_SPEED)
if err != nil {
rec.RecordError(fmt.Errorf("cannot create gziper: %v", err))
return
}
// copy from the source file to gzip to tee to output file and hasher
_, err = io.Copy(gzip, source)
if err != nil {
rec.RecordError(fmt.Errorf("cannot copy data: %v", err))
return
}
// close gzip to flush it, after that the hash is good
if err = gzip.Close(); err != nil {
rec.RecordError(fmt.Errorf("cannot close gzip: %v", err))
return
}
// flush the buffer to finish writing, save the hash
rec.RecordError(dst.Flush())
fes[i].Hash = hasher.HashString()
}(i, fe)
}
wg.Wait()
if rec.HasErrors() {
return rec.Error()
}
// open the MANIFEST
wc, err := bh.AddFile(backupManifest)
if err != nil {
return fmt.Errorf("cannot add %v to backup: %v", backupManifest, err)
}
defer func() {
if closeErr := wc.Close(); err == nil {
err = closeErr
}
}()
// JSON-encode and write the MANIFEST
bm := &BackupManifest{
FileEntries: fes,
Position: replicationPosition,
}
data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
return fmt.Errorf("cannot JSON encode %v: %v", backupManifest, err)
}
if _, err := wc.Write([]byte(data)); err != nil {
return fmt.Errorf("cannot write %v: %v", backupManifest, err)
}
return nil
}
Author: jmptrader, Project: vitess, Lines of code: 96, Source file: backup.go
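
The interesting part of each semaphore-gated goroutine in backupFiles is its output pipeline: the compressed bytes are teed into a hash while being written through a 2 MiB buffered writer, and the hash is only recorded after the compressor has been closed. Below is a sketch of that per-file pipeline using the standard library's compress/gzip and crypto/md5 as stand-ins for Vitess's cgzip and its newHasher helper, which are not shown here.

package backupexample

import (
	"bufio"
	"compress/gzip" // stand-in for the cgzip package used by the real code
	"crypto/md5"    // stand-in for whatever hash newHasher() returns
	"encoding/hex"
	"io"
)

// compressAndHash copies src through gzip into dst and returns the hex digest
// of the compressed bytes, mirroring the MultiWriter tee in backupFiles.
func compressAndHash(dst io.Writer, src io.Reader) (string, error) {
	buffered := bufio.NewWriterSize(dst, 2*1024*1024)
	hasher := md5.New()
	tee := io.MultiWriter(buffered, hasher) // every compressed byte also feeds the hash
	gz := gzip.NewWriter(tee)
	if _, err := io.Copy(gz, src); err != nil {
		return "", err
	}
	if err := gz.Close(); err != nil { // flush gzip first so the hash is complete
		return "", err
	}
	if err := buffered.Flush(); err != nil {
		return "", err
	}
	return hex.EncodeToString(hasher.Sum(nil)), nil
}
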
Example 9: diff
func (vsdw *VerticalSplitDiffWorker) diff() error {
vsdw.setState(stateVSDDiff)
vsdw.wr.Logger().Infof("Gathering schema information...")
wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
wg.Add(1)
go func() {
var err error
vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema(vsdw.destinationAlias, nil, nil, false)
rec.RecordError(err)
vsdw.wr.Logger().Infof("Got schema from destination %v", vsdw.destinationAlias)
wg.Done()
}()
wg.Add(1)
go func() {
var err error
vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema(vsdw.sourceAlias, nil, nil, false)
rec.RecordError(err)
vsdw.wr.Logger().Infof("Got schema from source %v", vsdw.sourceAlias)
wg.Done()
}()
wg.Wait()
if rec.HasErrors() {
return rec.Error()
}
// Build a list of regexp to exclude tables from source schema
tableRegexps := make([]*regexp.Regexp, len(vsdw.shardInfo.SourceShards[0].Tables))
for i, table := range vsdw.shardInfo.SourceShards[0].Tables {
var err error
tableRegexps[i], err = regexp.Compile(table)
if err != nil {
return fmt.Errorf("cannot compile regexp %v for table: %v", table, err)
}
}
// Remove the tables we don't need from the source schema
newSourceTableDefinitions := make([]myproto.TableDefinition, 0, len(vsdw.destinationSchemaDefinition.TableDefinitions))
for _, tableDefinition := range vsdw.sourceSchemaDefinition.TableDefinitions {
found := false
for _, tableRegexp := range tableRegexps {
if tableRegexp.MatchString(tableDefinition.Name) {
found = true
break
}
}
if !found {
vsdw.wr.Logger().Infof("Removing table %v from source schema", tableDefinition.Name)
continue
}
newSourceTableDefinitions = append(newSourceTableDefinitions, tableDefinition)
}
vsdw.sourceSchemaDefinition.TableDefinitions = newSourceTableDefinitions
// Check the schema
vsdw.wr.Logger().Infof("Diffing the schema...")
rec = concurrency.AllErrorRecorder{}
myproto.DiffSchema("destination", vsdw.destinationSchemaDefinition, "source", vsdw.sourceSchemaDefinition, &rec)
if rec.HasErrors() {
vsdw.wr.Logger().Warningf("Different schemas: %v", rec.Error())
} else {
vsdw.wr.Logger().Infof("Schema match, good.")
}
// run the diffs, 8 at a time
vsdw.wr.Logger().Infof("Running the diffs...")
sem := sync2.NewSemaphore(8, 0)
for _, tableDefinition := range vsdw.destinationSchemaDefinition.TableDefinitions {
wg.Add(1)
go func(tableDefinition myproto.TableDefinition) {
defer wg.Done()
sem.Acquire()
defer sem.Release()
vsdw.wr.Logger().Infof("Starting the diff on table %v", tableDefinition.Name)
sourceQueryResultReader, err := TableScan(vsdw.wr.Logger(), vsdw.wr.TopoServer(), vsdw.sourceAlias, &tableDefinition)
if err != nil {
vsdw.wr.Logger().Errorf("TableScan(source) failed: %v", err)
return
}
defer sourceQueryResultReader.Close()
destinationQueryResultReader, err := TableScan(vsdw.wr.Logger(), vsdw.wr.TopoServer(), vsdw.destinationAlias, &tableDefinition)
if err != nil {
vsdw.wr.Logger().Errorf("TableScan(destination) failed: %v", err)
return
}
defer destinationQueryResultReader.Close()
differ, err := NewRowDiffer(sourceQueryResultReader, destinationQueryResultReader, &tableDefinition)
if err != nil {
vsdw.wr.Logger().Errorf("NewRowDiffer() failed: %v", err)
return
}
report, err := differ.Go(vsdw.wr.Logger())
if err != nil {
vsdw.wr.Logger().Errorf("Differ.Go failed: %v", err)
} else {
//......... (portion of code omitted) .........
Author: chinna1986, Project: vitess, Lines of code: 101, Source file: vertical_split_diff.go
Example 10: copy
//......... (portion of code omitted) .........
destinationWaitGroup.Add(1)
go func(ti *topo.TabletInfo, insertChannel chan string) {
defer destinationWaitGroup.Done()
scw.wr.Logger().Infof("Creating tables on tablet %v", ti.Alias)
if err := runSqlCommands(scw.wr, ti, createDbCmds, abort); err != nil {
processError("createDbCmds failed: %v", err)
return
}
if len(createViewCmds) > 0 {
scw.wr.Logger().Infof("Creating views on tablet %v", ti.Alias)
if err := runSqlCommands(scw.wr, ti, createViewCmds, abort); err != nil {
processError("createViewCmds failed: %v", err)
return
}
}
for j := 0; j < scw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func() {
defer destinationWaitGroup.Done()
if err := executeFetchLoop(scw.wr, ti, insertChannel, abort); err != nil {
processError("executeFetchLoop failed: %v", err)
}
}()
}
}(scw.destinationTablets[shardIndex][tabletAlias], insertChannels[shardIndex][i])
}
}
// Now for each table, read data chunks and send them to all
// insertChannels
sourceWaitGroup := sync.WaitGroup{}
for shardIndex, _ := range scw.sourceShards {
sema := sync2.NewSemaphore(scw.sourceReaderCount, 0)
for tableIndex, td := range sourceSchemaDefinition.TableDefinitions {
if td.Type == myproto.TABLE_VIEW {
continue
}
rowSplitter := NewRowSplitter(scw.destinationShards, scw.keyspaceInfo.ShardingColumnType, columnIndexes[tableIndex])
chunks, err := findChunks(scw.wr, scw.sourceTablets[shardIndex], td, scw.minTableSizeForSplit, scw.sourceReaderCount)
if err != nil {
return err
}
for chunkIndex := 0; chunkIndex < len(chunks)-1; chunkIndex++ {
sourceWaitGroup.Add(1)
go func(td *myproto.TableDefinition, tableIndex, chunkIndex int) {
defer sourceWaitGroup.Done()
sema.Acquire()
defer sema.Release()
// build the query, and start the streaming
selectSQL := buildSQLFromChunks(scw.wr, td, chunks, chunkIndex, scw.sourceAliases[shardIndex].String())
qrr, err := NewQueryResultReaderForTablet(scw.wr.TopoServer(), scw.sourceAliases[shardIndex], selectSQL)
if err != nil {
processError("NewQueryResultReaderForTablet failed: %v", err)
return
}
// process the data
if err := scw.processData(td, tableIndex, qrr, rowSplitter, insertChannels, abort); err != nil {
processError("processData failed: %v", err)
}
Author: plobsing, Project: vitess, Lines of code: 67, Source file: split_clone.go
Example 11: MultiRestore
// MultiRestore is the main entry point for multi restore.
// - If the strategy contains the string 'writeBinLogs' then we will
// also write to the binary logs.
// - If the strategy contains the command 'populateBlpCheckpoint' then we
// will populate the blp_checkpoint table with master positions to start from
func (mysqld *Mysqld) MultiRestore(destinationDbName string, keyRange key.KeyRange, sourceAddrs []*url.URL, snapshotConcurrency, fetchConcurrency, insertTableConcurrency, fetchRetryCount int, strategy string) (err error) {
writeBinLogs := strings.Contains(strategy, "writeBinLogs")
manifests := make([]*SplitSnapshotManifest, len(sourceAddrs))
rc := concurrency.NewResourceConstraint(fetchConcurrency)
for i, sourceAddr := range sourceAddrs {
rc.Add(1)
go func(sourceAddr *url.URL, i int) {
rc.Acquire()
defer rc.ReleaseAndDone()
if rc.HasErrors() {
return
}
var sourceDbName string
if len(sourceAddr.Path) < 2 { // "" or "/"
sourceDbName = destinationDbName
} else {
sourceDbName = sourceAddr.Path[1:]
}
ssm, e := fetchSnapshotManifestWithRetry("http://"+sourceAddr.Host, sourceDbName, keyRange, fetchRetryCount)
manifests[i] = ssm
rc.RecordError(e)
}(sourceAddr, i)
}
if err = rc.Wait(); err != nil {
return
}
if e := SanityCheckManifests(manifests); e != nil {
return e
}
tempStoragePath := path.Join(mysqld.SnapshotDir, "multirestore", destinationDbName)
// Start fresh
if err = os.RemoveAll(tempStoragePath); err != nil {
return
}
if err = os.MkdirAll(tempStoragePath, 0775); err != nil {
return err
}
defer func() {
if e := os.RemoveAll(tempStoragePath); e != nil {
log.Errorf("error removing %v: %v", tempStoragePath, e)
}
}()
// Handle our concurrency:
// - fetchConcurrency tasks for network
// - insertTableConcurrency for table inserts from a file
// into an innodb table
// - snapshotConcurrency tasks for table inserts / modify tables
sems := make(map[string]*sync2.Semaphore, len(manifests[0].SchemaDefinition.TableDefinitions)+3)
sems["net"] = sync2.NewSemaphore(fetchConcurrency, 0)
sems["db"] = sync2.NewSemaphore(snapshotConcurrency, 0)
// Store the alter table statements for after restore,
// and how many jobs we're running on each table
// TODO(alainjobart) the jobCount map is a bit weird. replace it
// with a map of WaitGroups, initialized to the number of files
// per table. Have extra go routines for the tables with auto_increment
// to wait on the waitgroup, and apply the modify_table.
postSql := make(map[string]string, len(manifests[0].SchemaDefinition.TableDefinitions))
jobCount := make(map[string]*sync2.AtomicInt32)
// Create the database (it's a good check to know if we're running
// multirestore a second time too!)
manifest := manifests[0] // I am assuming they all match
createDatabase, e := fillStringTemplate(manifest.SchemaDefinition.DatabaseSchema, map[string]string{"DatabaseName": destinationDbName})
if e != nil {
return e
}
if createDatabase == "" {
return fmt.Errorf("Empty create database statement")
}
createDbCmds := make([]string, 0, len(manifest.SchemaDefinition.TableDefinitions)+2)
if !writeBinLogs {
createDbCmds = append(createDbCmds, "SET sql_log_bin = OFF")
}
createDbCmds = append(createDbCmds, createDatabase)
createDbCmds = append(createDbCmds, "USE `"+destinationDbName+"`")
createViewCmds := make([]string, 0, 16)
for _, td := range manifest.SchemaDefinition.TableDefinitions {
if td.Type == TABLE_BASE_TABLE {
createDbCmd, alterTable, err := makeCreateTableSql(td.Schema, td.Name, strategy)
if err != nil {
return err
}
if alterTable != "" {
postSql[td.Name] = alterTable
//......... (portion of code omitted) .........
Author: CERN-Stage-3, Project: vitess, Lines of code: 101, Source file: split.go
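
MultiRestore is unusual among these examples in that it builds a map of semaphores: one named "net" bounding network fetches and one named "db" bounding MySQL-side work, so the two resource classes are throttled independently. A hedged sketch of how such a map is consumed follows; the fetchThenLoad helper, its callbacks, and the counts 4 and 2 are illustrative only.

package multirestoreexample

import "github.com/youtube/vitess/go/sync2"

// Independent limits for network fetches and database loads, as in MultiRestore.
var sems = map[string]*sync2.Semaphore{
	"net": sync2.NewSemaphore(4, 0), // e.g. fetchConcurrency
	"db":  sync2.NewSemaphore(2, 0), // e.g. snapshotConcurrency
}

func fetchThenLoad(fetchFile, loadFile func() error) error {
	sems["net"].Acquire()
	err := fetchFile() // network-bound step, limited by the "net" semaphore
	sems["net"].Release()
	if err != nil {
		return err
	}

	sems["db"].Acquire()
	defer sems["db"].Release()
	return loadFile() // database-bound step, limited by the "db" semaphore
}
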
Example 12: copy
//......... (portion of code omitted) .........
insertChannels[i] = make(chan string, vscw.destinationWriterCount*2)
destinationWaitGroup.Add(1)
go func(ti *topo.TabletInfo, insertChannel chan string) {
defer destinationWaitGroup.Done()
vscw.wr.Logger().Infof("Creating tables on tablet %v", ti.Alias)
if err := runSqlCommands(vscw.wr, ti, createDbCmds, abort); err != nil {
processError("createDbCmds failed: %v", err)
return
}
if len(createViewCmds) > 0 {
vscw.wr.Logger().Infof("Creating views on tablet %v", ti.Alias)
if err := runSqlCommands(vscw.wr, ti, createViewCmds, abort); err != nil {
processError("createViewCmds failed: %v", err)
return
}
}
for j := 0; j < vscw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func() {
defer destinationWaitGroup.Done()
if err := executeFetchLoop(vscw.wr, ti, insertChannel, abort); err != nil {
processError("executeFetchLoop failed: %v", err)
}
}()
}
}(vscw.destinationTablets[tabletAlias], insertChannels[i])
}
// Now for each table, read data chunks and send them to all
// insertChannels
sourceWaitGroup := sync.WaitGroup{}
sema := sync2.NewSemaphore(vscw.sourceReaderCount, 0)
for tableIndex, td := range sourceSchemaDefinition.TableDefinitions {
if td.Type == myproto.TABLE_VIEW {
vscw.tableStatus[tableIndex].setState("view created")
continue
}
vscw.tableStatus[tableIndex].setState("before copy")
chunks, err := findChunks(vscw.wr, vscw.sourceTablet, td, vscw.minTableSizeForSplit, vscw.sourceReaderCount)
if err != nil {
return err
}
for chunkIndex := 0; chunkIndex < len(chunks)-1; chunkIndex++ {
sourceWaitGroup.Add(1)
go func(td *myproto.TableDefinition, tableIndex, chunkIndex int) {
defer sourceWaitGroup.Done()
sema.Acquire()
defer sema.Release()
vscw.tableStatus[tableIndex].setState("started the copy")
// build the query, and start the streaming
selectSQL := buildSQLFromChunks(vscw.wr, td, chunks, chunkIndex, vscw.sourceAlias.String())
qrr, err := NewQueryResultReaderForTablet(vscw.wr.TopoServer(), vscw.sourceAlias, selectSQL)
if err != nil {
processError("NewQueryResultReaderForTablet failed: %v", err)
return
}
// process the data
if err := vscw.processData(td, tableIndex, qrr, insertChannels, abort); err != nil {
Author: nangong92t, Project: go_src, Lines of code: 67, Source file: vertical_split_clone.go
Example 13: copy
//......... (portion of code omitted) .........
throttler := scw.destinationThrottlers[keyspaceAndShard]
defer throttler.ThreadFinished(threadID)
executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID)
if err := executor.fetchLoop(ctx, insertChannel); err != nil {
processError("executer.FetchLoop failed: %v", err)
}
}(j)
}
}(si.Keyspace(), si.ShardName(), insertChannels[shardIndex])
}
// read the vschema if needed
var keyspaceSchema *vindexes.KeyspaceSchema
if *useV3ReshardingMode {
kschema, err := scw.wr.TopoServer().GetVSchema(ctx, scw.keyspace)
if err != nil {
return fmt.Errorf("cannot load VSchema for keyspace %v: %v", scw.keyspace, err)
}
if kschema == nil {
return fmt.Errorf("no VSchema for keyspace %v", scw.keyspace)
}
keyspaceSchema, err = vindexes.BuildKeyspaceSchema(kschema, scw.keyspace)
if err != nil {
return fmt.Errorf("cannot build vschema for keyspace %v: %v", scw.keyspace, err)
}
}
// Now for each table, read data chunks and send them to all
// insertChannels
sourceWaitGroup := sync.WaitGroup{}
for shardIndex := range scw.sourceShards {
sema := sync2.NewSemaphore(scw.sourceReaderCount, 0)
for tableIndex, td := range sourceSchemaDefinition.TableDefinitions {
var keyResolver keyspaceIDResolver
if *useV3ReshardingMode {
keyResolver, err = newV3ResolverFromTableDefinition(keyspaceSchema, td)
if err != nil {
return fmt.Errorf("cannot resolve v3 sharding keys for keyspace %v: %v", scw.keyspace, err)
}
} else {
keyResolver, err = newV2Resolver(scw.keyspaceInfo, td)
if err != nil {
return fmt.Errorf("cannot resolve sharding keys for keyspace %v: %v", scw.keyspace, err)
}
}
rowSplitter := NewRowSplitter(scw.destinationShards, keyResolver)
chunks, err := generateChunks(ctx, scw.wr, scw.sourceTablets[shardIndex], td, scw.sourceReaderCount, defaultMinRowsPerChunk)
if err != nil {
return err
}
scw.tableStatusList.setThreadCount(tableIndex, len(chunks)-1)
for _, c := range chunks {
sourceWaitGroup.Add(1)
go func(td *tabletmanagerdatapb.TableDefinition, tableIndex int, chunk chunk) {
defer sourceWaitGroup.Done()
sema.Acquire()
defer sema.Release()
scw.tableStatusList.threadStarted(tableIndex)
// Start streaming from the source tablets.
Author: dumbunny, Project: vitess, Lines of code: 67, Source file: legacy_split_clone.go
Example 14: copy
// copy phase:
// - copy the data from source tablets to destination masters (with replication on)
// Assumes that the schema has already been created on each destination tablet
// (probably from vtctl's CopySchemaShard)
func (vscw *VerticalSplitCloneWorker) copy(ctx context.Context) error {
vscw.setState(WorkerStateCopy)
// get source schema
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sourceSchemaDefinition, err := vscw.wr.GetSchema(shortCtx, vscw.sourceAlias, vscw.tables, nil, true)
cancel()
if err != nil {
return fmt.Errorf("cannot get schema from source %v: %v", topo.TabletAliasString(vscw.sourceAlias), err)
}
if len(sourceSchemaDefinition.TableDefinitions) == 0 {
return fmt.Errorf("no tables matching the table filter")
}
vscw.wr.Logger().Infof("Source tablet has %v tables to copy", len(sourceSchemaDefinition.TableDefinitions))
vscw.Mu.Lock()
vscw.tableStatus = make([]*tableStatus, len(sourceSchemaDefinition.TableDefinitions))
for i, td := range sourceSchemaDefinition.TableDefinitions {
vscw.tableStatus[i] = &tableStatus{
name: td.Name,
rowCount: td.RowCount,
}
}
vscw.startTime = time.Now()
vscw.Mu.Unlock()
// Count rows
for i, td := range sourceSchemaDefinition.TableDefinitions {
vscw.tableStatus[i].mu.Lock()
if td.Type == myproto.TableBaseTable {
vscw.tableStatus[i].rowCount = td.RowCount
} else {
vscw.tableStatus[i].isView = true
}
vscw.tableStatus[i].mu.Unlock()
}
// In parallel, setup the channels to send SQL data chunks to
// for each destination tablet.
//
// mu protects firstError
mu := sync.Mutex{}
var firstError error
ctx, cancel = context.WithCancel(ctx)
processError := func(format string, args ...interface{}) {
vscw.wr.Logger().Errorf(format, args...)
mu.Lock()
if firstError == nil {
firstError = fmt.Errorf(format, args...)
cancel()
}
mu.Unlock()
}
destinationWaitGroup := sync.WaitGroup{}
// we create one channel for the destination tablet. It
// is sized to have a buffer of a maximum of
// destinationWriterCount * 2 items, to hopefully
// always have data. We then have
// destinationWriterCount go routines reading from it.
insertChannel := make(chan string, vscw.destinationWriterCount*2)
go func(shardName string, insertChannel chan string) {
for j := 0; j < vscw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func() {
defer destinationWaitGroup.Done()
if err := executeFetchLoop(ctx, vscw.wr, vscw, shardName, insertChannel); err != nil {
processError("executeFetchLoop failed: %v", err)
}
}()
}
}(vscw.destinationShard, insertChannel)
// Now for each table, read data chunks and send them to insertChannel
sourceWaitGroup := sync.WaitGroup{}
sema := sync2.NewSemaphore(vscw.sourceReaderCount, 0)
for tableIndex, td := range sourceSchemaDefinition.TableDefinitions {
if td.Type == myproto.TableView {
continue
}
chunks, err := FindChunks(ctx, vscw.wr, vscw.sourceTablet, td, vscw.minTableSizeForSplit, vscw.sourceReaderCount)
if err != nil {
return err
}
vscw.tableStatus[tableIndex].setThreadCount(len(chunks) - 1)
for chunkIndex := 0; chunkIndex < len(chunks)-1; chunkIndex++ {
sourceWaitGroup.Add(1)
go func(td *myproto.TableDefinition, tableIndex, chunkIndex int) {
defer sourceWaitGroup.Done()
sema.Acquire()
//......... (portion of code omitted) .........
Author: springlee, Project: vitess, Lines of code: 101, Source file: vertical_split_clone.go
Example 15: diff
func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
vsdw.SetState(WorkerStateDiff)
vsdw.wr.Logger().Infof("Gathering schema information...")
wg := sync.WaitGroup{}
rec := &concurrency.AllErrorRecorder{}
wg.Add(1)
go func() {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema(
shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */)
cancel()
rec.RecordError(err)
vsdw.wr.Logger().Infof("Got schema from destination %v", topoproto.TabletAliasString(vsdw.destinationAlias))
wg.Done()
}()
wg.Add(1)
go func() {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema(
shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */)
cancel()
rec.RecordError(err)
vsdw.wr.Logger().Infof("Got schema from source %v", topoproto.TabletAliasString(vsdw.sourceAlias))
wg.Done()
}()
wg.Wait()
if rec.HasErrors() {
return rec.Error()
}
// Check the schema
vsdw.wr.Logger().Infof("Diffing the schema...")
rec = &concurrency.AllErrorRecorder{}
tmutils.DiffSchema("destination", vsdw.destinationSchemaDefinition, "source", vsdw.sourceSchemaDefinition, rec)
if rec.HasErrors() {
vsdw.wr.Logger().Warningf("Different schemas: %v", rec.Error())
} else {
vsdw.wr.Logger().Infof("Schema match, good.")
}
// run the diffs, 8 at a time
vsdw.wr.Logger().Infof("Running the diffs...")
sem := sync2.NewSemaphore(8, 0)
for _, tableDefinition := range vsdw.destinationSchemaDefinition.TableDefinitions {
wg.Add(1)
go func(tableDefinition *tabletmanagerdatapb.TableDefinition) {
defer wg.Done()
sem.Acquire()
defer sem.Release()
vsdw.wr.Logger().Infof("Starting the diff on table %v", tableDefinition.Name)
sourceQueryResultReader, err := TableScan(ctx, vsdw.wr.Logger(), vsdw.wr.TopoServer(), vsdw.sourceAlias, tableDefinition)
if err != nil {
newErr := fmt.Errorf("TableScan(source) failed: %v", err)
rec.RecordError(newErr)
vsdw.wr.Logger().Errorf("%v", newErr)
return
}
defer sourceQueryResultReader.Close()
destinationQueryResultReader, err := TableScan(ctx, vsdw.wr.Logger(), vsdw.wr.TopoServer(), vsdw.destinationAlias, tableDefinition)
if err != nil {
newErr := fmt.Errorf("TableScan(destination) failed: %v", err)
rec.RecordError(newErr)
vsdw.wr.Logger().Errorf("%v", newErr)
return
}
defer destinationQueryResultReader.Close()
differ, err := NewRowDiffer(sourceQueryResultReader, destinationQueryResultReader, tableDefinition)
if err != nil {
newErr := fmt.Errorf("NewRowDiffer() failed: %v", err)
rec.RecordError(newErr)
vsdw.wr.Logger().Errorf("%v", newErr)
return
}
report, err := differ.Go(vsdw.wr.Logger())
if err != nil {
vsdw.wr.Logger().Errorf("Differ.Go failed: %v", err)
} else {
if report.HasDifferences() {
err := fmt.Errorf("Table %v has differences: %v", tableDefinition.Name, report.String())
rec.RecordError(err)
vsdw.wr.Logger().Errorf("%v", err)
} else {
vsdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS)
}
}
}(tableDefinition)
}
wg.Wait()
return rec.Error()
}
Author: erzel, Project: vitess, Lines of code: 98, Source file: vertical_split_diff.go
Example 16: clone
// copy phase:
// - copy the data from source tablets to destination masters (with replication on)
// Assumes that the schema has already been created on each destination tablet
// (probably from vtctl's CopySchemaShard)
func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) error {
if state != WorkerStateCloneOnline && state != WorkerStateCloneOffline {
panic(fmt.Sprintf("invalid state passed to clone(): %v", state))
}
scw.setState(state)
start := time.Now()
defer func() {
statsStateDurationsNs.Set(string(state), time.Now().Sub(start).Nanoseconds())
}()
var firstSourceTablet *topodatapb.Tablet
if state == WorkerStateCloneOffline {
// Use the first source tablet which we took offline.
firstSourceTablet = scw.sourceTablets[0]
} else {
// Pick any healthy serving source tablet.
si := scw.sourceShards[0]
tablets := scw.tsc.GetTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY)
if len(tablets) == 0 {
// We fail fast on this problem and don't retry because at the start all tablets should be healthy.
return fmt.Errorf("no healthy RDONLY tablet in source shard (%v) available (required to find out the schema)", topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()))
}
firstSourceTablet = tablets[0].Tablet
}
var statsCounters []*stats.Counters
var tableStatusList *tableStatusList
switch state {
case WorkerStateCloneOnline:
statsCounters = []*stats.Counters{statsOnlineInsertsCounters, statsOnlineUpdatesCounters, statsOnlineDeletesCounters, statsOnlineEqualRowsCounters}
tableStatusList = scw.tableStatusListOnline
case WorkerStateCloneOffline:
statsCounters = []*stats.Counters{statsOfflineInsertsCounters, statsOfflineUpdatesCounters, statsOfflineDeletesCounters, statsOfflineEqualRowsCounters}
tableStatusList = scw.tableStatusListOffline
}
// The throttlers exist only for the duration of this clone() call.
// That means a SplitClone invocation with both online and offline phases
// will create throttlers for each phase.
if err := scw.createThrottlers(); err != nil {
return err
}
defer scw.closeThrottlers()
sourceSchemaDefinition, err := scw.getSourceSchema(ctx, firstSourceTablet)
if err != nil {
return err
}
scw.wr.Logger().Infof("Source tablet 0 has %v tables to copy", len(sourceSchemaDefinition.TableDefinitions))
tableStatusList.initialize(sourceSchemaDefinition)
// In parallel, setup the channels to send SQL data chunks to for each destination tablet:
//
// mu protects the context for cancelation, and firstError
mu := sync.Mutex{}
var firstError error
ctx, cancelCopy := context.WithCancel(ctx)
processError := func(format string, args ...interface{}) {
scw.wr.Logger().Errorf(format, args...)
mu.Lock()
if firstError == nil {
firstError = fmt.Errorf(format, args...)
cancelCopy()
}
mu.Unlock()
}
insertChannels := make([]chan string, len(scw.destinationShards))
destinationWaitGroup := sync.WaitGroup{}
for shardIndex, si := range scw.destinationShards {
// We create one channel per destination tablet. It is sized to have a
// buffer of a maximum of destinationWriterCount * 2 items, to hopefully
// always have data. We then have destinationWriterCount go routines reading
// from it.
insertChannels[shardIndex] = make(chan string, scw.destinationWriterCount*2)
for j := 0; j < scw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func(keyspace, shard string, insertChannel chan string, throttler *throttler.Throttler, threadID int) {
defer destinationWaitGroup.Done()
defer throttler.ThreadFinished(threadID)
executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID)
if err := executor.fetchLoop(ctx, insertChannel); err != nil {
processError("executer.FetchLoop failed: %v", err)
}
}(si.Keyspace(), si.ShardName(), insertChannels[shardIndex], scw.getThrottler(si.Keyspace(), si.ShardName()), j)
}
}
// Now for each table, read data chunks
//......... (portion of code omitted) .........