HBASE-18092: Removing a peer does not properly clean up the ReplicationSourceManager state and metrics

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Ashu Pachauri 2017-05-25 18:24:38 -07:00 committed by tedyu
parent 30817b922e
commit 7b40f4f3ec
6 changed files with 115 additions and 43 deletions

View File

@ -40,7 +40,6 @@ public class MetricsSource implements BaseSource {
// tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimeStamps = new HashMap<>();
private int lastQueueSize = 0;
private long lastHFileRefsQueueSize = 0;
private String id;
@ -181,11 +180,12 @@ public class MetricsSource implements BaseSource {
/** Removes all metrics about this Source. */
public void clear() {
singleSourceSource.clear();
int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastTimeStamps.clear();
lastQueueSize = 0;
lastHFileRefsQueueSize = 0;
}

View File

@ -368,9 +368,7 @@ public class Replication extends WALActionsListener.Base implements
// get source
List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
for (ReplicationSourceInterface source : sources) {
if (source instanceof ReplicationSource) {
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
}
sourceMetricsList.add(source.getSourceMetrics());
}
// get old source

View File

@ -540,10 +540,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return sb.toString();
}
/**
* Get Replication Source Metrics
* @return sourceMetrics
*/
@Override
public MetricsSource getSourceMetrics() {
return this.metrics;

View File

@ -358,6 +358,20 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.oldsources;
}
/**
 * Look up the normal replication source that belongs to a given peer.
 * Only the sources returned by {@link #getSources()} are searched.
 * @param peerId id of the peer whose source is wanted
 * @return the first normal source whose peer id equals {@code peerId},
 *         or null when no such source exists
 */
public ReplicationSourceInterface getSource(String peerId) {
  ReplicationSourceInterface match = null;
  for (ReplicationSourceInterface candidate : getSources()) {
    if (candidate.getPeerId().equals(peerId)) {
      // Stop at the first hit; later entries are never inspected.
      match = candidate;
      break;
    }
  }
  return match;
}
@VisibleForTesting
List<String> getAllQueues() {
return replicationQueues.getAllQueues();
@ -542,9 +556,7 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public void closeQueue(ReplicationSourceInterface src) {
LOG.info("Done with the queue " + src.getPeerClusterZnode());
if (src instanceof ReplicationSource) {
((ReplicationSource) src).getSourceMetrics().clear();
}
src.getSourceMetrics().clear();
this.sources.remove(src);
deleteSource(src.getPeerClusterZnode(), true);
this.walsById.remove(src.getPeerClusterZnode());
@ -593,10 +605,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
for (ReplicationSourceInterface toRemove : srcToRemove) {
toRemove.terminate(terminateMessage);
if (toRemove instanceof ReplicationSource) {
((ReplicationSource) toRemove).getSourceMetrics().clear();
}
this.sources.remove(toRemove);
closeQueue(toRemove);
}
deleteSource(id, true);
}

View File

@ -41,6 +41,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
ReplicationSourceManager manager;
String peerClusterId;
Path currentPath;
MetricsSource metrics;
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
@ -50,11 +51,13 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
this.manager = manager;
this.peerClusterId = peerClusterId;
this.metrics = metrics;
}
// Remembers the given log as the current path and bumps the log-queue size
// metric on the MetricsSource held by this dummy.
@Override
public void enqueueLog(Path log) {
  currentPath = log;
  // Keep the queue-size metric in step with every enqueued log.
  metrics.incrSizeOfLogQueue();
}
@Override
@ -112,7 +115,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public MetricsSource getSourceMetrics() {
return null;
return metrics;
}
@Override

View File

@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
@ -438,6 +438,9 @@ public abstract class TestReplicationSourceManager {
@Test
public void testPeerRemovalCleanup() throws Exception{
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
final String peerId = "FakePeer";
final ReplicationPeerConfig peerConfig =
new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
try {
DummyServer server = new DummyServer();
final ReplicationQueues rq =
@ -450,40 +453,103 @@ public abstract class TestReplicationSourceManager {
FailInitializeDummyReplicationSource.class.getName());
final ReplicationPeers rp = manager.getReplicationPeers();
// Set up the znode and ReplicationPeer for the fake peer
rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
// Wait for the peer to get created and connected
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return (rp.getConnectedPeer("FakePeer") != null);
}
});
// Don't wait for replication source to initialize, we know it won't.
addPeerAndWait(peerId, peerConfig, false);
// Make sure that the replication source was not initialized
List<ReplicationSourceInterface> sources = manager.getSources();
for (ReplicationSourceInterface source : sources) {
assertNotEquals("FakePeer", source.getPeerId());
}
// Sanity check
assertNull(manager.getSource(peerId));
// Create a replication queue for the fake peer
rq.addLog("FakePeer", "FakeFile");
rq.addLog(peerId, "FakeFile");
// Unregister peer, this should remove the peer and clear all queues associated with it
// Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
rp.unregisterPeer("FakePeer");
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!rq.getAllQueues().contains("FakePeer"))
&& (rp.getConnectedPeer("FakePeer") == null)
&& (!peers.contains("FakePeer"));
}
});
removePeerAndWait(peerId);
assertFalse(rq.getAllQueues().contains(peerId));
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
removePeerAndWait(peerId);
}
}
/**
 * Checks that removing a peer takes its contribution back out of the global
 * log-queue-size metric, and that re-adding the same peer yields a source
 * whose own log-queue-size metric starts from zero.
 */
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
  final String dummyPeerId = "DummyPeer";
  final ReplicationPeerConfig dummyPeerConfig =
      new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
  try {
    addPeerAndWait(dummyPeerId, dummyPeerConfig, true);

    // The source must exist, since addPeerAndWait waited for it.
    ReplicationSourceInterface peerSource = manager.getSource(dummyPeerId);
    assertNotNull(peerSource);

    // The global metrics object is a private field of MetricsSource, so read it reflectively.
    Field globalField = MetricsSource.class.getDeclaredField("globalSourceSource");
    globalField.setAccessible(true);
    MetricsReplicationSourceSource globalMetrics =
        (MetricsReplicationSourceSource) globalField.get(peerSource.getSourceMetrics());
    int baselineGlobalQueueSize = globalMetrics.getSizeOfLogQueue();

    // One enqueued log must show up in both the per-source and the global metric.
    peerSource.enqueueLog(new Path("abc"));
    assertEquals(1, peerSource.getSourceMetrics().getSizeOfLogQueue());
    assertEquals(1 + baselineGlobalQueueSize, globalMetrics.getSizeOfLogQueue());

    // Dropping the peer must bring the global metric back to its baseline.
    removePeerAndWait(dummyPeerId);
    assertEquals(baselineGlobalQueueSize, globalMetrics.getSizeOfLogQueue());

    // Re-adding the same peer must give a source with a zeroed per-source metric.
    addPeerAndWait(dummyPeerId, dummyPeerConfig, true);
    peerSource = manager.getSource(dummyPeerId);
    assertNotNull(peerSource);
    assertEquals(0, peerSource.getSourceMetrics().getSizeOfLogQueue());
    assertEquals(baselineGlobalQueueSize, globalMetrics.getSizeOfLogQueue());
  } finally {
    // Always leave the cluster without the test peer, even when an assertion failed.
    removePeerAndWait(dummyPeerId);
  }
}
/**
 * Register a peer and block until it is ready for the test to use.
 * @param peerId id under which the peer is registered
 * @param peerConfig configuration passed to the peer registration
 * @param waitForSource if true, wait until the manager exposes a replication source for the
 *          peer; if false, only wait until the peer shows up as connected
 * @throws Exception if registration fails or the wait (20000 passed to Waiter.waitFor) fails
 */
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
    final boolean waitForSource) throws Exception {
  final ReplicationPeers rp = manager.getReplicationPeers();
  rp.registerPeer(peerId, peerConfig);
  Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      // Only the check selected by waitForSource is evaluated on each poll.
      return waitForSource
          ? manager.getSource(peerId) != null
          : rp.getConnectedPeer(peerId) != null;
    }
  });
}
/**
 * Unregister a peer (if it is still registered) and block until every trace of it is gone:
 * no replication queue, no connected peer, and no entry in the list of peer ids.
 * @param peerId id of the peer to remove
 * @throws Exception if unregistering fails or the wait (20000 passed to Waiter.waitFor) fails
 */
private void removePeerAndWait(final String peerId) throws Exception {
  final ReplicationPeers rp = manager.getReplicationPeers();
  // Safe to call repeatedly: the unregister step is skipped once the peer is gone.
  if (rp.getAllPeerIds().contains(peerId)) {
    rp.unregisterPeer(peerId);
  }
  Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      List<String> remainingPeers = rp.getAllPeerIds();
      if (manager.getAllQueues().contains(peerId)) {
        return false;
      }
      if (rp.getConnectedPeer(peerId) != null) {
        return false;
      }
      return !remainingPeers.contains(peerId);
    }
  });
}
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);