HBASE-18092: Removing a peer does not properly clean up the ReplicationSourceManager state and metrics

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Ashu Pachauri 2017-05-25 18:24:38 -07:00 committed by tedyu
parent 922894c96e
commit 1e4f8491f7
6 changed files with 115 additions and 43 deletions

View File

@ -40,7 +40,6 @@ public class MetricsSource implements BaseSource {
// tracks last shipped timestamp for each wal group // tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimeStamps = new HashMap<>(); private Map<String, Long> lastTimeStamps = new HashMap<>();
private int lastQueueSize = 0;
private long lastHFileRefsQueueSize = 0; private long lastHFileRefsQueueSize = 0;
private String id; private String id;
@ -181,11 +180,12 @@ public class MetricsSource implements BaseSource {
/** Removes all metrics about this Source. */ /** Removes all metrics about this Source. */
public void clear() { public void clear() {
singleSourceSource.clear(); int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize); globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastTimeStamps.clear(); lastTimeStamps.clear();
lastQueueSize = 0;
lastHFileRefsQueueSize = 0; lastHFileRefsQueueSize = 0;
} }

View File

@ -368,9 +368,7 @@ public class Replication extends WALActionsListener.Base implements
// get source // get source
List<ReplicationSourceInterface> sources = this.replicationManager.getSources(); List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
for (ReplicationSourceInterface source : sources) { for (ReplicationSourceInterface source : sources) {
if (source instanceof ReplicationSource) { sourceMetricsList.add(source.getSourceMetrics());
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
}
} }
// get old source // get old source

View File

@ -540,10 +540,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return sb.toString(); return sb.toString();
} }
/**
* Get Replication Source Metrics
* @return sourceMetrics
*/
@Override @Override
public MetricsSource getSourceMetrics() { public MetricsSource getSourceMetrics() {
return this.metrics; return this.metrics;

View File

@ -358,6 +358,20 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.oldsources; return this.oldsources;
} }
/**
* Get the normal source for a given peer
* @param peerId
* @return the normal source for the give peer if it exists, otherwise null.
*/
public ReplicationSourceInterface getSource(String peerId) {
for (ReplicationSourceInterface source: getSources()) {
if (source.getPeerId().equals(peerId)) {
return source;
}
}
return null;
}
@VisibleForTesting @VisibleForTesting
List<String> getAllQueues() { List<String> getAllQueues() {
return replicationQueues.getAllQueues(); return replicationQueues.getAllQueues();
@ -542,9 +556,7 @@ public class ReplicationSourceManager implements ReplicationListener {
*/ */
public void closeQueue(ReplicationSourceInterface src) { public void closeQueue(ReplicationSourceInterface src) {
LOG.info("Done with the queue " + src.getPeerClusterZnode()); LOG.info("Done with the queue " + src.getPeerClusterZnode());
if (src instanceof ReplicationSource) { src.getSourceMetrics().clear();
((ReplicationSource) src).getSourceMetrics().clear();
}
this.sources.remove(src); this.sources.remove(src);
deleteSource(src.getPeerClusterZnode(), true); deleteSource(src.getPeerClusterZnode(), true);
this.walsById.remove(src.getPeerClusterZnode()); this.walsById.remove(src.getPeerClusterZnode());
@ -593,10 +605,7 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
for (ReplicationSourceInterface toRemove : srcToRemove) { for (ReplicationSourceInterface toRemove : srcToRemove) {
toRemove.terminate(terminateMessage); toRemove.terminate(terminateMessage);
if (toRemove instanceof ReplicationSource) { closeQueue(toRemove);
((ReplicationSource) toRemove).getSourceMetrics().clear();
}
this.sources.remove(toRemove);
} }
deleteSource(id, true); deleteSource(id, true);
} }

View File

@ -41,6 +41,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
ReplicationSourceManager manager; ReplicationSourceManager manager;
String peerClusterId; String peerClusterId;
Path currentPath; Path currentPath;
MetricsSource metrics;
@Override @Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
@ -50,11 +51,13 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
this.manager = manager; this.manager = manager;
this.peerClusterId = peerClusterId; this.peerClusterId = peerClusterId;
this.metrics = metrics;
} }
@Override @Override
public void enqueueLog(Path log) { public void enqueueLog(Path log) {
this.currentPath = log; this.currentPath = log;
metrics.incrSizeOfLogQueue();
} }
@Override @Override
@ -112,7 +115,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override @Override
public MetricsSource getSourceMetrics() { public MetricsSource getSourceMetrics() {
return null; return metrics;
} }
@Override @Override

View File

@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -438,6 +438,9 @@ public abstract class TestReplicationSourceManager {
@Test @Test
public void testPeerRemovalCleanup() throws Exception{ public void testPeerRemovalCleanup() throws Exception{
String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
final String peerId = "FakePeer";
final ReplicationPeerConfig peerConfig =
new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
try { try {
DummyServer server = new DummyServer(); DummyServer server = new DummyServer();
final ReplicationQueues rq = final ReplicationQueues rq =
@ -450,40 +453,103 @@ public abstract class TestReplicationSourceManager {
FailInitializeDummyReplicationSource.class.getName()); FailInitializeDummyReplicationSource.class.getName());
final ReplicationPeers rp = manager.getReplicationPeers(); final ReplicationPeers rp = manager.getReplicationPeers();
// Set up the znode and ReplicationPeer for the fake peer // Set up the znode and ReplicationPeer for the fake peer
rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase")); // Don't wait for replication source to initialize, we know it won't.
// Wait for the peer to get created and connected addPeerAndWait(peerId, peerConfig, false);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return (rp.getConnectedPeer("FakePeer") != null);
}
});
// Make sure that the replication source was not initialized // Sanity check
List<ReplicationSourceInterface> sources = manager.getSources(); assertNull(manager.getSource(peerId));
for (ReplicationSourceInterface source : sources) {
assertNotEquals("FakePeer", source.getPeerId());
}
// Create a replication queue for the fake peer // Create a replication queue for the fake peer
rq.addLog("FakePeer", "FakeFile"); rq.addLog(peerId, "FakeFile");
// Unregister peer, this should remove the peer and clear all queues associated with it // Unregister peer, this should remove the peer and clear all queues associated with it
// Need to wait for the ReplicationTracker to pick up the changes and notify listeners. // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
rp.unregisterPeer("FakePeer"); removePeerAndWait(peerId);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { assertFalse(rq.getAllQueues().contains(peerId));
@Override
public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!rq.getAllQueues().contains("FakePeer"))
&& (rp.getConnectedPeer("FakePeer") == null)
&& (!peers.contains("FakePeer"));
}
});
} finally { } finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName); conf.set("replication.replicationsource.implementation", replicationSourceImplName);
removePeerAndWait(peerId);
} }
} }
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig =
new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
try {
addPeerAndWait(peerId, peerConfig, true);
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
// Retrieve the global replication metrics source
Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
f.setAccessible(true);
MetricsReplicationSourceSource globalSource =
(MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Removing the peer should reset the global metrics
removePeerAndWait(peerId);
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Adding the same peer back again should reset the single source metrics
addPeerAndWait(peerId, peerConfig, true);
source = manager.getSource(peerId);
assertNotNull(source);
assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
}
/**
* Add a peer and wait for it to initialize
* @param peerId
* @param peerConfig
* @param waitForSource Whether to wait for replication source to initialize
* @throws Exception
*/
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
rp.registerPeer(peerId, peerConfig);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
if (waitForSource) {
return (manager.getSource(peerId) != null);
} else {
return (rp.getConnectedPeer(peerId) != null);
}
}
});
}
/**
* Remove a peer and wait for it to get cleaned up
* @param peerId
* @throws Exception
*/
private void removePeerAndWait(final String peerId) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
if (rp.getAllPeerIds().contains(peerId)) {
rp.unregisterPeer(peerId);
}
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
&& (!peers.contains(peerId));
}
});
}
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) { private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
// 1. Create store files for the families // 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1); Map<byte[], List<Path>> storeFiles = new HashMap<>(1);