HBASE-12012. Improve cancellation for the scan RPCs (patch from master; had to resolve some conflicts)

Devaraj Das 2014-12-22 16:45:41 -08:00
parent fd232cc3b5
commit 949982fc6e
8 changed files with 356 additions and 166 deletions

View File: org/apache/hadoop/hbase/client/Cancellable.java (new file)

@@ -0,0 +1,31 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This should be implemented by the Get/Scan implementations that
* talk to replica regions. When an RPC response is received from one
* of the replicas, the RPCs to the other replicas are cancelled.
*/
@InterfaceAudience.Private
interface Cancellable {
public void cancel();
public boolean isCancelled();
}
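
For context, a minimal sketch of the contract this interface expresses: a long-running call exposes cancel()/isCancelled() so a coordinator can stop it once another replica has answered. The class below is invented for illustration and is not part of the patch (Cancellable itself is package-private, so the sketch simply mirrors its two methods).

import java.util.concurrent.Callable;

// Illustrative only: a callable that honors a cancel()/isCancelled() contract
// by checking a volatile flag between attempts.
class CancellableReplicaCall implements Callable<String> {
  private volatile boolean cancelled = false;

  public void cancel() {
    cancelled = true;            // another replica already answered; stop cooperatively
  }

  public boolean isCancelled() {
    return cancelled;
  }

  @Override
  public String call() throws Exception {
    for (int attempt = 0; attempt < 10 && !cancelled; attempt++) {
      Thread.sleep(100);         // stand-in for one RPC attempt
    }
    return cancelled ? null : "result";
  }
}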

View File: org/apache/hadoop/hbase/client/ClientScanner.java

@@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
@@ -276,6 +278,11 @@ public class ClientScanner extends AbstractClientScanner {
return true;
}
@VisibleForTesting
boolean isAnyRPCcancelled() {
return callable.isAnyRPCcancelled();
}
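
As context for the scanner being patched here, a basic (non-replica) usage sketch of the scanning API that ClientScanner backs. The table name and caching value are placeholders, and a reachable cluster with the Connection/Table API of this branch is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class ScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("t1"))) {
      Scan scan = new Scan();
      scan.setCaching(20);                      // rows fetched per scan RPC
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {         // ClientScanner walks the regions underneath
          System.out.println(result);
        }
      }
    }
  }
}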
static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
RpcRetryingCaller<Result[]> caller, int scannerTimeout)
throws IOException, RuntimeException {

View File: org/apache/hadoop/hbase/client/ClientSmallScanner.java

@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -169,7 +168,7 @@ public class ClientSmallScanner extends ClientScanner {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), getScan(), getCaching(), true);
ScanResponse response = null;
PayloadCarryingRpcController controller = controllerFactory.newController();
controller = controllerFactory.newController();
try {
controller.setPriority(getTableName());
controller.setCallTimeout(timeout);
@@ -183,8 +182,8 @@
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics,
controllerFactory, getCaching(), id);
return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
scanMetrics, controllerFactory, getCaching(), id);
}
}

View File: org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java (new file)

@@ -0,0 +1,165 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.htrace.Trace;
/**
* A completion service for the RpcRetryingCallerFactory.
* Keeps the list of the futures, and allows cancelling them all.
* As a consequence, it is only suitable for a small set of tasks.
* <br>The implementation is not thread-safe.
*/
@InterfaceAudience.Private
public class ResultBoundedCompletionService<V> {
private final RpcRetryingCallerFactory retryingCallerFactory;
private final Executor executor;
private final QueueingFuture<V>[] tasks; // all the tasks
private volatile QueueingFuture<V> completed = null;
class QueueingFuture<T> implements RunnableFuture<T> {
private final RetryingCallable<T> future;
private T result = null;
private ExecutionException exeEx = null;
private volatile boolean cancelled;
private final int callTimeout;
private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false;
public QueueingFuture(RetryingCallable<T> future, int callTimeout) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = retryingCallerFactory.<T>newCaller();
}
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
if (!cancelled) {
result =
this.retryingCaller.callWithRetries(future, callTimeout);
resultObtained = true;
}
} catch (Throwable t) {
exeEx = new ExecutionException(t);
} finally {
if (!cancelled && completed == null) {
completed = (QueueingFuture<V>) QueueingFuture.this;
synchronized (tasks) {
tasks.notify();
}
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (resultObtained || exeEx != null) return false;
retryingCaller.cancel();
if (future instanceof Cancellable) ((Cancellable)future).cancel();
cancelled = true;
return true;
}
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean isDone() {
return resultObtained || exeEx != null;
}
@Override
public T get() throws InterruptedException, ExecutionException {
try {
return get(1000, TimeUnit.DAYS);
} catch (TimeoutException e) {
throw new RuntimeException("You did wait for 1000 days here?", e);
}
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
synchronized (tasks) {
if (resultObtained) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
unit.timedWait(tasks, timeout);
}
if (resultObtained) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
throw new TimeoutException("timeout=" + timeout + ", " + unit);
}
}
@SuppressWarnings("unchecked")
public ResultBoundedCompletionService(
RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
int maxTasks) {
this.retryingCallerFactory = retryingCallerFactory;
this.executor = executor;
this.tasks = new QueueingFuture[maxTasks];
}
public void submit(RetryingCallable<V> task, int callTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
executor.execute(Trace.wrap(newFuture));
tasks[id] = newFuture;
}
public QueueingFuture<V> take() throws InterruptedException {
synchronized (tasks) {
while (completed == null) tasks.wait();
}
return completed;
}
public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (tasks) {
if (completed == null) unit.timedWait(tasks, timeout);
}
return completed;
}
public void cancelAll() {
for (QueueingFuture<V> future : tasks) {
if (future != null) future.cancel(true);
}
}
}
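
To make the intended call pattern concrete, here is an illustrative sketch (not part of the patch): submit one RetryingCallable per replica, take whichever completes first, then cancel the rest. It is declared in the client package because ResultBoundedCompletionService and the types it works with are IA.Private; DummyCall and every other name below is invented.

package org.apache.hadoop.hbase.client;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ResultBoundedCompletionServiceSketch {

  // A trivial RetryingCallable standing in for one replica RPC; replica 0 answers first.
  static class DummyCall implements RetryingCallable<String>, Cancellable {
    private final int id;
    private volatile boolean cancelled = false;
    DummyCall(int id) { this.id = id; }
    @Override public void prepare(boolean reload) {}
    @Override public String call(int callTimeout) throws Exception {
      Thread.sleep(100L * (id + 1));                           // higher id = slower replica
      return cancelled ? null : ("answer from replica " + id);
    }
    @Override public void throwable(Throwable t, boolean retrying) {}
    @Override public String getExceptionMessageAdditionalDetail() { return ""; }
    @Override public long sleep(long pause, int tries) { return pause; }
    @Override public void cancel() { cancelled = true; }       // reached via QueueingFuture.cancel()
    @Override public boolean isCancelled() { return cancelled; }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ExecutorService pool = Executors.newFixedThreadPool(3);
    ResultBoundedCompletionService<String> cs =
        new ResultBoundedCompletionService<String>(new RpcRetryingCallerFactory(conf), pool, 3);
    try {
      for (int id = 0; id < 3; id++) {
        cs.submit(new DummyCall(id), 10000, id);   // one slot per replica id
      }
      System.out.println(cs.take().get());         // first completed future wins
    } finally {
      cs.cancelAll();                              // cancel the slower replicas
      pool.shutdown();
    }
  }
}

The callers changed later in this patch (RpcRetryingCallerWithReadReplicas and ScannerCallableWithReplicas) follow exactly this pattern with real get/scan callables.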

View File: org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java

@@ -21,7 +21,6 @@
package org.apache.hadoop.hbase.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.protobuf.ServiceException;
import org.htrace.Trace;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -100,7 +98,7 @@ public class RpcRetryingCallerWithReadReplicas {
* - we need to stop retrying when the call is completed
* - we can be interrupted
*/
class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
final int id;
private final PayloadCarryingRpcController controller;
@@ -113,7 +111,8 @@ public class RpcRetryingCallerWithReadReplicas {
controller.setPriority(tableName);
}
public void startCancel() {
@Override
public void cancel() {
controller.startCancel();
}
@@ -170,6 +169,11 @@ public class RpcRetryingCallerWithReadReplicas {
throw ProtobufUtil.getRemoteException(se);
}
}
@Override
public boolean isCancelled() {
return controller.isCanceled();
}
}
/**
@@ -195,7 +199,8 @@
RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
ResultBoundedCompletionService<Result> cs =
new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
if(isTargetReplicaSpecified) {
addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
@@ -274,12 +279,12 @@
* @param min - the id of the first replica, inclusive
* @param max - the id of the last replica, inclusive.
*/
private void addCallsForReplica(ResultBoundedCompletionService cs,
private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
RegionLocations rl, int min, int max) {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
cs.submit(callOnReplica, callTimeout);
cs.submit(callOnReplica, callTimeout, id);
}
}
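
The fan-out above is what a timeline-consistent read triggers. As a hedged, client-side illustration (table and row names invented, a cluster with region replication enabled assumed), this is the kind of Get that ends up here; once one replica responds, the cancellation added by this patch stops the calls still pending against the others.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class TimelineGetExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("t1"))) {
      Get get = new Get(Bytes.toBytes("row1"));
      get.setConsistency(Consistency.TIMELINE);         // any replica may answer
      Result result = table.get(get);
      System.out.println("stale=" + result.isStale());  // true if served by a secondary
    }
  }
}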
@@ -309,137 +314,4 @@
return rl;
}
/**
* A completion service for the RpcRetryingCallerFactory.
* Keeps the list of the futures, and allows to cancel them all.
* This means as well that it can be used for a small set of tasks only.
* <br>Implementation is not Thread safe.
*/
public class ResultBoundedCompletionService {
private final Executor executor;
private final QueueingFuture[] tasks; // all the tasks
private volatile QueueingFuture completed = null;
class QueueingFuture implements RunnableFuture<Result> {
private final ReplicaRegionServerCallable future;
private Result result = null;
private ExecutionException exeEx = null;
private volatile boolean canceled;
private final int callTimeout;
private final RpcRetryingCaller<Result> retryingCaller;
public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = rpcRetryingCallerFactory.<Result>newCaller();
}
@Override
public void run() {
try {
if (!canceled) {
result =
rpcRetryingCallerFactory.<Result>newCaller().callWithRetries(future, callTimeout);
}
} catch (Throwable t) {
exeEx = new ExecutionException(t);
} finally {
if (!canceled && completed == null) {
completed = QueueingFuture.this;
synchronized (tasks) {
tasks.notify();
}
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (result != null || exeEx != null) return false;
retryingCaller.cancel();
future.startCancel();
canceled = true;
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return result != null || exeEx != null;
}
@Override
public Result get() throws InterruptedException, ExecutionException {
try {
return get(1000, TimeUnit.DAYS);
} catch (TimeoutException e) {
throw new RuntimeException("You did wait for 1000 days here?", e);
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
justification="Is this an issue?")
@Override
public Result get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
synchronized (tasks) {
if (result != null) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
unit.timedWait(tasks, timeout);
}
// Findbugs says this null check is redundant. Will result be set across the wait above?
if (result != null) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
throw new TimeoutException("timeout=" + timeout + ", " + unit);
}
}
public ResultBoundedCompletionService(Executor executor, int maxTasks) {
this.executor = executor;
this.tasks = new QueueingFuture[maxTasks];
}
public void submit(ReplicaRegionServerCallable task, int callTimeout) {
QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
executor.execute(Trace.wrap(newFuture));
tasks[task.id] = newFuture;
}
public QueueingFuture take() throws InterruptedException {
synchronized (tasks) {
while (completed == null) tasks.wait();
}
return completed;
}
public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (tasks) {
if (completed == null) unit.timedWait(tasks, timeout);
}
return completed;
}
public void cancelAll() {
for (QueueingFuture future : tasks) {
if (future != null) future.cancel(true);
}
}
}
}

View File: org/apache/hadoop/hbase/client/ScannerCallable.java

@@ -89,6 +89,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
protected RpcControllerFactory controllerFactory;
protected PayloadCarryingRpcController controller;
/**
* @param connection which connection
@@ -124,6 +125,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
this.controllerFactory = rpcControllerFactory;
}
PayloadCarryingRpcController getController() {
return controller;
}
/**
* @param reload force reload of server location
* @throws IOException
@@ -192,7 +197,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null;
PayloadCarryingRpcController controller = controllerFactory.newController();
controller = controllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {

View File: org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java

@@ -39,8 +39,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.BoundedCompletionService;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting;
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
@@ -69,8 +70,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private Configuration conf;
private int scannerTimeout;
private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
private boolean someRPCcancelled = false; //required for testing purposes only
public ScannerCallableWithReplicas (TableName tableName, ClusterConnection cConnection,
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result []> caller) {
@@ -134,8 +136,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// allocate a bounded-completion pool of some multiple of the number of replicas.
// We want to accommodate some RPCs for redundant replica scans (which may still be in progress)
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
rl.size() * 5);
List<ExecutionException> exceptions = null;
int submitted = 0, completed = 0;
@@ -192,7 +196,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
} finally {
// We get here because we were interrupted or because one or more of the
// calls succeeded or failed. In all cases, we stop all our tasks.
cs.cancelAll(true);
cs.cancelAll();
}
if (exceptions != null && !exceptions.isEmpty()) {
@@ -226,8 +230,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// want to wait for the "close" to happen yet. The "wait" will happen when
// the table is closed (when the awaitTermination of the underlying pool is called)
s.setClose();
RetryingRPC r = new RetryingRPC(s);
pool.submit(r);
final RetryingRPC r = new RetryingRPC(s);
pool.submit(new Callable<Void>(){
@Override
public Void call() throws Exception {
r.call(scannerTimeout);
return null;
}
});
}
// now clear outstandingCallables since we scheduled a close for all the contained scanners
outstandingCallables.clear();
@@ -244,16 +254,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
private int addCallsForCurrentReplica(
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica);
cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
return 1;
}
private int addCallsForOtherReplicas(
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min,
int max) {
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
int min, int max) {
if (scan.getConsistency() == Consistency.STRONG) {
return 0; // not scheduling on other replicas for strong consistency
}
@@ -267,32 +277,85 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica);
cs.submit(retryingOnReplica, scannerTimeout, id);
}
return max - min + 1;
}
class RetryingRPC implements Callable<Pair<Result[], ScannerCallable>> {
@VisibleForTesting
boolean isAnyRPCcancelled() {
return someRPCcancelled;
}
class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
final ScannerCallable callable;
RpcRetryingCaller<Result[]> caller;
private volatile boolean cancelled = false;
RetryingRPC(ScannerCallable callable) {
this.callable = callable;
}
@Override
public Pair<Result[], ScannerCallable> call() throws IOException {
// For the Consistency.STRONG (default) case, we reuse the caller
// to keep compatibility with what was done in the past.
// For the Consistency.TIMELINE case, we can't reuse the caller
// since we could be making parallel RPCs (caller.callWithRetries is synchronized
// and we can't invoke it multiple times at the same time).
RpcRetryingCaller<Result[]> caller = ScannerCallableWithReplicas.this.caller;
this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) {
caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
<Result[]>newCaller();
}
Result[] res = caller.callWithRetries(callable, scannerTimeout);
return new Pair<Result[], ScannerCallable>(res, callable);
}
@Override
public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
// since the retrying is done within the ResultBoundedCompletionService,
// we don't invoke callWithRetries here
if (cancelled) {
return null;
}
Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
return new Pair<Result[], ScannerCallable>(res, this.callable);
}
@Override
public void prepare(boolean reload) throws IOException {
if (cancelled) return;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
callable.prepare(reload);
}
@Override
public void throwable(Throwable t, boolean retrying) {
callable.throwable(t, retrying);
}
@Override
public String getExceptionMessageAdditionalDetail() {
return callable.getExceptionMessageAdditionalDetail();
}
@Override
public long sleep(long pause, int tries) {
return callable.sleep(pause, tries);
}
@Override
public void cancel() {
cancelled = true;
caller.cancel();
if (callable.getController() != null) {
callable.getController().startCancel();
}
someRPCcancelled = true;
}
@Override
public boolean isCancelled() {
return cancelled;
}
}
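
For the scan side, the public-API view of what drives this class is a Scan with timeline consistency; the test added below exercises the same path through the internal isAnyRPCcancelled() hook. The names here are placeholders, and a table with region replicas is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class TimelineScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("t1"))) {
      Scan scan = new Scan();
      scan.setConsistency(Consistency.TIMELINE);  // let secondary replicas serve rows
      scan.setCaching(20);
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          System.out.println(Bytes.toString(r.getRow()) + " stale=" + r.isStale());
        }
      }
    }
  }
}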

View File: org/apache/hadoop/hbase/client/TestReplicasClient.java

@@ -544,6 +544,54 @@ public class TestReplicasClient {
runMultipleScansOfOneType(true, false);
}
@Test
public void testCancelOfScan() throws Exception {
openRegion(hriSecondary);
int NUMROWS = 100;
try {
for (int i = 0; i < NUMROWS; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Put p = new Put(b1);
p.add(f, b1, b1);
table.put(p);
}
LOG.debug("PUT done");
int caching = 20;
byte[] start;
start = Bytes.toBytes("testUseRegionWithReplica" + 0);
flushRegion(hriPrimary);
LOG.info("flush done");
Thread.sleep(1000 + REFRESH_PERIOD * 2);
// now make some 'next' calls slow
SlowMeCopro.slowDownNext.set(true);
SlowMeCopro.countOfNext.set(0);
SlowMeCopro.sleepTime.set(5000);
Scan scan = new Scan(start);
scan.setCaching(caching);
scan.setConsistency(Consistency.TIMELINE);
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> iter = scanner.iterator();
iter.next();
Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
for (int i = 0; i < NUMROWS; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Delete d = new Delete(b1);
table.delete(d);
}
closeRegion(hriSecondary);
}
}
private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
openRegion(hriSecondary);
int NUMROWS = 100;