From 97c152ea7a664650423cc60250bc9bbd06a8ff00 Mon Sep 17 00:00:00 2001 From: Sandeep Pal <50725353+sandeepvinayak@users.noreply.github.com> Date: Mon, 22 Mar 2021 22:55:04 -0700 Subject: [PATCH] HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3009) Signed-off-by: Bharath Vissapragada --- .../MetricsReplicationSourceSource.java | 7 ++-- .../MetricsReplicationGlobalSourceSource.java | 18 ++++++++++ .../MetricsReplicationSourceSourceImpl.java | 21 ++++++++++++ .../metrics2/lib/DynamicMetricsRegistry.java | 34 +++++++++++++++++++ .../regionserver/MetricsSource.java | 30 ++++++++++++++-- .../regionserver/ReplicationSource.java | 19 +++++++++-- .../replication/TestReplicationEndpoint.java | 16 +++++++++ .../replication/TestReplicationSource.java | 25 +++++++++++++- 8 files changed, 162 insertions(+), 8 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index a7cea254468..b997338d3a2 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -50,8 +50,8 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs"; public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues"; public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues"; - /* Used to track the age of oldest wal in ms since its creation time */ - String OLDEST_WAL_AGE = "source.oldestWalAge"; + // This is to track the num of replication sources getting initialized + public static final String 
SOURCE_INITIALIZING = "source.numInitializing"; void setLastShippedAge(long age); void incrSizeOfLogQueue(int size); @@ -78,4 +78,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { void incrFailedRecoveryQueue(); void setOldestWalAge(long age); long getOldestWalAge(); + void incrSourceInitializing(); + void decrSourceInitializing(); + int getSourceInitializing(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index df774d3100f..ccdf1be6714 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableHistogram; @@ -46,6 +47,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter failedRecoveryQueue; + private final MutableGaugeInt sourceInitializing; public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { this.rms = rms; @@ -82,6 +84,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS completedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); failedRecoveryQueue = rms.getMetricsRegistry() 
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); + sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0); } @Override public void setLastShippedAge(long age) { @@ -208,6 +211,21 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS return 0; } + @Override + public void incrSourceInitializing() { + sourceInitializing.incr(1); + } + + @Override + public void decrSourceInitializing() { + sourceInitializing.decr(1); + } + + @Override + public int getSourceInitializing() { + return sourceInitializing.value(); + } + @Override public void init() { rms.init(); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index c593950eba1..5662ee99e6b 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableHistogram; @@ -41,6 +42,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String shippedHFilesKey; private final String sizeOfHFileRefsQueueKey; private final String oldestWalAgeKey; + private final String sourceInitializingKey; private final MutableHistogram ageOfLastShippedOpHist; private final MutableGaugeLong sizeOfLogQueueGauge; @@ -69,6 +71,7 @@ public class MetricsReplicationSourceSourceImpl implements 
MetricsReplicationSou private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; private final MutableGaugeLong oldestWalAge; + private final MutableGaugeInt sourceInitializing; public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { this.rms = rms; @@ -131,6 +134,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou oldestWalAgeKey = this.keyPrefix + "oldestWalAge"; oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L); + + sourceInitializingKey = this.keyPrefix + "isInitializing"; + sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0); } @Override public void setLastShippedAge(long age) { @@ -197,6 +203,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou rms.removeMetric(completedLogsKey); rms.removeMetric(completedRecoveryKey); rms.removeMetric(oldestWalAgeKey); + rms.removeMetric(sourceInitializingKey); } @Override @@ -270,6 +277,20 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou return oldestWalAge.value(); } + @Override + public void incrSourceInitializing() { + sourceInitializing.incr(1); + } + + @Override + public int getSourceInitializing() { + return sourceInitializing.value(); + } + + @Override public void decrSourceInitializing() { + sourceInitializing.decr(1); + } + @Override public void init() { rms.init(); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java index 977536ae87d..3840fe4e372 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java @@ -449,6 +449,40 @@ public class DynamicMetricsRegistry { return (MutableGaugeLong) 
metric; } + /** + * Get a MetricMutableGaugeInt from the storage. If it is not there atomically put it. + * + * @param gaugeName name of the gauge to create or get. + * @param potentialStartingValue value of the new gauge if we have to create it. + */ + public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) { + //Try and get the gauge. + MutableMetric metric = metricsMap.get(gaugeName); + + //If it's not there then try and put a new one in the storage. + if (metric == null) { + //Create the potential new gauge. + MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""), + potentialStartingValue); + + // Try and put the gauge in. This is atomic. + metric = metricsMap.putIfAbsent(gaugeName, newGauge); + + //If the value we get back is null then the put was successful and we will return that. + //otherwise gaugeInt should contain the thing that was in before the put could be completed. + if (metric == null) { + return newGauge; + } + } + + if (!(metric instanceof MutableGaugeInt)) { + throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName + + " and not of type MetricMutableGaugeInr"); + } + + return (MutableGaugeInt) metric; + } + /** * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it. 
* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 83bc6534432..308a7465df0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -22,10 +22,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.metrics.BaseSource; import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -134,7 +133,8 @@ public class MetricsSource implements BaseSource { * @param tableName String as group and tableName */ public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { - getSourceForTable(tableName).setLastShippedAge(EnvironmentEdgeManager.currentTime() - timestamp); + getSourceForTable(tableName) + .setLastShippedAge(EnvironmentEdgeManager.currentTime() - timestamp); } /** @@ -184,6 +184,22 @@ public class MetricsSource implements BaseSource { globalSourceSource.decrSizeOfLogQueue(1); } + /** + * Increment the count for initializing sources + */ + public void incrSourceInitializing() { + singleSourceSource.incrSourceInitializing(); + globalSourceSource.incrSourceInitializing(); + } + + /** + * Decrement the count for initializing sources + */ + public void decrSourceInitializing() { + singleSourceSource.decrSourceInitializing(); + globalSourceSource.decrSourceInitializing(); + } + /** * Add on the the number of log edits read * @@ -290,6 +306,14 
@@ public class MetricsSource implements BaseSource { return lastTimestamp; } + /** + * Get the source initializing counts + * @return number of replication sources getting initialized + */ + public int getSourceInitializing() { + return singleSourceSource.getSourceInitializing(); + } + /** * Get the slave peer ID * @return peerID diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index de3b7f64811..a121e651934 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -34,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -119,6 +117,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private int maxRetriesMultiplier; // Indicates if this particular source is running private volatile boolean sourceRunning = false; + // Indicates if the source initialization is in progress + private volatile boolean startupOngoing = false; // Metrics for this source private MetricsSource metrics; // ReplicationEndpoint which will handle the actual replication @@ -266,16 +266,19 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf public void run() { // mark we are running now this.sourceRunning = true; + this.setSourceStartupStatus(true); try { // start the endpoint, connect to the cluster Service.State state = replicationEndpoint.start().get(); if (state != Service.State.RUNNING) { LOG.warn("ReplicationEndpoint was not started. Exiting"); uninitialize(); + this.setSourceStartupStatus(false); return; } } catch (Exception ex) { LOG.warn("Error starting ReplicationEndpoint, exiting", ex); + this.setSourceStartupStatus(false); throw new RuntimeException(ex); } @@ -300,6 +303,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } if (!this.isSourceActive()) { + this.setSourceStartupStatus(false); return; } @@ -310,6 +314,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); this.manager.closeQueue(this); + this.setSourceStartupStatus(false); return; } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); @@ -327,6 +332,16 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf worker.startup(); } } + this.setSourceStartupStatus(false); + } + + private synchronized void setSourceStartupStatus(boolean initializing) { + startupOngoing = initializing; + if (initializing) { + metrics.incrSourceInitializing(); + } else { + metrics.decrSourceInitializing(); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 3819877fbc8..9787120fbc6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -419,6 +419,22 @@ public class TestReplicationEndpoint extends 
TestReplicationBase { cells.get(0).getRowLength(), row, 0, row.length)); } + /** + * Bad Endpoint with failing connection to peer on demand. + */ + public static class BadReplicationEndpoint extends ReplicationEndpointForTest { + static boolean failing = true; + + public BadReplicationEndpoint() { + super(); + } + + @Override + public synchronized UUID getPeerUUID() { + return failing ? null : super.getPeerUUID(); + } + } + public static class ReplicationEndpointForTest extends BaseReplicationEndpoint { static UUID uuid = UUID.randomUUID(); static AtomicInteger contructedCount = new AtomicInteger(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index e7ff58f0361..ca09aa82847 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -863,7 +863,7 @@ public class TestReplicationSource { } } - /* + /** Test age of oldest wal metric. 
*/ @Test @@ -898,6 +898,29 @@ public class TestReplicationSource { } } + @Test + public void testReplicationSourceInitializingMetric() throws Exception { + String id = "1"; + MetricsSource metrics = Mockito.spy(new MetricsSource(id)); + Mocks mocks = new Mocks(); + ReplicationSource source = mocks.createReplicationSourceWithMocks(metrics, + new TestReplicationEndpoint.BadReplicationEndpoint()); + source.startup(); + final MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return metricsSource1.getSourceInitializing() == 1; + } + }); + TestReplicationEndpoint.BadReplicationEndpoint.failing = false; + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return metricsSource1.getSourceInitializing() == 0; + } + }); + metrics.clear(); + } + private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory .getInstance(MetricsReplicationSourceFactory.class);