HBASE-18549 Add metrics for failed replication queue recovery
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
79ec1a1fd8
commit
8a5537b5f5
|
@ -51,6 +51,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
|
||||||
public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
|
public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
|
||||||
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
|
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
|
||||||
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
|
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
|
||||||
|
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
|
||||||
|
|
||||||
void setLastShippedAge(long age);
|
void setLastShippedAge(long age);
|
||||||
void incrSizeOfLogQueue(int size);
|
void incrSizeOfLogQueue(int size);
|
||||||
|
@ -74,4 +75,5 @@ public interface MetricsReplicationSourceSource extends BaseSource {
|
||||||
void incrRepeatedFileBytes(final long bytes);
|
void incrRepeatedFileBytes(final long bytes);
|
||||||
void incrCompletedWAL();
|
void incrCompletedWAL();
|
||||||
void incrCompletedRecoveryQueue();
|
void incrCompletedRecoveryQueue();
|
||||||
|
void incrFailedRecoveryQueue();
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
||||||
private final MutableFastCounter repeatedFileBytes;
|
private final MutableFastCounter repeatedFileBytes;
|
||||||
private final MutableFastCounter completedWAL;
|
private final MutableFastCounter completedWAL;
|
||||||
private final MutableFastCounter completedRecoveryQueue;
|
private final MutableFastCounter completedRecoveryQueue;
|
||||||
|
private final MutableFastCounter failedRecoveryQueue;
|
||||||
|
|
||||||
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
|
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
|
||||||
this.rms = rms;
|
this.rms = rms;
|
||||||
|
@ -89,6 +90,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
||||||
completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
|
completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
|
||||||
completedRecoveryQueue = rms.getMetricsRegistry()
|
completedRecoveryQueue = rms.getMetricsRegistry()
|
||||||
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
|
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
|
||||||
|
failedRecoveryQueue = rms.getMetricsRegistry()
|
||||||
|
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void setLastShippedAge(long age) {
|
@Override public void setLastShippedAge(long age) {
|
||||||
|
@ -199,7 +202,10 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
||||||
public void incrCompletedRecoveryQueue() {
|
public void incrCompletedRecoveryQueue() {
|
||||||
completedRecoveryQueue.incr(1L);
|
completedRecoveryQueue.incr(1L);
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
public void incrFailedRecoveryQueue() {
|
||||||
|
failedRecoveryQueue.incr(1L);
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
rms.init();
|
rms.init();
|
||||||
|
|
|
@ -257,6 +257,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
||||||
completedRecoveryQueue.incr(1L);
|
completedRecoveryQueue.incr(1L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incrFailedRecoveryQueue() {/*no op*/}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
rms.init();
|
rms.init();
|
||||||
|
|
|
@ -202,4 +202,11 @@ public interface ReplicationQueueStorage {
|
||||||
* created hfile references during the call may not be included.
|
* created hfile references during the call may not be included.
|
||||||
*/
|
*/
|
||||||
Set<String> getAllHFileRefs() throws ReplicationException;
|
Set<String> getAllHFileRefs() throws ReplicationException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get full znode name for given region server
|
||||||
|
* @param serverName the name of the region server
|
||||||
|
* @return full znode name
|
||||||
|
*/
|
||||||
|
String getRsNode(ServerName serverName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||||
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
|
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getRsNode(ServerName serverName) {
|
@Override
|
||||||
|
public String getRsNode(ServerName serverName) {
|
||||||
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
|
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -326,6 +326,10 @@ public class MetricsSource implements BaseSource {
|
||||||
globalSourceSource.incrCompletedRecoveryQueue();
|
globalSourceSource.incrCompletedRecoveryQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incrFailedRecoveryQueue() {
|
||||||
|
globalSourceSource.incrFailedRecoveryQueue();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
singleSourceSource.init();
|
singleSourceSource.init();
|
||||||
|
|
|
@ -43,6 +43,7 @@ import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
@ -635,6 +636,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
try {
|
try {
|
||||||
this.executor.execute(transfer);
|
this.executor.execute(transfer);
|
||||||
} catch (RejectedExecutionException ex) {
|
} catch (RejectedExecutionException ex) {
|
||||||
|
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
|
||||||
|
.getGlobalSource().incrFailedRecoveryQueue();
|
||||||
LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
|
LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -706,7 +709,12 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
|
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
|
||||||
}
|
}
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
server.abort("Failed to claim queue from dead regionserver", e);
|
LOG.error(String.format("ReplicationException: cannot claim dead region (%s)'s " +
|
||||||
|
"replication queue. Znode : (%s)" +
|
||||||
|
" Possible solution: check if znode size exceeds jute.maxBuffer value. " +
|
||||||
|
" If so, increase it for both client and server side." + e), deadRS,
|
||||||
|
queueStorage.getRsNode(deadRS));
|
||||||
|
server.abort("Failed to claim queue from dead regionserver.", e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Copying over the failed queue is completed.
|
// Copying over the failed queue is completed.
|
||||||
|
|
|
@ -17,7 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication;
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -316,11 +318,17 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
|
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
|
||||||
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
|
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
|
||||||
|
|
||||||
|
|
||||||
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
|
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
|
||||||
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
|
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
|
||||||
|
MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
|
||||||
|
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
|
||||||
|
|
||||||
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
|
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
|
||||||
MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource,
|
MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
|
||||||
singleSourceSourceByTable);
|
singleSourceSourceByTable);
|
||||||
|
|
||||||
|
|
||||||
String gaugeName = "gauge";
|
String gaugeName = "gauge";
|
||||||
String singleGaugeName = "source.id." + gaugeName;
|
String singleGaugeName = "source.id." + gaugeName;
|
||||||
String globalGaugeName = "source." + gaugeName;
|
String globalGaugeName = "source." + gaugeName;
|
||||||
|
@ -340,6 +348,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
source.removeMetric(gaugeName);
|
source.removeMetric(gaugeName);
|
||||||
source.setGauge(gaugeName, delta);
|
source.setGauge(gaugeName, delta);
|
||||||
source.updateHistogram(counterName, count);
|
source.updateHistogram(counterName, count);
|
||||||
|
source.incrFailedRecoveryQueue();
|
||||||
|
|
||||||
|
|
||||||
verify(singleRms).decGauge(singleGaugeName, delta);
|
verify(singleRms).decGauge(singleGaugeName, delta);
|
||||||
verify(globalRms).decGauge(globalGaugeName, delta);
|
verify(globalRms).decGauge(globalGaugeName, delta);
|
||||||
|
@ -357,6 +367,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
verify(globalRms).setGauge(globalGaugeName, delta);
|
verify(globalRms).setGauge(globalGaugeName, delta);
|
||||||
verify(singleRms).updateHistogram(singleCounterName, count);
|
verify(singleRms).updateHistogram(singleCounterName, count);
|
||||||
verify(globalRms).updateHistogram(globalCounterName, count);
|
verify(globalRms).updateHistogram(globalCounterName, count);
|
||||||
|
verify(spyglobalSourceSource).incrFailedRecoveryQueue();
|
||||||
|
|
||||||
//check singleSourceSourceByTable metrics.
|
//check singleSourceSourceByTable metrics.
|
||||||
// singleSourceSourceByTable map entry will be created only
|
// singleSourceSourceByTable map entry will be created only
|
||||||
|
@ -373,6 +384,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
// cannot put more concreate value here to verify because the age is arbitrary.
|
// cannot put more concreate value here to verify because the age is arbitrary.
|
||||||
// as long as it's greater than 0, we see it as correct answer.
|
// as long as it's greater than 0, we see it as correct answer.
|
||||||
Assert.assertTrue(msr.getLastShippedAge() > 0);
|
Assert.assertTrue(msr.getLastShippedAge() > 0);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doPut(byte[] row) throws IOException {
|
private void doPut(byte[] row) throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue