HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3009)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Author: Sandeep Pal, 2021-03-22 22:55:04 -07:00 (committed by GitHub)
parent 716c685497
commit 97c152ea7a
8 changed files with 162 additions and 8 deletions

MetricsReplicationSourceSource.java

@@ -50,8 +50,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
   public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
   public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
   /* Used to track the age of oldest wal in ms since its creation time */
   String OLDEST_WAL_AGE = "source.oldestWalAge";
+  // This is to track the num of replication sources getting initialized
+  public static final String SOURCE_INITIALIZING = "source.numInitializing";

   void setLastShippedAge(long age);
   void incrSizeOfLogQueue(int size);
@@ -78,4 +78,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   void incrFailedRecoveryQueue();
   void setOldestWalAge(long age);
   long getOldestWalAge();
+  void incrSourceInitializing();
+  void decrSourceInitializing();
+  int getSourceInitializing();
 }

MetricsReplicationGlobalSourceSource.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
@@ -46,6 +47,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableFastCounter failedRecoveryQueue;
+  private final MutableGaugeInt sourceInitializing;

   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -82,6 +84,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
     completedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
     failedRecoveryQueue = rms.getMetricsRegistry()
         .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
+    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
   }

   @Override public void setLastShippedAge(long age) {
@@ -208,6 +211,21 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
     return 0;
   }

+  @Override
+  public void incrSourceInitializing() {
+    sourceInitializing.incr(1);
+  }
+
+  @Override
+  public void decrSourceInitializing() {
+    sourceInitializing.decr(1);
+  }
+
+  @Override
+  public int getSourceInitializing() {
+    return sourceInitializing.value();
+  }
+
   @Override
   public void init() {
     rms.init();

MetricsReplicationSourceSourceImpl.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
@@ -41,6 +42,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
   private final String shippedHFilesKey;
   private final String sizeOfHFileRefsQueueKey;
   private final String oldestWalAgeKey;
+  private final String sourceInitializingKey;

   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -69,6 +71,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableGaugeLong oldestWalAge;
+  private final MutableGaugeInt sourceInitializing;

   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
@@ -131,6 +134,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
     oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
     oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
+
+    sourceInitializingKey = this.keyPrefix + "isInitializing";
+    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
   }

   @Override public void setLastShippedAge(long age) {
@@ -197,6 +203,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
     rms.removeMetric(completedLogsKey);
     rms.removeMetric(completedRecoveryKey);
     rms.removeMetric(oldestWalAgeKey);
+    rms.removeMetric(sourceInitializingKey);
   }

   @Override
@@ -270,6 +277,20 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
     return oldestWalAge.value();
   }

+  @Override
+  public void incrSourceInitializing() {
+    sourceInitializing.incr(1);
+  }
+
+  @Override
+  public int getSourceInitializing() {
+    return sourceInitializing.value();
+  }
+
+  @Override public void decrSourceInitializing() {
+    sourceInitializing.decr(1);
+  }
+
   @Override
   public void init() {
     rms.init();

DynamicMetricsRegistry.java

@@ -449,6 +449,40 @@ public class DynamicMetricsRegistry {
     return (MutableGaugeLong) metric;
   }

+  /**
+   * Get a MetricMutableGaugeInt from the storage. If it is not there atomically put it.
+   *
+   * @param gaugeName name of the gauge to create or get.
+   * @param potentialStartingValue value of the new gauge if we have to create it.
+   */
+  public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) {
+    // Try and get the gauge.
+    MutableMetric metric = metricsMap.get(gaugeName);
+
+    // If it's not there then try and put a new one in the storage.
+    if (metric == null) {
+      // Create the potential new gauge.
+      MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""),
+          potentialStartingValue);
+
+      // Try and put the gauge in. This is atomic.
+      metric = metricsMap.putIfAbsent(gaugeName, newGauge);
+
+      // If the value we get back is null then the put was successful and we return the new
+      // gauge; otherwise metric holds whatever was registered before the put could complete.
+      if (metric == null) {
+        return newGauge;
+      }
+    }
+
+    if (!(metric instanceof MutableGaugeInt)) {
+      throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName
+          + " and not of type MetricMutableGaugeInt");
+    }
+
+    return (MutableGaugeInt) metric;
+  }
+
   /**
    * Get a MetricMutableCounterLong from the storage. If it is not there atomically put it.
    *

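The new getGaugeInt follows the registry's existing get-or-create idiom: optimistically build the gauge and let ConcurrentMap.putIfAbsent decide which instance wins, so concurrent callers always end up sharing a single registered gauge. A minimal, self-contained sketch of the same idiom in plain Java (IntGauge and GaugeRegistrySketch are hypothetical stand-ins, not HBase's MutableGaugeInt or DynamicMetricsRegistry):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class GaugeRegistrySketch {
  // Stand-in for a mutable int gauge.
  static final class IntGauge {
    final AtomicInteger value;
    IntGauge(int start) { value = new AtomicInteger(start); }
  }

  private final ConcurrentMap<String, IntGauge> metrics = new ConcurrentHashMap<>();

  // Get the gauge registered under the given name, creating it atomically if absent.
  IntGauge getGaugeInt(String name, int potentialStartingValue) {
    IntGauge existing = metrics.get(name);
    if (existing == null) {
      IntGauge candidate = new IntGauge(potentialStartingValue);
      // putIfAbsent returns null only for the thread that actually installed the candidate.
      existing = metrics.putIfAbsent(name, candidate);
      if (existing == null) {
        return candidate;
      }
    }
    return existing;
  }

  public static void main(String[] args) {
    GaugeRegistrySketch registry = new GaugeRegistrySketch();
    registry.getGaugeInt("source.numInitializing", 0).value.incrementAndGet();
    // Prints 1: both calls resolve to the same gauge instance.
    System.out.println(registry.getGaugeInt("source.numInitializing", 0).value.get());
  }
}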
MetricsSource.java

@@ -22,10 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -134,7 +133,8 @@ public class MetricsSource implements BaseSource {
    * @param tableName String as group and tableName
    */
   public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
-    getSourceForTable(tableName).setLastShippedAge(EnvironmentEdgeManager.currentTime() - timestamp);
+    getSourceForTable(tableName)
+      .setLastShippedAge(EnvironmentEdgeManager.currentTime() - timestamp);
   }

   /**
@@ -184,6 +184,22 @@ public class MetricsSource implements BaseSource {
     globalSourceSource.decrSizeOfLogQueue(1);
   }

+  /**
+   * Increment the count for initializing sources
+   */
+  public void incrSourceInitializing() {
+    singleSourceSource.incrSourceInitializing();
+    globalSourceSource.incrSourceInitializing();
+  }
+
+  /**
+   * Decrement the count for initializing sources
+   */
+  public void decrSourceInitializing() {
+    singleSourceSource.decrSourceInitializing();
+    globalSourceSource.decrSourceInitializing();
+  }
+
   /**
    * Add on the the number of log edits read
    *
@@ -290,6 +306,14 @@ public class MetricsSource implements BaseSource {
     return lastTimestamp;
   }

+  /**
+   * Get the source initializing counts
+   * @return number of replication sources getting initialized
+   */
+  public int getSourceInitializing() {
+    return singleSourceSource.getSourceInitializing();
+  }
+
   /**
    * Get the slave peer ID
    * @return peerID

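The two new MetricsSource methods are meant to be called as a pair around source startup, feeding both the per-peer gauge and the global one. A small sketch of driving them directly (hedged: the peer id "1" is arbitrary and this assumes the hbase-server and hbase-hadoop-compat jars are on the classpath; operators would normally read the gauge through the metrics system rather than this getter):

import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;

public class SourceInitializingMetricSketch {
  public static void main(String[] args) {
    MetricsSource metrics = new MetricsSource("1");       // "1" is a hypothetical peer id
    metrics.incrSourceInitializing();                     // source entered its startup path
    System.out.println(metrics.getSourceInitializing());  // expected: 1
    metrics.decrSourceInitializing();                     // startup finished (or gave up)
    System.out.println(metrics.getSourceInitializing());  // expected: 0
  }
}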
ReplicationSource.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -119,6 +117,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
   private int maxRetriesMultiplier;
   // Indicates if this particular source is running
   private volatile boolean sourceRunning = false;
+  // Indicates if the source initialization is in progress
+  private volatile boolean startupOngoing = false;
   // Metrics for this source
   private MetricsSource metrics;
   // ReplicationEndpoint which will handle the actual replication
@@ -266,16 +266,19 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
   public void run() {
     // mark we are running now
     this.sourceRunning = true;
+    this.setSourceStartupStatus(true);
     try {
       // start the endpoint, connect to the cluster
       Service.State state = replicationEndpoint.start().get();
       if (state != Service.State.RUNNING) {
         LOG.warn("ReplicationEndpoint was not started. Exiting");
         uninitialize();
+        this.setSourceStartupStatus(false);
         return;
       }
     } catch (Exception ex) {
       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
+      this.setSourceStartupStatus(false);
       throw new RuntimeException(ex);
     }
@@ -300,6 +303,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
     }

     if (!this.isSourceActive()) {
+      this.setSourceStartupStatus(false);
       return;
     }
@@ -310,6 +314,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
       this.manager.closeQueue(this);
+      this.setSourceStartupStatus(false);
       return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
@@ -327,6 +332,16 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
         worker.startup();
       }
     }
+    this.setSourceStartupStatus(false);
+  }
+
+  private synchronized void setSourceStartupStatus(boolean initializing) {
+    startupOngoing = initializing;
+    if (initializing) {
+      metrics.incrSourceInitializing();
+    } else {
+      metrics.decrSourceInitializing();
+    }
   }

   /**

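Design note: the patch releases the gauge with an explicit setSourceStartupStatus(false) on every exit path out of run()'s startup phase. Wrapping the startup phase in try/finally would give the same guarantee (the gauge is never left stuck at 1 once the thread leaves initialization) with a single decrement site. A generic sketch of that alternative shape, with an AtomicInteger standing in for the metric rather than ReplicationSource's actual fields:

import java.util.concurrent.atomic.AtomicInteger;

class StartupBracketSketch {
  // Stand-in for the "number of sources initializing" gauge.
  static final AtomicInteger initializing = new AtomicInteger();

  static void runStartup(Runnable initialize) {
    initializing.incrementAndGet();
    try {
      initialize.run();               // may return early or throw
    } finally {
      initializing.decrementAndGet(); // always released, whatever the exit path
    }
  }

  public static void main(String[] args) {
    runStartup(() -> System.out.println("initializing source..."));
    System.out.println(initializing.get()); // 0
  }
}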
TestReplicationEndpoint.java

@@ -419,6 +419,22 @@ public class TestReplicationEndpoint extends TestReplicationBase {
         cells.get(0).getRowLength(), row, 0, row.length));
   }

+  /**
+   * Bad Endpoint with failing connection to peer on demand.
+   */
+  public static class BadReplicationEndpoint extends ReplicationEndpointForTest {
+    static boolean failing = true;
+
+    public BadReplicationEndpoint() {
+      super();
+    }
+
+    @Override
+    public synchronized UUID getPeerUUID() {
+      return failing ? null : super.getPeerUUID();
+    }
+  }
+
   public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
     static UUID uuid = UUID.randomUUID();
     static AtomicInteger contructedCount = new AtomicInteger();

TestReplicationSource.java

@@ -863,7 +863,7 @@ public class TestReplicationSource {
     }
   }

-  /*
+  /**
    Test age of oldest wal metric.
    */
   @Test
@@ -898,6 +898,29 @@ public class TestReplicationSource {
     }
   }

+  @Test
+  public void testReplicationSourceInitializingMetric() throws Exception {
+    String id = "1";
+    MetricsSource metrics = Mockito.spy(new MetricsSource(id));
+    Mocks mocks = new Mocks();
+    ReplicationSource source = mocks.createReplicationSourceWithMocks(metrics,
+      new TestReplicationEndpoint.BadReplicationEndpoint());
+    source.startup();
+    final MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return metricsSource1.getSourceInitializing() == 1;
+      }
+    });
+    TestReplicationEndpoint.BadReplicationEndpoint.failing = false;
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return metricsSource1.getSourceInitializing() == 0;
+      }
+    });
+    metrics.clear();
+  }
+
   private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
     MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory
       .getInstance(MetricsReplicationSourceFactory.class);