HBASE-18436 Add client-side hedged read metrics (Yun Zhao)
Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
This commit is contained in:
parent
aa86657a52
commit
93ad1aba70
|
@ -278,6 +278,8 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
@VisibleForTesting protected final RunnerStats runnerStats;
|
||||
@VisibleForTesting protected final Counter metaCacheNumClearServer;
|
||||
@VisibleForTesting protected final Counter metaCacheNumClearRegion;
|
||||
@VisibleForTesting protected final Counter hedgedReadOps;
|
||||
@VisibleForTesting protected final Counter hedgedReadWin;
|
||||
|
||||
// dynamic metrics
|
||||
|
||||
|
@ -336,6 +338,8 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
"metaCacheNumClearServer", scope);
|
||||
this.metaCacheNumClearRegion = registry.newCounter(this.getClass(),
|
||||
"metaCacheNumClearRegion", scope);
|
||||
this.hedgedReadOps = registry.newCounter(this.getClass(), "hedgedReadOps", scope);
|
||||
this.hedgedReadWin = registry.newCounter(this.getClass(), "hedgedReadWin", scope);
|
||||
this.getTracker = new CallTracker(this.registry, "Get", scope);
|
||||
this.scanTracker = new CallTracker(this.registry, "Scan", scope);
|
||||
this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
|
||||
|
@ -395,6 +399,16 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
metaCacheNumClearRegion.inc();
|
||||
}
|
||||
|
||||
/** Increment the number of hedged reads that have occurred. */
|
||||
public void incrHedgedReadOps() {
|
||||
hedgedReadOps.inc();
|
||||
}
|
||||
|
||||
/** Increment the number of hedged reads that returned faster than the original read. */
|
||||
public void incrHedgedReadWin() {
|
||||
hedgedReadWin.inc();
|
||||
}
|
||||
|
||||
/** Increment the number of normal runner counts. */
|
||||
public void incrNormalRunners() {
|
||||
this.runnerStats.incrNormalRunners();
|
||||
|
|
|
@ -244,6 +244,9 @@ public class RpcRetryingCallerWithReadReplicas {
|
|||
if (f != null) {
|
||||
return f.get(); //great we got a response
|
||||
}
|
||||
if (cConnection.getConnectionMetrics() != null) {
|
||||
cConnection.getConnectionMetrics().incrHedgedReadOps();
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
// We ignore the ExecutionException and continue with the secondary replicas
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -267,13 +270,17 @@ public class RpcRetryingCallerWithReadReplicas {
|
|||
}
|
||||
|
||||
try {
|
||||
Future<Result> f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout,
|
||||
TimeUnit.MILLISECONDS, startIndex, endIndex);
|
||||
ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
|
||||
cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS, startIndex, endIndex);
|
||||
if (f == null) {
|
||||
throw new RetriesExhaustedException("Timed out after " + operationTimeout +
|
||||
"ms. Get is sent to replicas with startIndex: " + startIndex +
|
||||
", endIndex: " + endIndex + ", Locations: " + rl);
|
||||
}
|
||||
if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified &&
|
||||
!skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
|
||||
cConnection.getConnectionMetrics().incrHedgedReadWin();
|
||||
}
|
||||
return f.get();
|
||||
} catch (ExecutionException e) {
|
||||
throwEnrichedException(e, retries);
|
||||
|
|
|
@ -71,6 +71,8 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.yammer.metrics.core.Counter;
|
||||
|
||||
/**
|
||||
* Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
|
||||
* cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
|
||||
|
@ -102,8 +104,10 @@ public class TestReplicasClient {
|
|||
static final AtomicLong sleepTime = new AtomicLong(0);
|
||||
static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
|
||||
static final AtomicInteger countOfNext = new AtomicInteger(0);
|
||||
private static final AtomicReference<CountDownLatch> cdl =
|
||||
new AtomicReference<CountDownLatch>(new CountDownLatch(0));
|
||||
private static final AtomicReference<CountDownLatch> primaryCdl =
|
||||
new AtomicReference<>(new CountDownLatch(0));
|
||||
private static final AtomicReference<CountDownLatch> secondaryCdl =
|
||||
new AtomicReference<>(new CountDownLatch(0));
|
||||
Random r = new Random();
|
||||
public SlowMeCopro() {
|
||||
}
|
||||
|
@ -139,7 +143,8 @@ public class TestReplicasClient {
|
|||
|
||||
private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
|
||||
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
|
||||
CountDownLatch latch = getCdl().get();
|
||||
LOG.info("We're the primary replicas.");
|
||||
CountDownLatch latch = getPrimaryCdl().get();
|
||||
try {
|
||||
if (sleepTime.get() > 0) {
|
||||
LOG.info("Sleeping for " + sleepTime.get() + " ms");
|
||||
|
@ -156,11 +161,27 @@ public class TestReplicasClient {
|
|||
}
|
||||
} else {
|
||||
LOG.info("We're not the primary replicas.");
|
||||
CountDownLatch latch = getSecondaryCdl().get();
|
||||
try {
|
||||
if (latch.getCount() > 0) {
|
||||
LOG.info("Waiting for the secondary counterCountDownLatch");
|
||||
latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
|
||||
if (latch.getCount() > 0) {
|
||||
throw new RuntimeException("Can't wait more");
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e1) {
|
||||
LOG.error(e1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static AtomicReference<CountDownLatch> getCdl() {
|
||||
return cdl;
|
||||
public static AtomicReference<CountDownLatch> getPrimaryCdl() {
|
||||
return primaryCdl;
|
||||
}
|
||||
|
||||
public static AtomicReference<CountDownLatch> getSecondaryCdl() {
|
||||
return secondaryCdl;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -170,6 +191,7 @@ public class TestReplicasClient {
|
|||
HTU.getConfiguration().setInt(
|
||||
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
|
||||
HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
|
||||
HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
|
||||
ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
|
||||
HTU.startMiniCluster(NB_SERVERS);
|
||||
|
||||
|
@ -297,7 +319,7 @@ public class TestReplicasClient {
|
|||
public void testUseRegionWithoutReplica() throws Exception {
|
||||
byte[] b1 = "testUseRegionWithoutReplica".getBytes();
|
||||
openRegion(hriSecondary);
|
||||
SlowMeCopro.getCdl().set(new CountDownLatch(0));
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
|
||||
try {
|
||||
Get g = new Get(b1);
|
||||
Result r = table.get(g);
|
||||
|
@ -353,14 +375,14 @@ public class TestReplicasClient {
|
|||
byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
|
||||
openRegion(hriSecondary);
|
||||
|
||||
SlowMeCopro.getCdl().set(new CountDownLatch(1));
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
|
||||
try {
|
||||
Get g = new Get(b1);
|
||||
g.setConsistency(Consistency.TIMELINE);
|
||||
Result r = table.get(g);
|
||||
Assert.assertTrue(r.isStale());
|
||||
} finally {
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
closeRegion(hriSecondary);
|
||||
}
|
||||
}
|
||||
|
@ -470,13 +492,13 @@ public class TestReplicasClient {
|
|||
LOG.info("sleep and is not stale done");
|
||||
|
||||
// But if we ask for stale we will get it
|
||||
SlowMeCopro.getCdl().set(new CountDownLatch(1));
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
|
||||
g = new Get(b1);
|
||||
g.setConsistency(Consistency.TIMELINE);
|
||||
r = table.get(g);
|
||||
Assert.assertTrue(r.isStale());
|
||||
Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
|
||||
LOG.info("stale done");
|
||||
|
||||
|
@ -489,14 +511,14 @@ public class TestReplicasClient {
|
|||
LOG.info("exists not stale done");
|
||||
|
||||
// exists works on stale but don't see the put
|
||||
SlowMeCopro.getCdl().set(new CountDownLatch(1));
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
|
||||
g = new Get(b1);
|
||||
g.setCheckExistenceOnly(true);
|
||||
g.setConsistency(Consistency.TIMELINE);
|
||||
r = table.get(g);
|
||||
Assert.assertTrue(r.isStale());
|
||||
Assert.assertFalse("The secondary has stale data", r.getExists());
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
LOG.info("exists stale before flush done");
|
||||
|
||||
flushRegion(hriPrimary);
|
||||
|
@ -505,28 +527,93 @@ public class TestReplicasClient {
|
|||
Thread.sleep(1000 + REFRESH_PERIOD * 2);
|
||||
|
||||
// get works and is not stale
|
||||
SlowMeCopro.getCdl().set(new CountDownLatch(1));
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
|
||||
g = new Get(b1);
|
||||
g.setConsistency(Consistency.TIMELINE);
|
||||
r = table.get(g);
|
||||
Assert.assertTrue(r.isStale());
|
||||
Assert.assertFalse(r.isEmpty());
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
LOG.info("stale done");
|
||||
|
||||
// exists works on stale and we see the put after the flush
|
||||
SlowMeCopro.getCdl().set(new CountDownLatch(1));
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
|
||||
g = new Get(b1);
|
||||
g.setCheckExistenceOnly(true);
|
||||
g.setConsistency(Consistency.TIMELINE);
|
||||
r = table.get(g);
|
||||
Assert.assertTrue(r.isStale());
|
||||
Assert.assertTrue(r.getExists());
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
LOG.info("exists stale after flush done");
|
||||
|
||||
} finally {
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
SlowMeCopro.sleepTime.set(0);
|
||||
Delete d = new Delete(b1);
|
||||
table.delete(d);
|
||||
closeRegion(hriSecondary);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHedgedRead() throws Exception {
|
||||
byte[] b1 = "testHedgedRead".getBytes();
|
||||
openRegion(hriSecondary);
|
||||
|
||||
try {
|
||||
// A simple put works, even if there is a second replica
|
||||
Put p = new Put(b1);
|
||||
p.addColumn(f, b1, b1);
|
||||
table.put(p);
|
||||
LOG.info("Put done");
|
||||
|
||||
// A get works and is not stale
|
||||
Get g = new Get(b1);
|
||||
Result r = table.get(g);
|
||||
Assert.assertFalse(r.isStale());
|
||||
Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
|
||||
LOG.info("get works and is not stale done");
|
||||
|
||||
//reset
|
||||
ClusterConnection connection = (ClusterConnection) HTU.getConnection();
|
||||
Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
|
||||
Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
|
||||
hedgedReadOps.dec(hedgedReadOps.count());
|
||||
hedgedReadWin.dec(hedgedReadWin.count());
|
||||
|
||||
// Wait a little on the primary region, just long enough for exactly one hedged read to occur,
|
||||
// while the hedged read does not return faster than the primary read
|
||||
int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
|
||||
SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
|
||||
SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
|
||||
g = new Get(b1);
|
||||
g.setConsistency(Consistency.TIMELINE);
|
||||
r = table.get(g);
|
||||
Assert.assertFalse(r.isStale());
|
||||
Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
|
||||
Assert.assertEquals(hedgedReadOps.count(), 1);
|
||||
Assert.assertEquals(hedgedReadWin.count(), 0);
|
||||
SlowMeCopro.sleepTime.set(0);
|
||||
SlowMeCopro.getSecondaryCdl().get().countDown();
|
||||
LOG.info("hedged read occurred but not faster");
|
||||
|
||||
|
||||
// But if the primary is blocked, we get the stale result and the hedged read returns faster
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
|
||||
g = new Get(b1);
|
||||
g.setConsistency(Consistency.TIMELINE);
|
||||
r = table.get(g);
|
||||
Assert.assertTrue(r.isStale());
|
||||
Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
|
||||
Assert.assertEquals(hedgedReadOps.count(), 2);
|
||||
Assert.assertEquals(hedgedReadWin.count(), 1);
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
LOG.info("hedged read occurred and faster");
|
||||
|
||||
} finally {
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
SlowMeCopro.getSecondaryCdl().get().countDown();
|
||||
SlowMeCopro.sleepTime.set(0);
|
||||
Delete d = new Delete(b1);
|
||||
table.delete(d);
|
||||
|
@ -559,7 +646,7 @@ public class TestReplicasClient {
|
|||
.getAsyncProcess();
|
||||
|
||||
// Make primary slowdown
|
||||
SlowMeCopro.getCdl().set(new CountDownLatch(1));
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
|
||||
|
||||
List<Get> gets = new ArrayList<Get>();
|
||||
Get g = new Get(b1);
|
||||
|
@ -587,7 +674,7 @@ public class TestReplicasClient {
|
|||
Assert.assertTrue(m.isCancelled());
|
||||
}
|
||||
} finally {
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
SlowMeCopro.sleepTime.set(0);
|
||||
SlowMeCopro.slowDownNext.set(false);
|
||||
SlowMeCopro.countOfNext.set(0);
|
||||
|
@ -653,7 +740,7 @@ public class TestReplicasClient {
|
|||
SlowMeCopro.slowDownNext.set(false);
|
||||
SlowMeCopro.countOfNext.set(0);
|
||||
} finally {
|
||||
SlowMeCopro.cdl.get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
SlowMeCopro.sleepTime.set(0);
|
||||
SlowMeCopro.slowDownNext.set(false);
|
||||
SlowMeCopro.countOfNext.set(0);
|
||||
|
@ -734,7 +821,7 @@ public class TestReplicasClient {
|
|||
SlowMeCopro.slowDownNext.set(false);
|
||||
SlowMeCopro.countOfNext.set(0);
|
||||
} finally {
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
SlowMeCopro.sleepTime.set(0);
|
||||
SlowMeCopro.slowDownNext.set(false);
|
||||
SlowMeCopro.countOfNext.set(0);
|
||||
|
|
|
@ -1011,15 +1011,15 @@ public class TestSplitTransactionOnCluster {
|
|||
Assert.assertFalse(r.isStale());
|
||||
LOG.info("exists stale after flush done");
|
||||
|
||||
SlowMeCopro.getCdl().set(new CountDownLatch(1));
|
||||
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
|
||||
g = new Get(b1);
|
||||
g.setConsistency(Consistency.TIMELINE);
|
||||
// This will succeed because in the previous GET we get the location of the replica
|
||||
r = t.get(g);
|
||||
Assert.assertTrue(r.isStale());
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
} finally {
|
||||
SlowMeCopro.getCdl().get().countDown();
|
||||
SlowMeCopro.getPrimaryCdl().get().countDown();
|
||||
admin.setBalancerRunning(true, false);
|
||||
cluster.getMaster().setCatalogJanitorEnabled(true);
|
||||
t.close();
|
||||
|
|
Loading…
Reference in New Issue