HBASE-10942. support parallel request cancellation for multi-get (Nicolas Liochon & Devaraj Das)

This commit is contained in:
Devaraj Das 2015-02-02 23:01:31 -08:00
parent eb351b9ff8
commit cf5ad96fcc
4 changed files with 125 additions and 10 deletions

View File

@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -685,21 +686,33 @@ class AsyncProcess {
private final MultiAction<Row> multiAction; private final MultiAction<Row> multiAction;
private final int numAttempt; private final int numAttempt;
private final ServerName server; private final ServerName server;
private final Set<MultiServerCallable<Row>> callsInProgress;
private SingleServerRequestRunnable( private SingleServerRequestRunnable(
MultiAction<Row> multiAction, int numAttempt, ServerName server) { MultiAction<Row> multiAction, int numAttempt, ServerName server,
Set<MultiServerCallable<Row>> callsInProgress) {
this.multiAction = multiAction; this.multiAction = multiAction;
this.numAttempt = numAttempt; this.numAttempt = numAttempt;
this.server = server; this.server = server;
this.callsInProgress = callsInProgress;
} }
@Override @Override
public void run() { public void run() {
MultiResponse res; MultiResponse res;
MultiServerCallable<Row> callable = null;
try { try {
MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction); callable = createCallable(server, tableName, multiAction);
try { try {
res = createCaller(callable).callWithoutRetries(callable, timeout); RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
if (callsInProgress != null) callsInProgress.add(callable);
res = caller.callWithoutRetries(callable, timeout);
if (res == null) {
// Cancelled
return;
}
} catch (IOException e) { } catch (IOException e) {
// The service itself failed . It may be an error coming from the communication // The service itself failed . It may be an error coming from the communication
// layer, but, as well, a functional error raised by the server. // layer, but, as well, a functional error raised by the server.
@ -722,6 +735,9 @@ class AsyncProcess {
throw new RuntimeException(t); throw new RuntimeException(t);
} finally { } finally {
decTaskCounters(multiAction.getRegions(), server); decTaskCounters(multiAction.getRegions(), server);
if (callsInProgress != null && callable != null) {
callsInProgress.remove(callable);
}
} }
} }
} }
@ -730,6 +746,7 @@ class AsyncProcess {
private final BatchErrors errors; private final BatchErrors errors;
private final ConnectionManager.ServerErrorTracker errorsByServer; private final ConnectionManager.ServerErrorTracker errorsByServer;
private final ExecutorService pool; private final ExecutorService pool;
private final Set<MultiServerCallable<Row>> callsInProgress;
private final TableName tableName; private final TableName tableName;
@ -814,10 +831,17 @@ class AsyncProcess {
} else { } else {
this.replicaGetIndices = null; this.replicaGetIndices = null;
} }
this.callsInProgress = !hasAnyReplicaGets ? null :
Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
this.errorsByServer = createServerErrorTracker(); this.errorsByServer = createServerErrorTracker();
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
} }
public Set<MultiServerCallable<Row>> getCallsInProgress() {
return callsInProgress;
}
/** /**
* Group a list of actions per region servers, and send them. * Group a list of actions per region servers, and send them.
* *
@ -980,7 +1004,7 @@ class AsyncProcess {
// no stats to manage, just do the standard action // no stats to manage, just do the standard action
if (AsyncProcess.this.connection.getStatisticsTracker() == null) { if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
new SingleServerRequestRunnable(multiAction, numAttempt, server))); new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
} }
// group the actions by the amount of delay // group the actions by the amount of delay
@ -1002,7 +1026,8 @@ class AsyncProcess {
for (DelayingRunner runner : actions.values()) { for (DelayingRunner runner : actions.values()) {
String traceText = "AsyncProcess.sendMultiAction"; String traceText = "AsyncProcess.sendMultiAction";
Runnable runnable = Runnable runnable =
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server); new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
callsInProgress);
// use a delay runner only if we need to sleep for some time // use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) { if (runner.getSleepTime() > 0) {
runner.setRunner(runnable); runner.setRunner(runnable);
@ -1520,6 +1545,12 @@ class AsyncProcess {
waitUntilDone(Long.MAX_VALUE); waitUntilDone(Long.MAX_VALUE);
} catch (InterruptedException iex) { } catch (InterruptedException iex) {
throw new InterruptedIOException(iex.getMessage()); throw new InterruptedIOException(iex.getMessage());
} finally {
if (callsInProgress != null) {
for (MultiServerCallable<Row> clb : callsInProgress) {
clb.cancel();
}
}
} }
} }

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
@ -50,21 +51,21 @@ import com.google.protobuf.ServiceException;
* {@link RegionServerCallable} that goes against multiple regions. * {@link RegionServerCallable} that goes against multiple regions.
* @param <R> * @param <R>
*/ */
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> { class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable {
private final MultiAction<R> multiAction; private final MultiAction<R> multiAction;
private final boolean cellBlock; private final boolean cellBlock;
private RpcControllerFactory rpcFactory; private final PayloadCarryingRpcController controller;
MultiServerCallable(final ClusterConnection connection, final TableName tableName, MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) { final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
super(connection, tableName, null); super(connection, tableName, null);
this.rpcFactory = rpcFactory;
this.multiAction = multi; this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request. // RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so // Using region info from parent HRegionLocation would be a mistake for this class; so
// we will store the server here, and throw if someone tries to obtain location/regioninfo. // we will store the server here, and throw if someone tries to obtain location/regioninfo.
this.location = new HRegionLocation(null, location); this.location = new HRegionLocation(null, location);
this.cellBlock = isCellBlock(); this.cellBlock = isCellBlock();
controller = rpcFactory.newController();
} }
@Override @Override
@ -119,7 +120,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
// Controller optionally carries cell data over the proxy/service boundary and also // Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again. // optionally ferries cell response data back out again.
PayloadCarryingRpcController controller = rpcFactory.newController(cells); if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
controller.setPriority(getTableName()); controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout); controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto; ClientProtos.MultiResponse responseProto;
@ -129,10 +130,19 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufUtil.getRemoteException(e); throw ProtobufUtil.getRemoteException(e);
} }
if (responseProto == null) return null; // Occurs on cancel
return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner()); return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
} }
@Override
public void cancel() {
controller.startCancel();
}
@Override
public boolean isCancelled() {
return controller.isCanceled();
}
/** /**
* @return True if we should send data in cellblocks. This is an expensive call. Cache the * @return True if we should send data in cellblocks. This is an expensive call. Cache the

View File

@ -307,9 +307,14 @@ public class AsyncRpcChannel {
controller.notifyOnCancel(new RpcCallback<Object>() { controller.notifyOnCancel(new RpcCallback<Object>() {
@Override @Override
public void run(Object parameter) { public void run(Object parameter) {
failCall(call, new IOException("Canceled connection")); calls.remove(call.id);
} }
}); });
if (controller.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.cancel(true);
return call;
}
calls.put(call.id, call); calls.put(call.id, call);

View File

@ -20,10 +20,12 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@ -521,6 +525,71 @@ public class TestReplicasClient {
} }
} }
@Test
public void testCancelOfMultiGet() throws Exception {
openRegion(hriSecondary);
try {
List<Put> puts = new ArrayList<Put>(2);
byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
Put p = new Put(b1);
p.add(f, b1, b1);
puts.add(p);
byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
p = new Put(b2);
p.add(f, b2, b2);
puts.add(p);
table.put(puts);
LOG.debug("PUT done");
flushRegion(hriPrimary);
LOG.info("flush done");
Thread.sleep(1000 + REFRESH_PERIOD * 2);
AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection())
.getAsyncProcess();
// Make primary slowdown
SlowMeCopro.getCdl().set(new CountDownLatch(1));
List<Get> gets = new ArrayList<Get>();
Get g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
gets.add(g);
g = new Get(b2);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
gets.add(g);
Object[] results = new Object[2];
AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(),
gets, null, results);
reqs.waitUntilDone();
// verify we got the right results back
for (Object r : results) {
Assert.assertTrue(((Result)r).isStale());
Assert.assertTrue(((Result)r).getExists());
}
Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
// verify we did cancel unneeded calls
Assert.assertTrue(!set.isEmpty());
for (MultiServerCallable<Row> m : set) {
Assert.assertTrue(m.isCancelled());
}
} finally {
SlowMeCopro.getCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
for (int i = 0; i < 2; i++) {
byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
Delete d = new Delete(b1);
table.delete(d);
}
closeRegion(hriSecondary);
}
}
@Test @Test
public void testScanWithReplicas() throws Exception { public void testScanWithReplicas() throws Exception {
//simple scan //simple scan