本文整理汇总了Java中org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse类的典型用法代码示例。如果您正苦于以下问题:Java ReplicateWALEntryResponse类的具体用法?Java ReplicateWALEntryResponse怎么用?Java ReplicateWALEntryResponse使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ReplicateWALEntryResponse类属于org.apache.hadoop.hbase.protobuf.generated.AdminProtos包,在下文中一共展示了ReplicateWALEntryResponse类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: replayEdits
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<Entry> entries) throws IOException {
try {
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
ReplayServerCallable<ReplicateWALEntryResponse> callable =
new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
regionInfo, entries);
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
} catch (IOException ie) {
if (skipErrors) {
LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing replayEdits with error:" + ie.getMessage());
} else {
throw ie;
}
}
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:18,代码来源:WALEditsReplaySink.java
示例2: replicateWALEntry
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replicate WAL entries on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
try {
checkOpen();
if (regionServer.replicationSinkHandler != null) {
requestCount.increment();
List<WALEntry> entries = request.getEntryList();
CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
return ReplicateWALEntryResponse.newBuilder().build();
} else {
throw new ServiceException("Replication services are not initialized yet");
}
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:29,代码来源:RSRpcServices.java
示例3: replicateUsingCallable
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
throws IOException, RuntimeException {
Entry entry;
while ((entry = entries.poll()) != null) {
byte[] row = entry.getEdit().getCells().get(0).getRow();
RegionLocations locations = connection.locateRegion(tableName, row, true, true);
RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
RpcControllerFactory.instantiate(connection.getConfiguration()),
table.getName(), locations.getRegionLocation(1),
locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry),
new AtomicLong());
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
connection.getConfiguration());
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
}
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:18,代码来源:TestRegionReplicaReplicationEndpointNoMaster.java
示例4: replicateWALEntry
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replicate WAL entries on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority = HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
try {
if (regionServer.replicationSinkHandler != null) {
checkOpen();
requestCount.increment();
List<WALEntry> entries = request.getEntryList();
CellScanner cellScanner = ((PayloadCarryingRpcController) controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
开发者ID:grokcoder,项目名称:pbase,代码行数:27,代码来源:RSRpcServices.java
示例5: replicateWALEntry
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replicate WAL entries on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
final ReplicateWALEntryRequest request)
throws ServiceException {
try {
if (replicationSinkHandler != null) {
checkOpen();
requestCount.increment();
this.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
((PayloadCarryingRpcController)controller).cellScanner());
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
开发者ID:tenggyut,项目名称:HIndex,代码行数:25,代码来源:HRegionServer.java
示例6: replayEdits
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<HLog.Entry> entries) throws IOException {
try {
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
ReplayServerCallable<ReplicateWALEntryResponse> callable =
new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
regionInfo, entries);
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
} catch (IOException ie) {
if (skipErrors) {
LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing replayEdits with error:" + ie.getMessage());
} else {
throw ie;
}
}
}
开发者ID:tenggyut,项目名称:HIndex,代码行数:18,代码来源:WALEditsReplaySink.java
示例7: replicateWALEntry
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replicate WAL entries on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
try {
if (regionServer.replicationSinkHandler != null) {
checkOpen();
requestCount.increment();
regionServer.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
((PayloadCarryingRpcController)controller).cellScanner());
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
开发者ID:shenli-uiuc,项目名称:PyroDB,代码行数:24,代码来源:RSRpcServices.java
示例8: replicateWALEntry
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replicate WAL entries on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
try {
if (replicationSinkHandler != null) {
checkOpen();
requestCount.increment();
HLog.Entry[] entries = ProtobufUtil.toHLogEntries(request.getEntryList());
if (entries != null && entries.length > 0) {
replicationSinkHandler.replicateLogEntries(entries);
}
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
开发者ID:daidong,项目名称:DominoHBase,代码行数:26,代码来源:HRegionServer.java
示例9: call
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
@Override
public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
try {
replayToServer(this.regionInfo, this.entries);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return null;
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:10,代码来源:WALEditsReplaySink.java
示例10: replay
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
@Override
public ReplicateWALEntryResponse
replay(RpcController controller, ReplicateWALEntryRequest request)
throws ServiceException {
// TODO Auto-generated method stub
return null;
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:8,代码来源:MockRegionServer.java
示例11: call
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
@Override
public ReplicateWALEntryResponse call() throws IOException {
try {
replayToServer(this.regionInfo, this.entries);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return null;
}
开发者ID:tenggyut,项目名称:HIndex,代码行数:10,代码来源:WALEditsReplaySink.java
示例12: call
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
@Override
public ReplicateWALEntryResponse call(int timeout) throws IOException {
return replayToServer(this.entries, timeout);
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:5,代码来源:RegionReplicaReplicationEndpoint.java
示例13: replayToServer
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
throws IOException {
// check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming form the primary region, filter it out because we do not need it.
// Regions can change because of (1) region split (2) region merge (3) table recreated
boolean skip = false;
if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
initialEncodedRegionName)) {
skip = true;
}
if (!entries.isEmpty() && !skip) {
Entry[] entriesArray = new Entry[entries.size()];
entriesArray = entries.toArray(entriesArray);
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(
entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
controller.setCallTimeout(timeout);
controller.setPriority(tableName);
return stub.replay(controller, p.getFirst());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
if (skip) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ " because located region " + location.getRegionInfo().getEncodedName()
+ " is different than the original region "
+ Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
skippedEntries.addAndGet(entries.size());
}
return ReplicateWALEntryResponse.newBuilder().build();
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:44,代码来源:RegionReplicaReplicationEndpoint.java
示例14: replicateWALEntry
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
@Override
public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
ReplicateWALEntryRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:7,代码来源:MockRegionServer.java
示例15: replay
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
* that the given mutations will be durable on the receiving RS if this method returns without any
* exception.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
long before = EnvironmentEdgeManager.currentTime();
CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
try {
checkOpen();
List<WALEntry> entries = request.getEntryList();
if (entries == null || entries.isEmpty()) {
// empty input
return ReplicateWALEntryResponse.newBuilder().build();
}
HRegion region = regionServer.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
for (WALEntry entry : entries) {
if (regionServer.nonceManager != null) {
long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
}
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<WALKey, WALEdit>();
List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
cells, walEntry);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.
if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
walEntry.getSecond())) {
// if bypass this log entry, ignore it ...
continue;
}
walEntries.add(walEntry);
}
if (edits != null && !edits.isEmpty()) {
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
// check if it's a partial success
for (int i = 0; result != null && i < result.length; i++) {
if (result[i] != OperationStatus.SUCCESS) {
throw new IOException(result[i].getExceptionMsg());
}
}
}
}
//sync wal at the end because ASYNC_WAL is used above
region.syncWal();
if (coprocessorHost != null) {
for (Pair<WALKey, WALEdit> wal : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
wal.getSecond());
}
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateReplay(
EnvironmentEdgeManager.currentTime() - before);
}
}
}
开发者ID:grokcoder,项目名称:pbase,代码行数:80,代码来源:RSRpcServices.java
示例16: replay
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
* that the given mutations will be durable on the receiving RS if this method returns without any
* exception.
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
long before = EnvironmentEdgeManager.currentTimeMillis();
CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
try {
checkOpen();
List<WALEntry> entries = request.getEntryList();
if (entries == null || entries.isEmpty()) {
// empty input
return ReplicateWALEntryResponse.newBuilder().build();
}
HRegion region = this.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
// when tag is enabled, we need tag replay edits with log sequence number
boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3);
for (WALEntry entry : entries) {
if (nonceManager != null) {
long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
}
Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<HLogKey, WALEdit>();
List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
cells, walEntry, needAddReplayTag);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.
if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
walEntry.getSecond())) {
// if bypass this log entry, ignore it ...
continue;
}
walEntries.add(walEntry);
}
mutations.addAll(edits);
}
if (!mutations.isEmpty()) {
OperationStatus[] result = doReplayBatchOp(region, mutations);
// check if it's a partial success
for (int i = 0; result != null && i < result.length; i++) {
if (result[i] != OperationStatus.SUCCESS) {
throw new IOException(result[i].getExceptionMsg());
}
}
}
if (coprocessorHost != null) {
for (Pair<HLogKey, WALEdit> wal : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
wal.getSecond());
}
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
}
}
开发者ID:tenggyut,项目名称:HIndex,代码行数:75,代码来源:HRegionServer.java
示例17: replay
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
* that the given mutations will be durable on the receiving RS if this method returns without any
* exception.
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
long before = EnvironmentEdgeManager.currentTimeMillis();
CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
try {
checkOpen();
List<WALEntry> entries = request.getEntryList();
if (entries == null || entries.isEmpty()) {
// empty input
return ReplicateWALEntryResponse.newBuilder().build();
}
HRegion region = regionServer.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
// when tag is enabled, we need tag replay edits with log sequence number
boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
for (WALEntry entry : entries) {
if (regionServer.nonceManager != null) {
long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
}
Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<HLogKey, WALEdit>();
List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
cells, walEntry, needAddReplayTag);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.
if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
walEntry.getSecond())) {
// if bypass this log entry, ignore it ...
continue;
}
walEntries.add(walEntry);
}
mutations.addAll(edits);
}
if (!mutations.isEmpty()) {
OperationStatus[] result = doReplayBatchOp(region, mutations);
// check if it's a partial success
for (int i = 0; result != null && i < result.length; i++) {
if (result[i] != OperationStatus.SUCCESS) {
throw new IOException(result[i].getExceptionMsg());
}
}
}
if (coprocessorHost != null) {
for (Pair<HLogKey, WALEdit> wal : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
wal.getSecond());
}
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateReplay(
EnvironmentEdgeManager.currentTimeMillis() - before);
}
}
}
开发者ID:shenli-uiuc,项目名称:PyroDB,代码行数:78,代码来源:RSRpcServices.java
示例18: replay
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; //导入依赖的package包/类
/**
* Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
* that the given mutations will be durable on the receiving RS if this method returns without any
* exception.
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
long before = EnvironmentEdgeManager.currentTimeMillis();
CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
try {
checkOpen();
List<WALEntry> entries = request.getEntryList();
if (entries == null || entries.isEmpty()) {
// empty input
return ReplicateWALEntryResponse.newBuilder().build();
}
HRegion region = this.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
for (WALEntry entry : entries) {
Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<HLogKey, WALEdit>();
List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry,
cells, walEntry);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.
if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
walEntry.getSecond())) {
// if bypass this log entry, ignore it ...
continue;
}
walEntries.add(walEntry);
}
mutations.addAll(edits);
}
if (!mutations.isEmpty()) {
OperationStatus[] result = doBatchOp(region, mutations, true);
// check if it's a partial success
for (int i = 0; result != null && i < result.length; i++) {
if (result[i] != OperationStatus.SUCCESS) {
throw new IOException(result[i].getExceptionMsg());
}
}
}
if (coprocessorHost != null) {
for (Pair<HLogKey, WALEdit> wal : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
wal.getSecond());
}
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
}
}
开发者ID:cloud-software-foundation,项目名称:c5,代码行数:67,代码来源:HRegionServer.java
注:本文中的org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论