diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
index 9b32e93106e..2848c9df743 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -18,13 +18,18 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.ArrayList;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace;
/**
@@ -32,13 +37,21 @@ import org.apache.htrace.Trace;
* Keeps the list of the futures, and allows to cancel them all.
* This means as well that it can be used for a small set of tasks only.
*
Implementation is not Thread safe.
+ *
+ * CompletedTasks is implemented as a queue, the entry is added based on the time order. I.e,
+ * when the first task completes (whether it is a success or failure), it is added as a first
+ * entry in the queue, the next completed task is added as a second entry in the queue, ...
+ * When iterating through the queue, we know it is based on time order. If the first
+ * completed task succeeds, it is returned. If it is failure, the iteration goes on until it
+ * finds a success.
*/
@InterfaceAudience.Private
public class ResultBoundedCompletionService {
+ private static final Log LOG = LogFactory.getLog(ResultBoundedCompletionService.class);
private final RpcRetryingCallerFactory retryingCallerFactory;
private final Executor executor;
private final QueueingFuture[] tasks; // all the tasks
- private volatile QueueingFuture completed = null;
+ private final ArrayList completedTasks; // completed tasks
private volatile boolean cancelled = false;
class QueueingFuture implements RunnableFuture {
@@ -49,12 +62,14 @@ public class ResultBoundedCompletionService {
private final int callTimeout;
private final RpcRetryingCaller retryingCaller;
private boolean resultObtained = false;
+ private final int replicaId; // replica id
- public QueueingFuture(RetryingCallable future, int callTimeout) {
+ public QueueingFuture(RetryingCallable future, int callTimeout, int id) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = retryingCallerFactory.newCaller();
+ this.replicaId = id;
}
@SuppressWarnings("unchecked")
@@ -70,8 +85,8 @@ public class ResultBoundedCompletionService {
} finally {
synchronized (tasks) {
// If this wasn't canceled then store the result.
- if (!cancelled && completed == null) {
- completed = (QueueingFuture) QueueingFuture.this;
+ if (!cancelled) {
+ completedTasks.add(QueueingFuture.this);
}
// Notify just in case there was someone waiting and this was canceled.
@@ -80,6 +95,7 @@ public class ResultBoundedCompletionService {
}
}
}
+
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (resultObtained || exeEx != null) return false;
@@ -129,6 +145,14 @@ public class ResultBoundedCompletionService {
throw new TimeoutException("timeout=" + timeout + ", " + unit);
}
+
+ public int getReplicaId() {
+ return replicaId;
+ }
+
+ public ExecutionException getExeEx() {
+ return exeEx;
+ }
}
@SuppressWarnings("unchecked")
@@ -138,27 +162,103 @@ public class ResultBoundedCompletionService {
this.retryingCallerFactory = retryingCallerFactory;
this.executor = executor;
this.tasks = new QueueingFuture[maxTasks];
+ this.completedTasks = new ArrayList<>(maxTasks);
}
public void submit(RetryingCallable task, int callTimeout, int id) {
- QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
+ QueueingFuture newFuture = new QueueingFuture(task, callTimeout, id);
executor.execute(Trace.wrap(newFuture));
tasks[id] = newFuture;
}
public QueueingFuture take() throws InterruptedException {
synchronized (tasks) {
- while (completed == null && !cancelled) tasks.wait();
+ while (!cancelled && (completedTasks.size() < 1)) tasks.wait();
}
- return completed;
+ return completedTasks.get(0);
}
+ /**
+ * Poll for the first completed task whether it is a success or execution exception.
+ *
+ * @param timeout - time to wait before it times out
+ * @param unit - time unit for timeout
+ */
public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
- synchronized (tasks) {
- if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
+ return pollForSpecificCompletedTask(timeout, unit, 0);
+ }
+
+ /**
+ * Poll for the first successfully completed task whose completed order is in startIndex,
+ * endIndex(exclusive) range
+ *
+ * @param timeout - time to wait before it times out
+ * @param unit - time unit for timeout
+ * @param startIndex - start index, starting from 0, inclusive
+ * @param endIndex - end index, exclusive
+ *
+ * @return If within timeout time, there is no successfully completed task, return null; If all
+ * tasks get execution exception, it will throw out the last execution exception,
+ * otherwise return the first successfully completed task's result.
+ */
+ public QueueingFuture pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit,
+ int startIndex, int endIndex)
+ throws InterruptedException, CancellationException, ExecutionException {
+
+ QueueingFuture f;
+ long start, duration;
+ for (int i = startIndex; i < endIndex; i ++) {
+
+ start = EnvironmentEdgeManager.currentTime();
+ f = pollForSpecificCompletedTask(timeout, unit, i);
+ duration = EnvironmentEdgeManager.currentTime() - start;
+
+ // Even with operationTimeout less than 0, still loop through the rest as there could
+ // be other completed tasks before operationTimeout.
+ timeout -= duration;
+
+ if (f == null) {
+ return null;
+ } else if (f.getExeEx() != null) {
+ // we continue here as we need to loop through all the results.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replica " + ((f == null) ? 0 : f.getReplicaId()) + " returns " +
+ f.getExeEx().getCause());
+ }
+
+ if (i == (endIndex - 1)) {
+ // Rethrow this exception
+ throw f.getExeEx();
+ }
+ continue;
+ }
+
+ return f;
}
- return completed;
+
+ // impossible to reach
+ return null;
+ }
+
+ /**
+ * Poll for the Nth completed task (index starts from 0 (the 1st), 1 (the second)...)
+ *
+ * @param timeout - time to wait before it times out
+ * @param unit - time unit for timeout
+ * @param index - the index(th) completed task, index starting from 0
+ */
+ private QueueingFuture pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index)
+ throws InterruptedException {
+ if (index < 0) {
+ return null;
+ }
+
+ synchronized (tasks) {
+ if (!cancelled && (completedTasks.size() <= index)) unit.timedWait(tasks, timeout);
+ if (completedTasks.size() <= index) return null;
+ }
+ return completedTasks.get(index);
}
public void cancelAll() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index aefa3bc66b6..d1c40abb6a4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -200,9 +200,12 @@ public class RpcRetryingCallerWithReadReplicas {
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
ResultBoundedCompletionService cs =
new ResultBoundedCompletionService(this.rpcRetryingCallerFactory, pool, rl.size());
+ int startIndex = 0;
+ int endIndex = rl.size();
if(isTargetReplicaSpecified) {
addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
+ endIndex = 1;
} else {
addCallsForReplica(cs, rl, 0, 0);
try {
@@ -212,7 +215,13 @@ public class RpcRetryingCallerWithReadReplicas {
return f.get(); //great we got a response
}
} catch (ExecutionException e) {
- throwEnrichedException(e, retries);
+ // We ignore the ExecutionException and continue with the secondary replicas
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Primary replica returns " + e.getCause());
+ }
+
+ // Skip the result from the primary as we know that there is something wrong
+ startIndex = 1;
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
@@ -224,19 +233,14 @@ public class RpcRetryingCallerWithReadReplicas {
}
try {
- try {
- long start = EnvironmentEdgeManager.currentTime();
- Future f = cs.poll(operationTimeout, TimeUnit.MILLISECONDS);
- long duration = EnvironmentEdgeManager.currentTime() - start;
- if (f == null) {
- throw new RetriesExhaustedException("timed out after " + duration + " ms");
- }
- return f.get(operationTimeout - duration, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throwEnrichedException(e, retries);
- } catch (TimeoutException te) {
+ Future f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout,
+ TimeUnit.MILLISECONDS, startIndex, endIndex);
+ if (f == null) {
throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms");
}
+ return f.get();
+ } catch (ExecutionException e) {
+ throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
@@ -247,6 +251,7 @@ public class RpcRetryingCallerWithReadReplicas {
cs.cancelAll();
}
+ LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
return null; // unreachable
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 4e8647fead3..a030e67efc6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -169,54 +169,70 @@ class ScannerCallableWithReplicas implements RetryingCallable {
replicaSwitched.set(false);
// submit call for the primary replica.
addCallsForCurrentReplica(cs, rl);
+ int startIndex = 0;
try {
// wait for the timeout to see whether the primary responds back
Future> f = cs.poll(timeBeforeReplicas,
TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
- Pair r = f.get(timeout, TimeUnit.MILLISECONDS);
+ // After poll, if f is not null, there must be a completed task
+ Pair r = f.get();
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); //great we got a response
}
} catch (ExecutionException e) {
- RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
+ // We ignore the ExecutionException and continue with the replicas
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scan with primary region returns " + e.getCause());
+ }
+
+ // If rl's size is 1 or scan's consitency is strong, it needs to throw
+ // out the exception from the primary replica
+ if ((rl.size() == 1) || (scan.getConsistency() == Consistency.STRONG)) {
+ // Rethrow the first exception
+ RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
+ }
+
+ startIndex = 1;
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
- } catch (TimeoutException e) {
- throw new InterruptedIOException(e.getMessage());
}
// submit call for the all of the secondaries at once
- // TODO: this may be an overkill for large region replication
- addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+ int endIndex = rl.size();
+ if (scan.getConsistency() == Consistency.STRONG) {
+ // When scan's consistency is strong, do not send to the secondaries
+ endIndex = 1;
+ } else {
+ // TODO: this may be an overkill for large region replication
+ addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+ }
try {
- long start = EnvironmentEdgeManager.currentTime();
- Future> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
- long duration = EnvironmentEdgeManager.currentTime() - start;
- if (f != null) {
- Pair r = f.get(timeout - duration, TimeUnit.MILLISECONDS);
- if (r != null && r.getSecond() != null) {
- updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
- }
- return r == null ? null : r.getFirst(); // great we got an answer
- } else {
- throw new IOException("Failed to get result within timeout, timeout="
- + timeout + "ms");
+ Future> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
+ TimeUnit.MILLISECONDS, startIndex, endIndex);
+
+ if (f == null) {
+ throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
}
+ Pair r = f.get();
+
+ if (r != null && r.getSecond() != null) {
+ updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
+ }
+ return r == null ? null : r.getFirst(); // great we got an answer
+
} catch (ExecutionException e) {
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
- } catch (TimeoutException e) {
- throw new InterruptedIOException(e.getMessage());
} finally {
// We get there because we were interrupted or because one or more of the
// calls succeeded or failed. In all case, we stop all our tasks.
@@ -293,9 +309,7 @@ class ScannerCallableWithReplicas implements RetryingCallable {
private void addCallsForOtherReplicas(
ResultBoundedCompletionService> cs, RegionLocations rl,
int min, int max) {
- if (scan.getConsistency() == Consistency.STRONG) {
- return; // not scheduling on other replicas for strong consistency
- }
+
for (int id = min; id <= max; id++) {
if (currentScannerCallable.id == id) {
continue; //this was already scheduled earlier
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 5967a69938d..617a8f70156 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -63,7 +65,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class TestReplicaWithCluster {
private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
- private static final int NB_SERVERS = 2;
+ private static final int NB_SERVERS = 3;
private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
@@ -110,6 +112,51 @@ public class TestReplicaWithCluster {
}
}
+ /**
+ * This copro is used to simulate region server down exception for Get and Scan
+ */
+ public static class RegionServerStoppedCopro extends BaseRegionObserver {
+
+ public RegionServerStoppedCopro() {
+ }
+
+ @Override
+ public void preGetOp(final ObserverContext e,
+ final Get get, final List results) throws IOException {
+
+ int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
+
+ // Fail for the primary replica and replica 1
+ if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
+ LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
+ throw new RegionServerStoppedException("Server " +
+ e.getEnvironment().getRegionServerServices().getServerName()
+ + " not running");
+ } else {
+ LOG.info("We're replica region " + replicaId);
+ }
+ }
+
+ @Override
+ public RegionScanner preScannerOpen(final ObserverContext e,
+ final Scan scan, final RegionScanner s) throws IOException {
+
+ int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
+
+ // Fail for the primary replica and replica 1
+ if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
+ LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
+ throw new RegionServerStoppedException("Server " +
+ e.getEnvironment().getRegionServerServices().getServerName()
+ + " not running");
+ } else {
+ LOG.info("We're replica region " + replicaId);
+ }
+
+ return null;
+ }
+ }
+
@BeforeClass
public static void beforeClass() throws Exception {
// enable store file refreshing
@@ -124,13 +171,19 @@ public class TestReplicaWithCluster {
HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
+ // Wait for primary call longer so make sure that it will get exception from the primary call
+ HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
+ HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
+
HTU.startMiniCluster(NB_SERVERS);
HTU.getHBaseCluster().startMaster();
}
@AfterClass
public static void afterClass() throws Exception {
- HTU2.shutdownMiniCluster();
+ if (HTU2 != null) {
+ HTU2.shutdownMiniCluster();
+ }
HTU.shutdownMiniCluster();
}
@@ -382,4 +435,90 @@ public class TestReplicaWithCluster {
HTU.getHBaseAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
+
+ @Test
+ public void testReplicaGetWithPrimaryDown() throws IOException {
+ // Create table then get the single region for our new table.
+ HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
+ hdt.setRegionReplication(NB_SERVERS);
+ hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
+ try {
+ // Retry less so it can fail faster
+ HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
+
+ Table table = HTU.createTable(hdt, new byte[][] { f }, null);
+
+ Put p = new Put(row);
+ p.addColumn(f, row, row);
+ table.put(p);
+
+ // Flush so it can be picked by the replica refresher thread
+ HTU.flush(table.getName());
+
+ // Sleep for some time until data is picked up by replicas
+ try {
+ Thread.sleep(2 * REFRESH_PERIOD);
+ } catch (InterruptedException e1) {
+ LOG.error(e1);
+ }
+
+ // But if we ask for stale we will get it
+ Get g = new Get(row);
+ g.setConsistency(Consistency.TIMELINE);
+ Result r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ } finally {
+ HTU.getConfiguration().unset("hbase.client.retries.number");
+ HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+ HTU.deleteTable(hdt.getTableName());
+ }
+ }
+
+ @Test
+ public void testReplicaScanWithPrimaryDown() throws IOException {
+ // Create table then get the single region for our new table.
+ HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
+ hdt.setRegionReplication(NB_SERVERS);
+ hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
+
+ try {
+ // Retry less so it can fail faster
+ HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
+
+ Table table = HTU.createTable(hdt, new byte[][] { f }, null);
+
+ Put p = new Put(row);
+ p.addColumn(f, row, row);
+ table.put(p);
+
+ // Flush so it can be picked by the replica refresher thread
+ HTU.flush(table.getName());
+
+ // Sleep for some time until data is picked up by replicas
+ try {
+ Thread.sleep(2 * REFRESH_PERIOD);
+ } catch (InterruptedException e1) {
+ LOG.error(e1);
+ }
+
+ // But if we ask for stale we will get it
+ // Instantiating the Scan class
+ Scan scan = new Scan();
+
+ // Scanning the required columns
+ scan.addFamily(f);
+ scan.setConsistency(Consistency.TIMELINE);
+
+ // Getting the scan result
+ ResultScanner scanner = table.getScanner(scan);
+
+ Result r = scanner.next();
+
+ Assert.assertTrue(r.isStale());
+ } finally {
+ HTU.getConfiguration().unset("hbase.client.retries.number");
+ HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+ HTU.deleteTable(hdt.getTableName());
+ }
+ }
}
|