diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7e01c9a8e50..4dd826fc971 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.DNS; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -763,7 +764,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do // one at a time, we instead pass them in batch. Be aware that the corresponding // ResultOrException instance that matches each Put or Delete is then added down in the - // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched + // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are + // deferred/batched List mutations = null; long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); IOException sizeIOE = null; @@ -802,7 +804,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // use it for the response. // // This will create a copy in the builder. - hasResultOrException = true; NameBytesPair pair = ResponseConverter.buildException(sizeIOE); resultOrExceptionBuilder.setException(pair); context.incrementResponseExceptionSize(pair.getSerializedSize()); @@ -829,29 +830,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } else if (action.hasServiceCall()) { hasResultOrException = true; - try { - com.google.protobuf.Message result = - execServiceOnRegion(region, action.getServiceCall()); - ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = - ClientProtos.CoprocessorServiceResult.newBuilder(); - resultOrExceptionBuilder.setServiceResult( - serviceResultBuilder.setValue( - serviceResultBuilder.getValueBuilder() - .setName(result.getClass().getName()) - // TODO: Copy!!! - .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); - } catch (IOException ioe) { - rpcServer.getMetrics().exception(ioe); - NameBytesPair pair = ResponseConverter.buildException(ioe); - resultOrExceptionBuilder.setException(pair); - context.incrementResponseExceptionSize(pair.getSerializedSize()); - } + com.google.protobuf.Message result = + execServiceOnRegion(region, action.getServiceCall()); + ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = + ClientProtos.CoprocessorServiceResult.newBuilder(); + resultOrExceptionBuilder.setServiceResult( + serviceResultBuilder.setValue( + serviceResultBuilder.getValueBuilder() + .setName(result.getClass().getName()) + // TODO: Copy!!! + .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); } else if (action.hasMutation()) { MutationType type = action.getMutation().getMutateType(); if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && !mutations.isEmpty()) { // Flush out any Puts or Deletes already collected. - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); + doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, + spaceQuotaEnforcement); mutations.clear(); } switch (type) { @@ -896,7 +891,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Could get to here and there was no result and no exception. Presumes we added // a Put or Delete to the collecting Mutations List for adding later. In this // case the corresponding ResultOrException instance for the Put or Delete will be added - // down in the doBatchOp method call rather than up here. + // down in the doNonAtomicBatchOp method call rather than up here. } catch (IOException ie) { rpcServer.getMetrics().exception(ie); hasResultOrException = true; @@ -911,18 +906,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } // Finish up any outstanding mutations - if (mutations != null && !mutations.isEmpty()) { - try { - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); - } catch (IOException ioe) { - // TODO do the refactor to avoid this catch as it is useless - // doBatchOp has handled the IOE for all non-atomic operations. - rpcServer.getMetrics().exception(ioe); - NameBytesPair pair = ResponseConverter.buildException(ioe); - resultOrExceptionBuilder.setException(pair); - context.incrementResponseExceptionSize(pair.getSerializedSize()); - builder.addResultOrException(resultOrExceptionBuilder.build()); - } + if (!CollectionUtils.isEmpty(mutations)) { + doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); } return cellsToReturn; } @@ -943,6 +928,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, + final OperationQuota quota, final List mutations, + final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) + throws IOException { + // Just throw the exception. The exception will be caught and then added to region-level + // exception for RegionAction. Leaving the null to action result is ok since the null + // result is viewed as failure by hbase client. And the region-lever exception will be used + // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and + // AsyncBatchRpcRetryingCaller#onComplete for more details. + doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true); + } + + private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, + final OperationQuota quota, final List mutations, + final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { + try { + doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false); + } catch (IOException e) { + // Set the exception for each action. The mutations in same RegionAction are group to + // different batch and then be processed individually. Hence, we don't set the region-level + // exception here for whole RegionAction. + for (Action mutation : mutations) { + builder.addResultOrException(getResultOrException(e, mutation.getIndex())); + } + } + } + /** * Execute a list of Put/Delete mutations. * @@ -1029,30 +1041,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler, break; } } - } catch (IOException ie) { + } finally { int processedMutationIndex = 0; for (Action mutation : mutations) { // The non-null mArray[i] means the cell scanner has been read. if (mArray[processedMutationIndex++] == null) { skipCellsForMutation(mutation, cells); } - if (!atomic) { - builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); - } - } - if (atomic) { - throw ie; } + updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); } + } + + private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, + boolean batchContainsDelete) { if (regionServer.metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); if (batchContainsPuts) { - regionServer.metricsRegionServer.updatePutBatch( - region.getTableDescriptor().getTableName(), after - before); + regionServer.metricsRegionServer + .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime); } if (batchContainsDelete) { - regionServer.metricsRegionServer.updateDeleteBatch( - region.getTableDescriptor().getTableName(), after - before); + regionServer.metricsRegionServer + .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime); } } } @@ -1121,17 +1132,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return region.batchReplay(mutations.toArray( new WALSplitter.MutationReplay[mutations.size()]), replaySeqId); } finally { - if (regionServer.metricsRegionServer != null) { - long after = EnvironmentEdgeManager.currentTime(); - if (batchContainsPuts) { - regionServer.metricsRegionServer.updatePutBatch( - region.getTableDescriptor().getTableName(), after - before); - } - if (batchContainsDelete) { - regionServer.metricsRegionServer.updateDeleteBatch( - region.getTableDescriptor().getTableName(), after - before); - } - } + updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); } } @@ -2614,8 +2615,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, row, family, qualifier, op, comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { - doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), - cellScanner, spaceQuotaEnforcement, true); + doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), + cellScanner, spaceQuotaEnforcement); processed = Boolean.TRUE; } } catch (IOException e) {