HBASE-20084 Refactor the RSRpcServices#doBatchOp
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
c459282fe0
commit
e50e6f7ce9
|
@ -122,6 +122,7 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.security.access.AccessChecker;
|
import org.apache.hadoop.hbase.security.access.AccessChecker;
|
||||||
import org.apache.hadoop.hbase.security.access.Permission;
|
import org.apache.hadoop.hbase.security.access.Permission;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||||
import org.apache.hadoop.hbase.util.DNS;
|
import org.apache.hadoop.hbase.util.DNS;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
@ -762,7 +763,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
|
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
|
||||||
// one at a time, we instead pass them in batch. Be aware that the corresponding
|
// one at a time, we instead pass them in batch. Be aware that the corresponding
|
||||||
// ResultOrException instance that matches each Put or Delete is then added down in the
|
// ResultOrException instance that matches each Put or Delete is then added down in the
|
||||||
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
|
// doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
|
||||||
|
// deferred/batched
|
||||||
List<ClientProtos.Action> mutations = null;
|
List<ClientProtos.Action> mutations = null;
|
||||||
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
|
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
|
||||||
IOException sizeIOE = null;
|
IOException sizeIOE = null;
|
||||||
|
@ -801,7 +803,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
// use it for the response.
|
// use it for the response.
|
||||||
//
|
//
|
||||||
// This will create a copy in the builder.
|
// This will create a copy in the builder.
|
||||||
hasResultOrException = true;
|
|
||||||
NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
|
NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
|
||||||
resultOrExceptionBuilder.setException(pair);
|
resultOrExceptionBuilder.setException(pair);
|
||||||
context.incrementResponseExceptionSize(pair.getSerializedSize());
|
context.incrementResponseExceptionSize(pair.getSerializedSize());
|
||||||
|
@ -828,29 +829,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
} else if (action.hasServiceCall()) {
|
} else if (action.hasServiceCall()) {
|
||||||
hasResultOrException = true;
|
hasResultOrException = true;
|
||||||
try {
|
com.google.protobuf.Message result =
|
||||||
com.google.protobuf.Message result =
|
execServiceOnRegion(region, action.getServiceCall());
|
||||||
execServiceOnRegion(region, action.getServiceCall());
|
ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
|
||||||
ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
|
ClientProtos.CoprocessorServiceResult.newBuilder();
|
||||||
ClientProtos.CoprocessorServiceResult.newBuilder();
|
resultOrExceptionBuilder.setServiceResult(
|
||||||
resultOrExceptionBuilder.setServiceResult(
|
serviceResultBuilder.setValue(
|
||||||
serviceResultBuilder.setValue(
|
serviceResultBuilder.getValueBuilder()
|
||||||
serviceResultBuilder.getValueBuilder()
|
.setName(result.getClass().getName())
|
||||||
.setName(result.getClass().getName())
|
// TODO: Copy!!!
|
||||||
// TODO: Copy!!!
|
.setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
|
||||||
.setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
rpcServer.getMetrics().exception(ioe);
|
|
||||||
NameBytesPair pair = ResponseConverter.buildException(ioe);
|
|
||||||
resultOrExceptionBuilder.setException(pair);
|
|
||||||
context.incrementResponseExceptionSize(pair.getSerializedSize());
|
|
||||||
}
|
|
||||||
} else if (action.hasMutation()) {
|
} else if (action.hasMutation()) {
|
||||||
MutationType type = action.getMutation().getMutateType();
|
MutationType type = action.getMutation().getMutateType();
|
||||||
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
|
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
|
||||||
!mutations.isEmpty()) {
|
!mutations.isEmpty()) {
|
||||||
// Flush out any Puts or Deletes already collected.
|
// Flush out any Puts or Deletes already collected.
|
||||||
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
|
doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
|
||||||
|
spaceQuotaEnforcement);
|
||||||
mutations.clear();
|
mutations.clear();
|
||||||
}
|
}
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -895,7 +890,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
// Could get to here and there was no result and no exception. Presumes we added
|
// Could get to here and there was no result and no exception. Presumes we added
|
||||||
// a Put or Delete to the collecting Mutations List for adding later. In this
|
// a Put or Delete to the collecting Mutations List for adding later. In this
|
||||||
// case the corresponding ResultOrException instance for the Put or Delete will be added
|
// case the corresponding ResultOrException instance for the Put or Delete will be added
|
||||||
// down in the doBatchOp method call rather than up here.
|
// down in the doNonAtomicBatchOp method call rather than up here.
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
rpcServer.getMetrics().exception(ie);
|
rpcServer.getMetrics().exception(ie);
|
||||||
hasResultOrException = true;
|
hasResultOrException = true;
|
||||||
|
@ -910,18 +905,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Finish up any outstanding mutations
|
// Finish up any outstanding mutations
|
||||||
if (mutations != null && !mutations.isEmpty()) {
|
if (!CollectionUtils.isEmpty(mutations)) {
|
||||||
try {
|
doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
|
||||||
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
// TODO do the refactor to avoid this catch as it is useless
|
|
||||||
// doBatchOp has handled the IOE for all non-atomic operations.
|
|
||||||
rpcServer.getMetrics().exception(ioe);
|
|
||||||
NameBytesPair pair = ResponseConverter.buildException(ioe);
|
|
||||||
resultOrExceptionBuilder.setException(pair);
|
|
||||||
context.incrementResponseExceptionSize(pair.getSerializedSize());
|
|
||||||
builder.addResultOrException(resultOrExceptionBuilder.build());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return cellsToReturn;
|
return cellsToReturn;
|
||||||
}
|
}
|
||||||
|
@ -942,6 +927,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
|
||||||
|
final OperationQuota quota, final List<ClientProtos.Action> mutations,
|
||||||
|
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
|
||||||
|
throws IOException {
|
||||||
|
// Just throw the exception. The exception will be caught and then added to region-level
|
||||||
|
// exception for RegionAction. Leaving the null to action result is ok since the null
|
||||||
|
// result is viewed as failure by hbase client. And the region-lever exception will be used
|
||||||
|
// to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
|
||||||
|
// AsyncBatchRpcRetryingCaller#onComplete for more details.
|
||||||
|
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
|
||||||
|
final OperationQuota quota, final List<ClientProtos.Action> mutations,
|
||||||
|
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
|
||||||
|
try {
|
||||||
|
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Set the exception for each action. The mutations in same RegionAction are group to
|
||||||
|
// different batch and then be processed individually. Hence, we don't set the region-level
|
||||||
|
// exception here for whole RegionAction.
|
||||||
|
for (Action mutation : mutations) {
|
||||||
|
builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute a list of Put/Delete mutations.
|
* Execute a list of Put/Delete mutations.
|
||||||
*
|
*
|
||||||
|
@ -1028,30 +1040,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException ie) {
|
} finally {
|
||||||
int processedMutationIndex = 0;
|
int processedMutationIndex = 0;
|
||||||
for (Action mutation : mutations) {
|
for (Action mutation : mutations) {
|
||||||
// The non-null mArray[i] means the cell scanner has been read.
|
// The non-null mArray[i] means the cell scanner has been read.
|
||||||
if (mArray[processedMutationIndex++] == null) {
|
if (mArray[processedMutationIndex++] == null) {
|
||||||
skipCellsForMutation(mutation, cells);
|
skipCellsForMutation(mutation, cells);
|
||||||
}
|
}
|
||||||
if (!atomic) {
|
|
||||||
builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (atomic) {
|
|
||||||
throw ie;
|
|
||||||
}
|
}
|
||||||
|
updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
|
||||||
|
boolean batchContainsDelete) {
|
||||||
if (regionServer.metricsRegionServer != null) {
|
if (regionServer.metricsRegionServer != null) {
|
||||||
long after = EnvironmentEdgeManager.currentTime();
|
long after = EnvironmentEdgeManager.currentTime();
|
||||||
if (batchContainsPuts) {
|
if (batchContainsPuts) {
|
||||||
regionServer.metricsRegionServer.updatePutBatch(
|
regionServer.metricsRegionServer
|
||||||
region.getTableDescriptor().getTableName(), after - before);
|
.updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
|
||||||
}
|
}
|
||||||
if (batchContainsDelete) {
|
if (batchContainsDelete) {
|
||||||
regionServer.metricsRegionServer.updateDeleteBatch(
|
regionServer.metricsRegionServer
|
||||||
region.getTableDescriptor().getTableName(), after - before);
|
.updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1120,17 +1131,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
return region.batchReplay(mutations.toArray(
|
return region.batchReplay(mutations.toArray(
|
||||||
new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
|
new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
|
||||||
} finally {
|
} finally {
|
||||||
if (regionServer.metricsRegionServer != null) {
|
updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
|
||||||
long after = EnvironmentEdgeManager.currentTime();
|
|
||||||
if (batchContainsPuts) {
|
|
||||||
regionServer.metricsRegionServer.updatePutBatch(
|
|
||||||
region.getTableDescriptor().getTableName(), after - before);
|
|
||||||
}
|
|
||||||
if (batchContainsDelete) {
|
|
||||||
regionServer.metricsRegionServer.updateDeleteBatch(
|
|
||||||
region.getTableDescriptor().getTableName(), after - before);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2613,8 +2614,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
cellScanner, row, family, qualifier, op,
|
cellScanner, row, family, qualifier, op,
|
||||||
comparator, regionActionResultBuilder, spaceQuotaEnforcement);
|
comparator, regionActionResultBuilder, spaceQuotaEnforcement);
|
||||||
} else {
|
} else {
|
||||||
doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
|
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
|
||||||
cellScanner, spaceQuotaEnforcement, true);
|
cellScanner, spaceQuotaEnforcement);
|
||||||
processed = Boolean.TRUE;
|
processed = Boolean.TRUE;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
Loading…
Reference in New Issue