HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3018)

Introduces a new metric that tracks number of replication sources that are stuck in initialization.

Signed-off-by: Xu Cang <xucang@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
(cherry picked from commit ff3821814a)
This commit is contained in:
Sandeep Pal 2021-03-17 09:10:44 -07:00 committed by Bharath Vissapragada
parent a61e9b963d
commit 72496272aa
No known key found for this signature in database
GPG Key ID: 18AE42A0B5A93FA7
7 changed files with 155 additions and 17 deletions

View File

@ -52,8 +52,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs"; public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues"; public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues"; public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
/* Used to track the age of oldest wal in ms since its creation time */ // This is to track the num of replication sources getting initialized
String OLDEST_WAL_AGE = "source.oldestWalAge"; public static final String SOURCE_INITIALIZING = "source.numInitializing";
void setLastShippedAge(long age); void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size); void incrSizeOfLogQueue(int size);
@ -83,4 +83,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
long getEditsFiltered(); long getEditsFiltered();
void setOldestWalAge(long age); void setOldestWalAge(long age);
long getOldestWalAge(); long getOldestWalAge();
void incrSourceInitializing();
void decrSourceInitializing();
int getSourceInitializing();
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram; import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -55,6 +56,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue; private final MutableFastCounter failedRecoveryQueue;
private final MutableGaugeLong walReaderBufferUsageBytes; private final MutableGaugeLong walReaderBufferUsageBytes;
private final MutableGaugeInt sourceInitializing;
public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
this.rms = rms; this.rms = rms;
@ -97,6 +99,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
walReaderBufferUsageBytes = rms.getMetricsRegistry() walReaderBufferUsageBytes = rms.getMetricsRegistry()
.getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
} }
@Override public void setLastShippedAge(long age) { @Override public void setLastShippedAge(long age) {
@ -222,6 +225,21 @@ public class MetricsReplicationGlobalSourceSourceImpl
return 0; return 0;
} }
@Override
public void incrSourceInitializing() {
// One more replication source has entered startup/initialization.
sourceInitializing.incr(1);
}
@Override
public void decrSourceInitializing() {
// A replication source finished (or abandoned) initialization.
sourceInitializing.decr(1);
}
@Override
public int getSourceInitializing() {
// Current number of sources still initializing, per the global gauge.
return sourceInitializing.value();
}
@Override @Override
public void init() { public void init() {
rms.init(); rms.init();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram; import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -45,6 +46,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String shippedHFilesKey; private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey; private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey; private final String oldestWalAgeKey;
private final String sourceInitializingKey;
private final MutableHistogram ageOfLastShippedOpHist; private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge; private final MutableGaugeLong sizeOfLogQueueGauge;
@ -73,6 +75,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter completedWAL; private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge; private final MutableGaugeLong oldestWalAge;
private final MutableGaugeInt sourceInitializing;
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms; this.rms = rms;
@ -135,6 +138,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
oldestWalAgeKey = this.keyPrefix + "oldestWalAge"; oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L); oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
sourceInitializingKey = this.keyPrefix + "isInitializing";
sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
} }
@Override public void setLastShippedAge(long age) { @Override public void setLastShippedAge(long age) {
@ -201,6 +207,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(completedLogsKey); rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey); rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey); rms.removeMetric(oldestWalAgeKey);
rms.removeMetric(sourceInitializingKey);
} }
@Override @Override
@ -274,6 +281,20 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
return oldestWalAge.value(); return oldestWalAge.value();
} }
/*
 * Per-source "isInitializing" gauge accessors. Formatting and method order are
 * normalized to match the interface declaration and the global-source
 * implementation (incr, decr, get); behavior is unchanged.
 */
@Override
public void incrSourceInitializing() {
  // This source has entered startup/initialization.
  sourceInitializing.incr(1);
}

@Override
public void decrSourceInitializing() {
  // This source finished (or abandoned) initialization.
  sourceInitializing.decr(1);
}

@Override
public int getSourceInitializing() {
  // Current value of this source's initializing gauge (expected 0 or 1).
  return sourceInitializing.value();
}
@Override @Override
public void init() { public void init() {
rms.init(); rms.init();

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.metrics2.lib;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.metrics.Interns; import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsInfo;
@ -30,7 +29,6 @@ import org.apache.hadoop.metrics2.impl.MsInfo;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@ -452,6 +450,40 @@ public class DynamicMetricsRegistry {
return (MutableGaugeLong) metric; return (MutableGaugeLong) metric;
} }
/**
 * Get a MutableGaugeInt from the storage. If it is not there, atomically put it.
 *
 * @param gaugeName name of the gauge to create or get.
 * @param potentialStartingValue value of the new gauge if we have to create it.
 * @return the existing gauge registered under {@code gaugeName}, or the newly created one.
 * @throws MetricsException if a metric with this name exists but is not a MutableGaugeInt.
 */
public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) {
  // Try and get the gauge.
  MutableMetric metric = metricsMap.get(gaugeName);

  // If it's not there then try and put a new one in the storage.
  if (metric == null) {
    // Create the potential new gauge.
    MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""),
      potentialStartingValue);

    // Try and put the gauge in. This is atomic.
    metric = metricsMap.putIfAbsent(gaugeName, newGauge);

    // If the value we get back is null then the put was successful and we return our new gauge;
    // otherwise metric contains whatever was registered before our put could complete.
    if (metric == null) {
      return newGauge;
    }
  }

  if (!(metric instanceof MutableGaugeInt)) {
    // Fixed typo in the original message ("MetricMutableGaugeInr").
    throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName +
      " and not of type MutableGaugeInt");
  }

  return (MutableGaugeInt) metric;
}
/** /**
* Get a MetricMutableCounterLong from the storage. If it is not there atomically put it. * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it.
* *

View File

@ -21,13 +21,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSource; import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -62,7 +61,8 @@ public class MetricsSource implements BaseSource {
singleSourceSource = singleSourceSource =
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
.getSource(id); .getSource(id);
globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); globalSourceSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
singleSourceSourceByTable = new HashMap<>(); singleSourceSourceByTable = new HashMap<>();
} }
@ -168,6 +168,22 @@ public class MetricsSource implements BaseSource {
globalSourceSource.decrSizeOfLogQueue(1); globalSourceSource.decrSizeOfLogQueue(1);
} }
/**
 * Increment the count for initializing sources.
 * Updates both the per-source gauge and the global (region-server wide) gauge.
 */
public void incrSourceInitializing() {
singleSourceSource.incrSourceInitializing();
globalSourceSource.incrSourceInitializing();
}
/**
 * Decrement the count for initializing sources.
 * Updates both the per-source gauge and the global (region-server wide) gauge.
 */
public void decrSourceInitializing() {
singleSourceSource.decrSourceInitializing();
globalSourceSource.decrSourceInitializing();
}
/** /**
* Add on the number of log edits read * Add on the number of log edits read
* *
@ -335,6 +351,14 @@ public class MetricsSource implements BaseSource {
} }
} }
/**
 * Get the source initializing counts.
 * NOTE(review): this reads only the per-source gauge ({@code singleSourceSource}), so it
 * reflects whether THIS source is initializing (expected 0 or 1), not the global number of
 * initializing sources on the region server — confirm this is the intended contract.
 * @return number of replication sources getting initialized
 */
public int getSourceInitializing() {
return singleSourceSource.getSourceInitializing();
}
/** /**
* Get the slave peer ID * Get the slave peer ID
* @return peerID * @return peerID

View File

@ -548,7 +548,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
sleepMultiplier++; sleepMultiplier++;
} else { } else {
retryStartup.set(!this.abortOnError); retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false); setSourceStartupStatus(false);
throw new RuntimeException("Exhausted retries to start replication endpoint."); throw new RuntimeException("Exhausted retries to start replication endpoint.");
} }
} }
@ -556,7 +556,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (!this.isSourceActive()) { if (!this.isSourceActive()) {
retryStartup.set(!this.abortOnError); retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false); setSourceStartupStatus(false);
throw new IllegalStateException("Source should be active."); throw new IllegalStateException("Source should be active.");
} }
@ -580,7 +580,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if(!this.isSourceActive()) { if(!this.isSourceActive()) {
retryStartup.set(!this.abortOnError); retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false); setSourceStartupStatus(false);
throw new IllegalStateException("Source should be active."); throw new IllegalStateException("Source should be active.");
} }
LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
@ -591,7 +591,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
for (String walGroupId: logQueue.getQueues().keySet()) { for (String walGroupId: logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId); tryStartNewShipper(walGroupId);
} }
this.startupOngoing.set(false); setSourceStartupStatus(false);
}
private synchronized void setSourceStartupStatus(boolean initializing) {
startupOngoing.set(initializing);
if (initializing) {
metrics.incrSourceInitializing();
} else {
metrics.decrSourceInitializing();
}
} }
@Override @Override
@ -600,7 +609,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
return this; return this;
} }
this.sourceRunning = true; this.sourceRunning = true;
startupOngoing.set(true); setSourceStartupStatus(true);
initThread = new Thread(this::initialize); initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread, Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId, Thread.currentThread().getName() + ".replicationSource," + this.queueId,
@ -614,12 +623,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
do { do {
if(retryStartup.get()) { if(retryStartup.get()) {
this.sourceRunning = true; this.sourceRunning = true;
startupOngoing.set(true); setSourceStartupStatus(true);
retryStartup.set(false); retryStartup.set(false);
try { try {
initialize(); initialize();
} catch(Throwable error){ } catch(Throwable error){
sourceRunning = false; setSourceStartupStatus(false);
uncaughtException(t, error, null, null); uncaughtException(t, error, null, null);
retryStartup.set(!this.abortOnError); retryStartup.set(!this.abortOnError);
} }

View File

@ -467,6 +467,18 @@ public class TestReplicationSource {
} }
/**
* Bad Endpoint with failing connection to peer on demand.
*/
public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
// While true, getPeerUUID() returns null (the peer looks unreachable). Tests flip this
// to false to let the endpoint "recover" on demand.
static boolean failing = true;
@Override
public synchronized UUID getPeerUUID() {
// A null peer UUID makes the replication source's startup keep retrying —
// presumably keeping it counted as "initializing"; see the metric test below.
return failing ? null : super.getPeerUUID();
}
}
public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
static int count = 0; static int count = 0;
@ -556,6 +568,25 @@ public class TestReplicationSource {
} }
} }
/**
 * Verifies the "initializing sources" gauge: while BadReplicationEndpoint returns a null
 * peer UUID the source stays in startup and the metric reads 1; once the endpoint is
 * allowed to connect, the metric drops back to 0.
 */
@Test
public void testReplicationSourceInitializingMetric() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
// Keep retrying startup instead of aborting the region server on endpoint failure.
conf.setBoolean("replication.source.regionserver.abort", false);
ReplicationSource rs = new ReplicationSource();
RegionServerServices rss = setupForAbortTests(rs, conf,
BadReplicationEndpoint.class.getName());
try {
rs.startup();
assertTrue(rs.isSourceActive());
// Peer unreachable -> this source should report as initializing.
Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
// Let the endpoint connect; startup completes and the gauge returns to 0.
BadReplicationEndpoint.failing = false;
Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
} finally {
rs.terminate("Done");
rss.stop("Done");
}
}
/** /**
* Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
* when <b>replication.source.regionserver.abort</b> is set to false. * when <b>replication.source.regionserver.abort</b> is set to false.