HBASE-12012. Improve cancellation for the scan RPCs

This commit is contained in:
Devaraj Das 2014-12-22 16:22:04 -08:00
parent 83db450fc6
commit 1c45d1cd9d
8 changed files with 356 additions and 168 deletions

View File

@ -0,0 +1,31 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This should be implemented by the Get/Scan implementations that
* talk to replica regions. When an RPC response is received from one
* of the replicas, the RPCs to the other replicas are cancelled.
*/
@InterfaceAudience.Private
interface Cancellable {
public void cancel();
public boolean isCancelled();
}

View File

@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
@ -275,6 +277,11 @@ public class ClientScanner extends AbstractClientScanner {
return true;
}
@VisibleForTesting
boolean isAnyRPCcancelled() {
return callable.isAnyRPCcancelled();
}
static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
RpcRetryingCaller<Result[]> caller, int scannerTimeout)
throws IOException, RuntimeException {

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -169,7 +168,7 @@ public class ClientSmallScanner extends ClientScanner {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), getScan(), getCaching(), true);
ScanResponse response = null;
PayloadCarryingRpcController controller = controllerFactory.newController();
controller = controllerFactory.newController();
try {
controller.setPriority(getTableName());
controller.setCallTimeout(timeout);
@ -183,8 +182,8 @@ public class ClientSmallScanner extends ClientScanner {
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics,
controllerFactory, getCaching(), id);
return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
scanMetrics, controllerFactory, getCaching(), id);
}
}

View File

@ -0,0 +1,165 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.htrace.Trace;
/**
* A completion service for the RpcRetryingCallerFactory.
* Keeps the list of the futures, and allows to cancel them all.
* This means as well that it can be used for a small set of tasks only.
* <br>Implementation is not Thread safe.
*/
@InterfaceAudience.Private
public class ResultBoundedCompletionService<V> {
private final RpcRetryingCallerFactory retryingCallerFactory;
private final Executor executor;
private final QueueingFuture<V>[] tasks; // all the tasks
private volatile QueueingFuture<V> completed = null;
class QueueingFuture<T> implements RunnableFuture<T> {
private final RetryingCallable<T> future;
private T result = null;
private ExecutionException exeEx = null;
private volatile boolean cancelled;
private final int callTimeout;
private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false;
public QueueingFuture(RetryingCallable<T> future, int callTimeout) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = retryingCallerFactory.<T>newCaller();
}
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
if (!cancelled) {
result =
this.retryingCaller.callWithRetries(future, callTimeout);
resultObtained = true;
}
} catch (Throwable t) {
exeEx = new ExecutionException(t);
} finally {
if (!cancelled && completed == null) {
completed = (QueueingFuture<V>) QueueingFuture.this;
synchronized (tasks) {
tasks.notify();
}
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (resultObtained || exeEx != null) return false;
retryingCaller.cancel();
if (future instanceof Cancellable) ((Cancellable)future).cancel();
cancelled = true;
return true;
}
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean isDone() {
return resultObtained || exeEx != null;
}
@Override
public T get() throws InterruptedException, ExecutionException {
try {
return get(1000, TimeUnit.DAYS);
} catch (TimeoutException e) {
throw new RuntimeException("You did wait for 1000 days here?", e);
}
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
synchronized (tasks) {
if (resultObtained) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
unit.timedWait(tasks, timeout);
}
if (resultObtained) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
throw new TimeoutException("timeout=" + timeout + ", " + unit);
}
}
@SuppressWarnings("unchecked")
public ResultBoundedCompletionService(
RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
int maxTasks) {
this.retryingCallerFactory = retryingCallerFactory;
this.executor = executor;
this.tasks = new QueueingFuture[maxTasks];
}
public void submit(RetryingCallable<V> task, int callTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
executor.execute(Trace.wrap(newFuture));
tasks[id] = newFuture;
}
public QueueingFuture<V> take() throws InterruptedException {
synchronized (tasks) {
while (completed == null) tasks.wait();
}
return completed;
}
public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (tasks) {
if (completed == null) unit.timedWait(tasks, timeout);
}
return completed;
}
public void cancelAll() {
for (QueueingFuture<V> future : tasks) {
if (future != null) future.cancel(true);
}
}
}

View File

@ -27,12 +27,9 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -53,7 +50,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.protobuf.ServiceException;
import org.htrace.Trace;
/**
* Caller that goes to replica if the primary region does no answer within a configurable
@ -99,7 +95,7 @@ public class RpcRetryingCallerWithReadReplicas {
* - we need to stop retrying when the call is completed
* - we can be interrupted
*/
class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
final int id;
private final PayloadCarryingRpcController controller;
@ -112,7 +108,8 @@ public class RpcRetryingCallerWithReadReplicas {
controller.setPriority(tableName);
}
public void startCancel() {
@Override
public void cancel() {
controller.startCancel();
}
@ -169,6 +166,11 @@ public class RpcRetryingCallerWithReadReplicas {
throw ProtobufUtil.getRemoteException(se);
}
}
@Override
public boolean isCancelled() {
return controller.isCanceled();
}
}
/**
@ -194,7 +196,8 @@ public class RpcRetryingCallerWithReadReplicas {
RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
ResultBoundedCompletionService<Result> cs =
new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
if(isTargetReplicaSpecified) {
addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
@ -273,12 +276,12 @@ public class RpcRetryingCallerWithReadReplicas {
* @param min - the id of the first replica, inclusive
* @param max - the id of the last replica, inclusive.
*/
private void addCallsForReplica(ResultBoundedCompletionService cs,
private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
RegionLocations rl, int min, int max) {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
cs.submit(callOnReplica, callTimeout);
cs.submit(callOnReplica, callTimeout, id);
}
}
@ -308,137 +311,4 @@ public class RpcRetryingCallerWithReadReplicas {
return rl;
}
/**
* A completion service for the RpcRetryingCallerFactory.
* Keeps the list of the futures, and allows to cancel them all.
* This means as well that it can be used for a small set of tasks only.
* <br>Implementation is not Thread safe.
*/
public class ResultBoundedCompletionService {
private final Executor executor;
private final QueueingFuture[] tasks; // all the tasks
private volatile QueueingFuture completed = null;
class QueueingFuture implements RunnableFuture<Result> {
private final ReplicaRegionServerCallable future;
private Result result = null;
private ExecutionException exeEx = null;
private volatile boolean canceled;
private final int callTimeout;
private final RpcRetryingCaller<Result> retryingCaller;
public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = rpcRetryingCallerFactory.<Result>newCaller();
}
@Override
public void run() {
try {
if (!canceled) {
result =
rpcRetryingCallerFactory.<Result>newCaller().callWithRetries(future, callTimeout);
}
} catch (Throwable t) {
exeEx = new ExecutionException(t);
} finally {
if (!canceled && completed == null) {
completed = QueueingFuture.this;
synchronized (tasks) {
tasks.notify();
}
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (result != null || exeEx != null) return false;
retryingCaller.cancel();
future.startCancel();
canceled = true;
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return result != null || exeEx != null;
}
@Override
public Result get() throws InterruptedException, ExecutionException {
try {
return get(1000, TimeUnit.DAYS);
} catch (TimeoutException e) {
throw new RuntimeException("You did wait for 1000 days here?", e);
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
justification="Is this an issue?")
@Override
public Result get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
synchronized (tasks) {
if (result != null) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
unit.timedWait(tasks, timeout);
}
// Findbugs says this null check is redundant. Will result be set across the wait above?
if (result != null) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
throw new TimeoutException("timeout=" + timeout + ", " + unit);
}
}
public ResultBoundedCompletionService(Executor executor, int maxTasks) {
this.executor = executor;
this.tasks = new QueueingFuture[maxTasks];
}
public void submit(ReplicaRegionServerCallable task, int callTimeout) {
QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
executor.execute(Trace.wrap(newFuture));
tasks[task.id] = newFuture;
}
public QueueingFuture take() throws InterruptedException {
synchronized (tasks) {
while (completed == null) tasks.wait();
}
return completed;
}
public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (tasks) {
if (completed == null) unit.timedWait(tasks, timeout);
}
return completed;
}
public void cancelAll() {
for (QueueingFuture future : tasks) {
if (future != null) future.cancel(true);
}
}
}
}

View File

@ -88,6 +88,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
protected RpcControllerFactory controllerFactory;
protected PayloadCarryingRpcController controller;
/**
* @param connection which connection
@ -123,6 +124,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
this.controllerFactory = rpcControllerFactory;
}
PayloadCarryingRpcController getController() {
return controller;
}
/**
* @param reload force reload of server location
* @throws IOException
@ -191,7 +196,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null;
PayloadCarryingRpcController controller = controllerFactory.newController();
controller = controllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {

View File

@ -39,8 +39,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.BoundedCompletionService;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting;
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
@ -69,6 +70,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private Configuration conf;
private int scannerTimeout;
private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
private boolean someRPCcancelled = false; //required for testing purposes only
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
@ -134,8 +136,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// allocate a boundedcompletion pool of some multiple of number of replicas.
// We want to accomodate some RPCs for redundant replica scans (but are still in progress)
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
rl.size() * 5);
List<ExecutionException> exceptions = null;
int submitted = 0, completed = 0;
@ -192,7 +196,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
} finally {
// We get there because we were interrupted or because one or more of the
// calls succeeded or failed. In all case, we stop all our tasks.
cs.cancelAll(true);
cs.cancelAll();
}
if (exceptions != null && !exceptions.isEmpty()) {
@ -226,8 +230,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// want to wait for the "close" to happen yet. The "wait" will happen when
// the table is closed (when the awaitTermination of the underlying pool is called)
s.setClose();
RetryingRPC r = new RetryingRPC(s);
pool.submit(r);
final RetryingRPC r = new RetryingRPC(s);
pool.submit(new Callable<Void>(){
@Override
public Void call() throws Exception {
r.call(scannerTimeout);
return null;
}
});
}
// now clear outstandingCallables since we scheduled a close for all the contained scanners
outstandingCallables.clear();
@ -244,16 +254,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
private int addCallsForCurrentReplica(
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica);
cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
return 1;
}
private int addCallsForOtherReplicas(
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min,
int max) {
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
int min, int max) {
if (scan.getConsistency() == Consistency.STRONG) {
return 0; // not scheduling on other replicas for strong consistency
}
@ -267,32 +277,85 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica);
cs.submit(retryingOnReplica, scannerTimeout, id);
}
return max - min + 1;
}
class RetryingRPC implements Callable<Pair<Result[], ScannerCallable>> {
@VisibleForTesting
boolean isAnyRPCcancelled() {
return someRPCcancelled;
}
class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
final ScannerCallable callable;
RpcRetryingCaller<Result[]> caller;
private volatile boolean cancelled = false;
RetryingRPC(ScannerCallable callable) {
this.callable = callable;
}
@Override
public Pair<Result[], ScannerCallable> call() throws IOException {
// For the Consistency.STRONG (default case), we reuse the caller
// to keep compatibility with what is done in the past
// For the Consistency.TIMELINE case, we can't reuse the caller
// since we could be making parallel RPCs (caller.callWithRetries is synchronized
// and we can't invoke it multiple times at the same time)
RpcRetryingCaller<Result[]> caller = ScannerCallableWithReplicas.this.caller;
this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) {
caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
<Result[]>newCaller();
}
Result[] res = caller.callWithRetries(callable, scannerTimeout);
return new Pair<Result[], ScannerCallable>(res, callable);
}
@Override
public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
// since the retries is done within the ResultBoundedCompletionService,
// we don't invoke callWithRetries here
if (cancelled) {
return null;
}
Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
return new Pair<Result[], ScannerCallable>(res, this.callable);
}
@Override
public void prepare(boolean reload) throws IOException {
if (cancelled) return;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
callable.prepare(reload);
}
@Override
public void throwable(Throwable t, boolean retrying) {
callable.throwable(t, retrying);
}
@Override
public String getExceptionMessageAdditionalDetail() {
return callable.getExceptionMessageAdditionalDetail();
}
@Override
public long sleep(long pause, int tries) {
return callable.sleep(pause, tries);
}
@Override
public void cancel() {
cancelled = true;
caller.cancel();
if (callable.getController() != null) {
callable.getController().startCancel();
}
someRPCcancelled = true;
}
@Override
public boolean isCancelled() {
return cancelled;
}
}

View File

@ -539,6 +539,54 @@ public class TestReplicasClient {
runMultipleScansOfOneType(true, false);
}
@Test
public void testCancelOfScan() throws Exception {
openRegion(hriSecondary);
int NUMROWS = 100;
try {
for (int i = 0; i < NUMROWS; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Put p = new Put(b1);
p.add(f, b1, b1);
table.put(p);
}
LOG.debug("PUT done");
int caching = 20;
byte[] start;
start = Bytes.toBytes("testUseRegionWithReplica" + 0);
flushRegion(hriPrimary);
LOG.info("flush done");
Thread.sleep(1000 + REFRESH_PERIOD * 2);
// now make some 'next' calls slow
SlowMeCopro.slowDownNext.set(true);
SlowMeCopro.countOfNext.set(0);
SlowMeCopro.sleepTime.set(5000);
Scan scan = new Scan(start);
scan.setCaching(caching);
scan.setConsistency(Consistency.TIMELINE);
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> iter = scanner.iterator();
iter.next();
Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
SlowMeCopro.getCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
for (int i = 0; i < NUMROWS; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Delete d = new Delete(b1);
table.delete(d);
}
closeRegion(hriSecondary);
}
}
private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
openRegion(hriSecondary);
int NUMROWS = 100;