HBASE-10942. support parallel request cancellation for multi-get (Nicolas Liochon & Devaraj Das)

This commit is contained in:
Devaraj Das 2015-02-02 23:02:20 -08:00
parent 2fd27ea80c
commit 44596148c7
4 changed files with 125 additions and 10 deletions

View File

@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@ -684,21 +685,33 @@ class AsyncProcess {
private final MultiAction<Row> multiAction;
private final int numAttempt;
private final ServerName server;
private final Set<MultiServerCallable<Row>> callsInProgress;
private SingleServerRequestRunnable(
MultiAction<Row> multiAction, int numAttempt, ServerName server) {
MultiAction<Row> multiAction, int numAttempt, ServerName server,
Set<MultiServerCallable<Row>> callsInProgress) {
this.multiAction = multiAction;
this.numAttempt = numAttempt;
this.server = server;
this.callsInProgress = callsInProgress;
}
@Override
public void run() {
MultiResponse res;
MultiServerCallable<Row> callable = null;
try {
MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
callable = createCallable(server, tableName, multiAction);
try {
res = createCaller(callable).callWithoutRetries(callable, timeout);
RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
if (callsInProgress != null) callsInProgress.add(callable);
res = caller.callWithoutRetries(callable, timeout);
if (res == null) {
// Cancelled
return;
}
} catch (IOException e) {
// The service itself failed . It may be an error coming from the communication
// layer, but, as well, a functional error raised by the server.
@ -721,6 +734,9 @@ class AsyncProcess {
throw new RuntimeException(t);
} finally {
decTaskCounters(multiAction.getRegions(), server);
if (callsInProgress != null && callable != null) {
callsInProgress.remove(callable);
}
}
}
}
@ -729,6 +745,7 @@ class AsyncProcess {
private final BatchErrors errors;
private final ConnectionManager.ServerErrorTracker errorsByServer;
private final ExecutorService pool;
private final Set<MultiServerCallable<Row>> callsInProgress;
private final TableName tableName;
@ -813,10 +830,17 @@ class AsyncProcess {
} else {
this.replicaGetIndices = null;
}
this.callsInProgress = !hasAnyReplicaGets ? null :
Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
this.errorsByServer = createServerErrorTracker();
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
}
public Set<MultiServerCallable<Row>> getCallsInProgress() {
return callsInProgress;
}
/**
* Group a list of actions per region servers, and send them.
*
@ -979,7 +1003,7 @@ class AsyncProcess {
// no stats to manage, just do the standard action
if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
new SingleServerRequestRunnable(multiAction, numAttempt, server)));
new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
}
// group the actions by the amount of delay
@ -1001,7 +1025,8 @@ class AsyncProcess {
for (DelayingRunner runner : actions.values()) {
String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable =
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server);
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
callsInProgress);
// use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) {
runner.setRunner(runnable);
@ -1517,6 +1542,12 @@ class AsyncProcess {
waitUntilDone(Long.MAX_VALUE);
} catch (InterruptedException iex) {
throw new InterruptedIOException(iex.getMessage());
} finally {
if (callsInProgress != null) {
for (MultiServerCallable<Row> clb : callsInProgress) {
clb.cancel();
}
}
}
}

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -50,21 +51,21 @@ import com.google.protobuf.ServiceException;
* {@link RegionServerCallable} that goes against multiple regions.
* @param <R>
*/
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable {
private final MultiAction<R> multiAction;
private final boolean cellBlock;
private RpcControllerFactory rpcFactory;
private final PayloadCarryingRpcController controller;
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
super(connection, tableName, null);
this.rpcFactory = rpcFactory;
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
// we will store the server here, and throw if someone tries to obtain location/regioninfo.
this.location = new HRegionLocation(null, location);
this.cellBlock = isCellBlock();
controller = rpcFactory.newController();
}
@Override
@ -119,7 +120,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
// Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again.
PayloadCarryingRpcController controller = rpcFactory.newController(cells);
if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto;
@ -129,10 +130,19 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
} catch (ServiceException e) {
throw ProtobufUtil.getRemoteException(e);
}
if (responseProto == null) return null; // Occurs on cancel
return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
}
@Override
public void cancel() {
controller.startCancel();
}
@Override
public boolean isCancelled() {
return controller.isCanceled();
}
/**
* @return True if we should send data in cellblocks. This is an expensive call. Cache the

View File

@ -307,9 +307,14 @@ public class AsyncRpcChannel {
controller.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
failCall(call, new IOException("Canceled connection"));
calls.remove(call.id);
}
});
if (controller.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.cancel(true);
return call;
}
calls.put(call.id, call);

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@ -55,10 +57,12 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -530,6 +534,71 @@ public class TestReplicasClient {
}
}
@Test
public void testCancelOfMultiGet() throws Exception {
openRegion(hriSecondary);
try {
List<Put> puts = new ArrayList<Put>(2);
byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
Put p = new Put(b1);
p.add(f, b1, b1);
puts.add(p);
byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
p = new Put(b2);
p.add(f, b2, b2);
puts.add(p);
table.put(puts);
LOG.debug("PUT done");
flushRegion(hriPrimary);
LOG.info("flush done");
Thread.sleep(1000 + REFRESH_PERIOD * 2);
AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection())
.getAsyncProcess();
// Make primary slowdown
SlowMeCopro.getCdl().set(new CountDownLatch(1));
List<Get> gets = new ArrayList<Get>();
Get g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
gets.add(g);
g = new Get(b2);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
gets.add(g);
Object[] results = new Object[2];
AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(),
gets, null, results);
reqs.waitUntilDone();
// verify we got the right results back
for (Object r : results) {
Assert.assertTrue(((Result)r).isStale());
Assert.assertTrue(((Result)r).getExists());
}
Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
// verify we did cancel unneeded calls
Assert.assertTrue(!set.isEmpty());
for (MultiServerCallable<Row> m : set) {
Assert.assertTrue(m.isCancelled());
}
} finally {
SlowMeCopro.getCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
for (int i = 0; i < 2; i++) {
byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
Delete d = new Delete(b1);
table.delete(d);
}
closeRegion(hriSecondary);
}
}
@Test
public void testScanWithReplicas() throws Exception {
//simple scan