From ca87d05a518338e64099f42c229d557b93ce51c8 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 29 Sep 2017 14:02:49 -0700 Subject: [PATCH] HBASE-18436 Add client-side hedged read metrics (Yun Zhao) --- .../hbase/client/MetricsConnection.java | 14 ++ .../RpcRetryingCallerWithReadReplicas.java | 11 +- .../hbase/client/TestReplicasClient.java | 126 +++++++++++++++--- .../TestSplitTransactionOnCluster.java | 6 +- 4 files changed, 132 insertions(+), 25 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 31612f34f83..c54729b866c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -269,6 +269,8 @@ public class MetricsConnection implements StatisticTrackable { @VisibleForTesting protected final RunnerStats runnerStats; @VisibleForTesting protected final Counter metaCacheNumClearServer; @VisibleForTesting protected final Counter metaCacheNumClearRegion; + @VisibleForTesting protected final Counter hedgedReadOps; + @VisibleForTesting protected final Counter hedgedReadWin; // dynamic metrics @@ -315,6 +317,8 @@ public class MetricsConnection implements StatisticTrackable { "metaCacheNumClearServer", scope)); this.metaCacheNumClearRegion = registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope)); + this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope)); + this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope)); this.getTracker = new CallTracker(this.registry, "Get", scope); this.scanTracker = new CallTracker(this.registry, "Scan", scope); this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope); @@ -373,6 +377,16 @@ public class MetricsConnection implements StatisticTrackable { 
metaCacheNumClearRegion.inc(); } + /** Increment the number of hedged reads that have occurred. */ + public void incrHedgedReadOps() { + hedgedReadOps.inc(); + } + + /** Increment the number of hedged reads that returned faster than the original read. */ + public void incrHedgedReadWin() { + hedgedReadWin.inc(); + } + /** Increment the number of normal runner counts. */ public void incrNormalRunners() { this.runnerStats.incrNormalRunners(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index e7a4ba6a353..c6ba228a785 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -216,6 +216,9 @@ public class RpcRetryingCallerWithReadReplicas { if (f != null) { return f.get(); //great we got a response } + if (cConnection.getConnectionMetrics() != null) { + cConnection.getConnectionMetrics().incrHedgedReadOps(); + } } catch (ExecutionException e) { // We ignore the ExecutionException and continue with the secondary replicas if (LOG.isDebugEnabled()) { @@ -238,13 +241,17 @@ public class RpcRetryingCallerWithReadReplicas { addCallsForReplica(cs, rl, 1, rl.size() - 1); } try { - Future f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, - TimeUnit.MILLISECONDS, startIndex, endIndex); + ResultBoundedCompletionService.QueueingFuture f = + cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS, startIndex, endIndex); if (f == null) { throw new RetriesExhaustedException("Timed out after " + operationTimeout + "ms. 
Get is sent to replicas with startIndex: " + startIndex + ", endIndex: " + endIndex + ", Locations: " + rl); } + if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified && + !skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) { + cConnection.getConnectionMetrics().incrHedgedReadWin(); + } return f.get(); } catch (ExecutionException e) { throwEnrichedException(e, retries); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 1a3cfbff0d7..ced7ce8be1b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.codahale.metrics.Counter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -104,7 +105,9 @@ public class TestReplicasClient { static final AtomicLong sleepTime = new AtomicLong(0); static final AtomicBoolean slowDownNext = new AtomicBoolean(false); static final AtomicInteger countOfNext = new AtomicInteger(0); - private static final AtomicReference cdl = + private static final AtomicReference primaryCdl = + new AtomicReference<>(new CountDownLatch(0)); + private static final AtomicReference secondaryCdl = new AtomicReference<>(new CountDownLatch(0)); Random r = new Random(); public SlowMeCopro() { @@ -146,7 +149,8 @@ public class TestReplicasClient { private void slowdownCode(final ObserverContext e) { if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { - CountDownLatch latch = getCdl().get(); + LOG.info("We're the primary replicas."); + CountDownLatch latch = getPrimaryCdl().get(); 
try { if (sleepTime.get() > 0) { LOG.info("Sleeping for " + sleepTime.get() + " ms"); @@ -163,11 +167,27 @@ public class TestReplicasClient { } } else { LOG.info("We're not the primary replicas."); + CountDownLatch latch = getSecondaryCdl().get(); + try { + if (latch.getCount() > 0) { + LOG.info("Waiting for the secondary counterCountDownLatch"); + latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. + if (latch.getCount() > 0) { + throw new RuntimeException("Can't wait more"); + } + } + } catch (InterruptedException e1) { + LOG.error(e1); + } } } - public static AtomicReference getCdl() { - return cdl; + public static AtomicReference getPrimaryCdl() { + return primaryCdl; + } + + public static AtomicReference getSecondaryCdl() { + return secondaryCdl; } } @@ -177,6 +197,7 @@ public class TestReplicasClient { HTU.getConfiguration().setInt( StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD); HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); + HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration()); HTU.startMiniCluster(NB_SERVERS); @@ -296,7 +317,7 @@ public class TestReplicasClient { public void testUseRegionWithoutReplica() throws Exception { byte[] b1 = "testUseRegionWithoutReplica".getBytes(); openRegion(hriSecondary); - SlowMeCopro.getCdl().set(new CountDownLatch(0)); + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0)); try { Get g = new Get(b1); Result r = table.get(g); @@ -352,14 +373,14 @@ public class TestReplicasClient { byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes(); openRegion(hriSecondary); - SlowMeCopro.getCdl().set(new CountDownLatch(1)); + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); try { Get g = new Get(b1); g.setConsistency(Consistency.TIMELINE); Result r = table.get(g); Assert.assertTrue(r.isStale()); } finally { - 
SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); closeRegion(hriSecondary); } } @@ -469,13 +490,13 @@ public class TestReplicasClient { LOG.info("sleep and is not stale done"); // But if we ask for stale we will get it - SlowMeCopro.getCdl().set(new CountDownLatch(1)); + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertTrue(r.getColumnCells(f, b1).isEmpty()); - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); LOG.info("stale done"); @@ -488,14 +509,14 @@ public class TestReplicasClient { LOG.info("exists not stale done"); // exists works on stale but don't see the put - SlowMeCopro.getCdl().set(new CountDownLatch(1)); + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setCheckExistenceOnly(true); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertFalse("The secondary has stale data", r.getExists()); - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); LOG.info("exists stale before flush done"); flushRegion(hriPrimary); @@ -504,28 +525,93 @@ public class TestReplicasClient { Thread.sleep(1000 + REFRESH_PERIOD * 2); // get works and is not stale - SlowMeCopro.getCdl().set(new CountDownLatch(1)); + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertFalse(r.isEmpty()); - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); LOG.info("stale done"); // exists works on stale and we see the put after the flush - SlowMeCopro.getCdl().set(new CountDownLatch(1)); + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setCheckExistenceOnly(true); 
g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertTrue(r.getExists()); - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); LOG.info("exists stale after flush done"); } finally { - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); + SlowMeCopro.sleepTime.set(0); + Delete d = new Delete(b1); + table.delete(d); + closeRegion(hriSecondary); + } + } + + @Test + public void testHedgedRead() throws Exception { + byte[] b1 = "testHedgedRead".getBytes(); + openRegion(hriSecondary); + + try { + // A simple put works, even if there is a second replica + Put p = new Put(b1); + p.addColumn(f, b1, b1); + table.put(p); + LOG.info("Put done"); + + // A get works and is not stale + Get g = new Get(b1); + Result r = table.get(g); + Assert.assertFalse(r.isStale()); + Assert.assertFalse(r.getColumnCells(f, b1).isEmpty()); + LOG.info("get works and is not stale done"); + + //reset + ClusterConnection connection = (ClusterConnection) HTU.getConnection(); + Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps; + Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin; + hedgedReadOps.dec(hedgedReadOps.getCount()); + hedgedReadWin.dec(hedgedReadWin.getCount()); + + // Wait a little on the main region, just enough for one hedged read to happen, + // but the hedged read must not return faster + int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond(); + SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond)); + SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1)); + g = new Get(b1); + g.setConsistency(Consistency.TIMELINE); + r = table.get(g); + Assert.assertFalse(r.isStale()); + Assert.assertFalse(r.getColumnCells(f, b1).isEmpty()); + Assert.assertEquals(hedgedReadOps.getCount(), 1); + Assert.assertEquals(hedgedReadWin.getCount(), 0); + 
SlowMeCopro.sleepTime.set(0); + SlowMeCopro.getSecondaryCdl().get().countDown(); + LOG.info("hedged read occurred but not faster"); + + + // But if we ask for stale we will get it and hedged read returned faster + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); + g = new Get(b1); + g.setConsistency(Consistency.TIMELINE); + r = table.get(g); + Assert.assertTrue(r.isStale()); + Assert.assertTrue(r.getColumnCells(f, b1).isEmpty()); + Assert.assertEquals(hedgedReadOps.getCount(), 2); + Assert.assertEquals(hedgedReadWin.getCount(), 1); + SlowMeCopro.getPrimaryCdl().get().countDown(); + LOG.info("hedged read occurred and faster"); + + } finally { + SlowMeCopro.getPrimaryCdl().get().countDown(); + SlowMeCopro.getSecondaryCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); Delete d = new Delete(b1); table.delete(d); @@ -557,7 +643,7 @@ public class TestReplicasClient { AsyncProcess ap = ((ClusterConnection) HTU.getConnection()).getAsyncProcess(); // Make primary slowdown - SlowMeCopro.getCdl().set(new CountDownLatch(1)); + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); List gets = new ArrayList<>(); Get g = new Get(b1); @@ -595,7 +681,7 @@ public class TestReplicasClient { Assert.assertTrue(m.isCancelled()); } } finally { - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); @@ -661,7 +747,7 @@ public class TestReplicasClient { SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); } finally { - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); @@ -742,7 +828,7 @@ public class TestReplicasClient { SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); } finally { - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); 
SlowMeCopro.sleepTime.set(0); SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 1a69be382ee..cee4caf3fd3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -617,15 +617,15 @@ public class TestSplitTransactionOnCluster { Assert.assertFalse(r.isStale()); LOG.info("exists stale after flush done"); - SlowMeCopro.getCdl().set(new CountDownLatch(1)); + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); // This will succeed because in the previous GET we get the location of the replica r = t.get(g); Assert.assertTrue(r.isStale()); - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); } finally { - SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.getPrimaryCdl().get().countDown(); admin.setBalancerRunning(true, false); cluster.getMaster().setCatalogJanitorEnabled(true); t.close();