HBASE-16345 RpcRetryingCallerWithReadReplicas#call() should catch some RegionServer Exceptions

Fix logic for
1). how to handle exception while waiting for reply from the primary replica.
2). handle exception from replicas while waiting for a correct response.

Signed-off-by: Esteban Gutierrez <esteban@apache.org>
This commit is contained in:
Huaxiang Sun 2016-10-18 14:10:09 -07:00 committed by Esteban Gutierrez
parent d9ee25e82a
commit 72db953886
4 changed files with 311 additions and 56 deletions

View File

@ -18,13 +18,18 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace;
/**
@ -32,13 +37,21 @@ import org.apache.htrace.Trace;
* Keeps the list of the futures, and allows to cancel them all.
* This means as well that it can be used for a small set of tasks only.
* <br>Implementation is not Thread safe.
*
* CompletedTasks is implemented as a queue, the entry is added based on the time order. I.e,
* when the first task completes (whether it is a success or failure), it is added as a first
* entry in the queue, the next completed task is added as a second entry in the queue, ...
* When iterating through the queue, we know it is based on time order. If the first
* completed task succeeds, it is returned. If it is failure, the iteration goes on until it
* finds a success.
*/
@InterfaceAudience.Private
public class ResultBoundedCompletionService<V> {
private static final Log LOG = LogFactory.getLog(ResultBoundedCompletionService.class);
private final RpcRetryingCallerFactory retryingCallerFactory;
private final Executor executor;
private final QueueingFuture<V>[] tasks; // all the tasks
private volatile QueueingFuture<V> completed = null;
private final ArrayList<QueueingFuture> completedTasks; // completed tasks
private volatile boolean cancelled = false;
class QueueingFuture<T> implements RunnableFuture<T> {
@ -49,12 +62,14 @@ public class ResultBoundedCompletionService<V> {
private final int callTimeout;
private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false;
private final int replicaId; // replica id
public QueueingFuture(RetryingCallable<T> future, int callTimeout) {
public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = retryingCallerFactory.<T>newCaller();
this.replicaId = id;
}
@SuppressWarnings("unchecked")
@ -70,8 +85,8 @@ public class ResultBoundedCompletionService<V> {
} finally {
synchronized (tasks) {
// If this wasn't canceled then store the result.
if (!cancelled && completed == null) {
completed = (QueueingFuture<V>) QueueingFuture.this;
if (!cancelled) {
completedTasks.add(QueueingFuture.this);
}
// Notify just in case there was someone waiting and this was canceled.
@ -80,6 +95,7 @@ public class ResultBoundedCompletionService<V> {
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (resultObtained || exeEx != null) return false;
@ -129,6 +145,14 @@ public class ResultBoundedCompletionService<V> {
throw new TimeoutException("timeout=" + timeout + ", " + unit);
}
public int getReplicaId() {
return replicaId;
}
public ExecutionException getExeEx() {
return exeEx;
}
}
@SuppressWarnings("unchecked")
@ -138,27 +162,103 @@ public class ResultBoundedCompletionService<V> {
this.retryingCallerFactory = retryingCallerFactory;
this.executor = executor;
this.tasks = new QueueingFuture[maxTasks];
this.completedTasks = new ArrayList<>(maxTasks);
}
public void submit(RetryingCallable<V> task, int callTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout, id);
executor.execute(Trace.wrap(newFuture));
tasks[id] = newFuture;
}
public QueueingFuture<V> take() throws InterruptedException {
synchronized (tasks) {
while (completed == null && !cancelled) tasks.wait();
while (!cancelled && (completedTasks.size() < 1)) tasks.wait();
}
return completed;
return completedTasks.get(0);
}
/**
* Poll for the first completed task whether it is a success or execution exception.
*
* @param timeout - time to wait before it times out
* @param unit - time unit for timeout
*/
public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (tasks) {
if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
return pollForSpecificCompletedTask(timeout, unit, 0);
}
/**
* Poll for the first successfully completed task whose completed order is in startIndex,
* endIndex(exclusive) range
*
* @param timeout - time to wait before it times out
* @param unit - time unit for timeout
* @param startIndex - start index, starting from 0, inclusive
* @param endIndex - end index, exclusive
*
* @return If within timeout time, there is no successfully completed task, return null; If all
* tasks get execution exception, it will throw out the last execution exception,
* otherwise return the first successfully completed task's result.
*/
public QueueingFuture<V> pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit,
int startIndex, int endIndex)
throws InterruptedException, CancellationException, ExecutionException {
QueueingFuture<V> f;
long start, duration;
for (int i = startIndex; i < endIndex; i ++) {
start = EnvironmentEdgeManager.currentTime();
f = pollForSpecificCompletedTask(timeout, unit, i);
duration = EnvironmentEdgeManager.currentTime() - start;
// Even with operationTimeout less than 0, still loop through the rest as there could
// be other completed tasks before operationTimeout.
timeout -= duration;
if (f == null) {
return null;
} else if (f.getExeEx() != null) {
// we continue here as we need to loop through all the results.
if (LOG.isDebugEnabled()) {
LOG.debug("Replica " + ((f == null) ? 0 : f.getReplicaId()) + " returns " +
f.getExeEx().getCause());
}
if (i == (endIndex - 1)) {
// Rethrow this exception
throw f.getExeEx();
}
continue;
}
return f;
}
return completed;
// impossible to reach
return null;
}
/**
* Poll for the Nth completed task (index starts from 0 (the 1st), 1 (the second)...)
*
* @param timeout - time to wait before it times out
* @param unit - time unit for timeout
* @param index - the index(th) completed task, index starting from 0
*/
private QueueingFuture<V> pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index)
throws InterruptedException {
if (index < 0) {
return null;
}
synchronized (tasks) {
if (!cancelled && (completedTasks.size() <= index)) unit.timedWait(tasks, timeout);
if (completedTasks.size() <= index) return null;
}
return completedTasks.get(index);
}
public void cancelAll() {

View File

@ -29,6 +29,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
*/
@InterfaceAudience.Private
public class RpcRetryingCallerWithReadReplicas {
private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
protected final ExecutorService pool;
protected final ClusterConnection cConnection;
protected final Configuration conf;
@ -171,9 +174,12 @@ public class RpcRetryingCallerWithReadReplicas {
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
final ResultBoundedCompletionService<Result> cs =
new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
int startIndex = 0;
int endIndex = rl.size();
if(isTargetReplicaSpecified) {
addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
endIndex = 1;
} else {
addCallsForReplica(cs, rl, 0, 0);
try {
@ -183,7 +189,13 @@ public class RpcRetryingCallerWithReadReplicas {
return f.get(); //great we got a response
}
} catch (ExecutionException e) {
throwEnrichedException(e, retries);
// We ignore the ExecutionException and continue with the secondary replicas
if (LOG.isDebugEnabled()) {
LOG.debug("Primary replica returns " + e.getCause());
}
// Skip the result from the primary as we know that there is something wrong
startIndex = 1;
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
@ -194,19 +206,14 @@ public class RpcRetryingCallerWithReadReplicas {
addCallsForReplica(cs, rl, 1, rl.size() - 1);
}
try {
try {
long start = EnvironmentEdgeManager.currentTime();
Future<Result> f = cs.poll(operationTimeout, TimeUnit.MILLISECONDS);
long duration = EnvironmentEdgeManager.currentTime() - start;
if (f == null) {
throw new RetriesExhaustedException("timed out after " + duration + " ms");
}
return f.get(operationTimeout - duration, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throwEnrichedException(e, retries);
} catch (TimeoutException te) {
Future<Result> f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout,
TimeUnit.MILLISECONDS, startIndex, endIndex);
if (f == null) {
throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms");
}
return f.get();
} catch (ExecutionException e) {
throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
@ -217,6 +224,7 @@ public class RpcRetryingCallerWithReadReplicas {
cs.cancelAll();
}
LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
return null; // unreachable
}

View File

@ -169,54 +169,70 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
replicaSwitched.set(false);
// submit call for the primary replica.
addCallsForCurrentReplica(cs, rl);
int startIndex = 0;
try {
// wait for the timeout to see whether the primary responds back
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
// After poll, if f is not null, there must be a completed task
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); //great we got a response
}
} catch (ExecutionException e) {
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
// We ignore the ExecutionException and continue with the replicas
if (LOG.isDebugEnabled()) {
LOG.debug("Scan with primary region returns " + e.getCause());
}
// If rl's size is 1 or scan's consitency is strong, it needs to throw
// out the exception from the primary replica
if ((rl.size() == 1) || (scan.getConsistency() == Consistency.STRONG)) {
// Rethrow the first exception
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
}
startIndex = 1;
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} catch (TimeoutException e) {
throw new InterruptedIOException(e.getMessage());
}
// submit call for the all of the secondaries at once
// TODO: this may be an overkill for large region replication
addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
int endIndex = rl.size();
if (scan.getConsistency() == Consistency.STRONG) {
// When scan's consistency is strong, do not send to the secondaries
endIndex = 1;
} else {
// TODO: this may be an overkill for large region replication
addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
}
try {
long start = EnvironmentEdgeManager.currentTime();
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
long duration = EnvironmentEdgeManager.currentTime() - start;
if (f != null) {
Pair<Result[], ScannerCallable> r = f.get(timeout - duration, TimeUnit.MILLISECONDS);
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); // great we got an answer
} else {
throw new IOException("Failed to get result within timeout, timeout="
+ timeout + "ms");
Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
TimeUnit.MILLISECONDS, startIndex, endIndex);
if (f == null) {
throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
}
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); // great we got an answer
} catch (ExecutionException e) {
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} catch (TimeoutException e) {
throw new InterruptedIOException(e.getMessage());
} finally {
// We get there because we were interrupted or because one or more of the
// calls succeeded or failed. In all case, we stop all our tasks.
@ -292,9 +308,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private void addCallsForOtherReplicas(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
int min, int max) {
if (scan.getConsistency() == Consistency.STRONG) {
return; // not scheduling on other replicas for strong consistency
}
for (int id = min; id <= max; id++) {
if (currentScannerCallable.id == id) {
continue; //this was already scheduled earlier

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -63,7 +65,7 @@ import org.junit.experimental.categories.Category;
public class TestReplicaWithCluster {
private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
private static final int NB_SERVERS = 2;
private static final int NB_SERVERS = 3;
private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
@ -109,11 +111,56 @@ public class TestReplicaWithCluster {
}
}
/**
* This copro is used to simulate region server down exception for Get and Scan
*/
public static class RegionServerStoppedCopro extends BaseRegionObserver {
public RegionServerStoppedCopro() {
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
// Fail for the primary replica and replica 1
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
throw new RegionServerStoppedException("Server " +
e.getEnvironment().getRegionServerServices().getServerName()
+ " not running");
} else {
LOG.info("We're replica region " + replicaId);
}
}
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final RegionScanner s) throws IOException {
int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
// Fail for the primary replica and replica 1
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
throw new RegionServerStoppedException("Server " +
e.getEnvironment().getRegionServerServices().getServerName()
+ " not running");
} else {
LOG.info("We're replica region " + replicaId);
}
return null;
}
}
@BeforeClass
public static void beforeClass() throws Exception {
// enable store file refreshing
HTU.getConfiguration().setInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
REFRESH_PERIOD);
HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
@ -123,6 +170,13 @@ public class TestReplicaWithCluster {
HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
// Wait for primary call longer so make sure that it will get exception from the primary call
HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
// Retry less so it can fail faster
HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
HTU.startMiniCluster(NB_SERVERS);
HTU.getHBaseCluster().startMaster();
}
@ -263,39 +317,39 @@ public class TestReplicaWithCluster {
LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
@Override public boolean evaluate() throws Exception {
try {
SlowMeCopro.cdl.set(new CountDownLatch(1));
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
return !r.isEmpty();
return !r.isEmpty();
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
}});
}
});
table.close();
LOG.info("stale get on the first cluster done. Now for the second.");
final Table table2 = HTU.getConnection().getTable(hdt.getTableName());
Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
@Override public boolean evaluate() throws Exception {
try {
SlowMeCopro.cdl.set(new CountDownLatch(1));
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table2.get(g);
Assert.assertTrue(r.isStale());
return !r.isEmpty();
return !r.isEmpty();
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
}});
}
});
table2.close();
HTU.getHBaseAdmin().disableTable(hdt.getTableName());
@ -387,4 +441,83 @@ public class TestReplicaWithCluster {
HTU.getHBaseAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
@Test
public void testReplicaGetWithPrimaryDown() throws IOException {
// Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
hdt.setRegionReplication(NB_SERVERS);
hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
try {
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
// Flush so it can be picked by the replica refresher thread
HTU.flush(table.getName());
// Sleep for some time until data is picked up by replicas
try {
Thread.sleep(2 * REFRESH_PERIOD);
} catch (InterruptedException e1) {
LOG.error(e1);
}
// But if we ask for stale we will get it
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
HTU.getHBaseAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
}
@Test
public void testReplicaScanWithPrimaryDown() throws IOException {
// Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
hdt.setRegionReplication(NB_SERVERS);
hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
try {
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
// Flush so it can be picked by the replica refresher thread
HTU.flush(table.getName());
// Sleep for some time until data is picked up by replicas
try {
Thread.sleep(2 * REFRESH_PERIOD);
} catch (InterruptedException e1) {
LOG.error(e1);
}
// But if we ask for stale we will get it
// Instantiating the Scan class
Scan scan = new Scan();
// Scanning the required columns
scan.addFamily(f);
scan.setConsistency(Consistency.TIMELINE);
// Getting the scan result
ResultScanner scanner = table.getScanner(scan);
Result r = scanner.next();
Assert.assertTrue(r.isStale());
} finally {
HTU.getHBaseAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
}
}