HBASE-20084 Refactor the RSRpcServices#doBatchOp

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Chia-Ping Tsai 2018-02-26 20:49:05 +08:00
parent 7f6e971c4c
commit 197bd79070
1 changed files with 58 additions and 57 deletions

View File

@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.DNS; import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -763,7 +764,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
// one at a time, we instead pass them in batch. Be aware that the corresponding // one at a time, we instead pass them in batch. Be aware that the corresponding
// ResultOrException instance that matches each Put or Delete is then added down in the // ResultOrException instance that matches each Put or Delete is then added down in the
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
// deferred/batched
List<ClientProtos.Action> mutations = null; List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
IOException sizeIOE = null; IOException sizeIOE = null;
@ -802,7 +804,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// use it for the response. // use it for the response.
// //
// This will create a copy in the builder. // This will create a copy in the builder.
hasResultOrException = true;
NameBytesPair pair = ResponseConverter.buildException(sizeIOE); NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
resultOrExceptionBuilder.setException(pair); resultOrExceptionBuilder.setException(pair);
context.incrementResponseExceptionSize(pair.getSerializedSize()); context.incrementResponseExceptionSize(pair.getSerializedSize());
@ -829,29 +830,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} else if (action.hasServiceCall()) { } else if (action.hasServiceCall()) {
hasResultOrException = true; hasResultOrException = true;
try { com.google.protobuf.Message result =
com.google.protobuf.Message result = execServiceOnRegion(region, action.getServiceCall());
execServiceOnRegion(region, action.getServiceCall()); ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = ClientProtos.CoprocessorServiceResult.newBuilder();
ClientProtos.CoprocessorServiceResult.newBuilder(); resultOrExceptionBuilder.setServiceResult(
resultOrExceptionBuilder.setServiceResult( serviceResultBuilder.setValue(
serviceResultBuilder.setValue( serviceResultBuilder.getValueBuilder()
serviceResultBuilder.getValueBuilder() .setName(result.getClass().getName())
.setName(result.getClass().getName()) // TODO: Copy!!!
// TODO: Copy!!! .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
.setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
} catch (IOException ioe) {
rpcServer.getMetrics().exception(ioe);
NameBytesPair pair = ResponseConverter.buildException(ioe);
resultOrExceptionBuilder.setException(pair);
context.incrementResponseExceptionSize(pair.getSerializedSize());
}
} else if (action.hasMutation()) { } else if (action.hasMutation()) {
MutationType type = action.getMutation().getMutateType(); MutationType type = action.getMutation().getMutateType();
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
!mutations.isEmpty()) { !mutations.isEmpty()) {
// Flush out any Puts or Deletes already collected. // Flush out any Puts or Deletes already collected.
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
spaceQuotaEnforcement);
mutations.clear(); mutations.clear();
} }
switch (type) { switch (type) {
@ -896,7 +891,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Could get to here and there was no result and no exception. Presumes we added // Could get to here and there was no result and no exception. Presumes we added
// a Put or Delete to the collecting Mutations List for adding later. In this // a Put or Delete to the collecting Mutations List for adding later. In this
// case the corresponding ResultOrException instance for the Put or Delete will be added // case the corresponding ResultOrException instance for the Put or Delete will be added
// down in the doBatchOp method call rather than up here. // down in the doNonAtomicBatchOp method call rather than up here.
} catch (IOException ie) { } catch (IOException ie) {
rpcServer.getMetrics().exception(ie); rpcServer.getMetrics().exception(ie);
hasResultOrException = true; hasResultOrException = true;
@ -911,18 +906,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} }
// Finish up any outstanding mutations // Finish up any outstanding mutations
if (mutations != null && !mutations.isEmpty()) { if (!CollectionUtils.isEmpty(mutations)) {
try { doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
} catch (IOException ioe) {
// TODO do the refactor to avoid this catch as it is useless
// doBatchOp has handled the IOE for all non-atomic operations.
rpcServer.getMetrics().exception(ioe);
NameBytesPair pair = ResponseConverter.buildException(ioe);
resultOrExceptionBuilder.setException(pair);
context.incrementResponseExceptionSize(pair.getSerializedSize());
builder.addResultOrException(resultOrExceptionBuilder.build());
}
} }
return cellsToReturn; return cellsToReturn;
} }
@ -943,6 +928,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} }
private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
// Just throw the exception. The exception will be caught and then added to region-level
// exception for RegionAction. Leaving the null to action result is ok since the null
// result is viewed as failure by hbase client. And the region-lever exception will be used
// to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
// AsyncBatchRpcRetryingCaller#onComplete for more details.
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
}
private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
try {
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
} catch (IOException e) {
// Set the exception for each action. The mutations in same RegionAction are group to
// different batch and then be processed individually. Hence, we don't set the region-level
// exception here for whole RegionAction.
for (Action mutation : mutations) {
builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
}
}
}
/** /**
* Execute a list of Put/Delete mutations. * Execute a list of Put/Delete mutations.
* *
@ -1029,30 +1041,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
break; break;
} }
} }
} catch (IOException ie) { } finally {
int processedMutationIndex = 0; int processedMutationIndex = 0;
for (Action mutation : mutations) { for (Action mutation : mutations) {
// The non-null mArray[i] means the cell scanner has been read. // The non-null mArray[i] means the cell scanner has been read.
if (mArray[processedMutationIndex++] == null) { if (mArray[processedMutationIndex++] == null) {
skipCellsForMutation(mutation, cells); skipCellsForMutation(mutation, cells);
} }
if (!atomic) {
builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
}
}
if (atomic) {
throw ie;
} }
updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
} }
}
private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
boolean batchContainsDelete) {
if (regionServer.metricsRegionServer != null) { if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime(); long after = EnvironmentEdgeManager.currentTime();
if (batchContainsPuts) { if (batchContainsPuts) {
regionServer.metricsRegionServer.updatePutBatch( regionServer.metricsRegionServer
region.getTableDescriptor().getTableName(), after - before); .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
} }
if (batchContainsDelete) { if (batchContainsDelete) {
regionServer.metricsRegionServer.updateDeleteBatch( regionServer.metricsRegionServer
region.getTableDescriptor().getTableName(), after - before); .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
} }
} }
} }
@ -1121,17 +1132,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return region.batchReplay(mutations.toArray( return region.batchReplay(mutations.toArray(
new WALSplitter.MutationReplay[mutations.size()]), replaySeqId); new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
} finally { } finally {
if (regionServer.metricsRegionServer != null) { updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
long after = EnvironmentEdgeManager.currentTime();
if (batchContainsPuts) {
regionServer.metricsRegionServer.updatePutBatch(
region.getTableDescriptor().getTableName(), after - before);
}
if (batchContainsDelete) {
regionServer.metricsRegionServer.updateDeleteBatch(
region.getTableDescriptor().getTableName(), after - before);
}
}
} }
} }
@ -2614,8 +2615,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
cellScanner, row, family, qualifier, op, cellScanner, row, family, qualifier, op,
comparator, regionActionResultBuilder, spaceQuotaEnforcement); comparator, regionActionResultBuilder, spaceQuotaEnforcement);
} else { } else {
doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
cellScanner, spaceQuotaEnforcement, true); cellScanner, spaceQuotaEnforcement);
processed = Boolean.TRUE; processed = Boolean.TRUE;
} }
} catch (IOException e) { } catch (IOException e) {