HBASE-12295 - Addendum for multiGets to add the call back(Ram)

This commit is contained in:
ramkrishna 2015-07-23 14:28:58 +05:30
parent 4f60d9c28d
commit 4ec69ccf3a
1 changed files with 15 additions and 13 deletions

View File

@ -639,31 +639,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param builder
* @param cellsToReturn Could be null. May be allocated in this method. This is what this
* method returns as a 'result'.
* @param closeCallBack the callback to be used with multigets
* @param context the current RpcCallContext
* @return Return the <code>cellScanner</code> passed
*/
private List<CellScannable> doNonAtomicRegionMutation(final Region region,
final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
final RegionScannersCloseCallBack closeCallBack, RpcCallContext context) {
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
// one at a time, we instead pass them in batch. Be aware that the corresponding
// ResultOrException instance that matches each Put or Delete is then added down in the
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
List<ClientProtos.Action> mutations = null;
RpcCallContext context = RpcServer.getCurrentCall();
// An RpcCallBack that creates a list of scanners that needs to perform callBack
// operation on completion of multiGets.
RegionScannersCloseCallBack closeCallBack = null;
for (ClientProtos.Action action : actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
Result r = null;
if (action.hasGet()) {
if (closeCallBack == null) {
// Initialize only once
closeCallBack = new RegionScannersCloseCallBack();
// Set the call back here itself.
context.setCallBack(closeCallBack);
}
Get get = ProtobufUtil.toGet(action.getGet());
r = get(get, ((HRegion) region), closeCallBack, context);
} else if (action.hasServiceCall()) {
@ -2084,7 +2077,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
Boolean processed = null;
RegionScannersCloseCallBack closeCallBack = null;
RpcCallContext context = null;
for (RegionAction regionAction : request.getRegionActionList()) {
this.requestCount.add(regionAction.getActionCount());
OperationQuota quota;
@ -2131,8 +2125,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
} else {
// doNonAtomicRegionMutation manages the exception internally
if (closeCallBack == null) {
// An RpcCallBack that creates a list of scanners that needs to perform callBack
// operation on completion of multiGets.
// Set this only once
closeCallBack = new RegionScannersCloseCallBack();
context = RpcServer.getCurrentCall();
context.setCallBack(closeCallBack);
}
cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
regionActionResultBuilder, cellsToReturn, nonceGroup);
regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context);
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
quota.close();