HBASE-18092: Removing a peer does not properly clean up the ReplicationSourceManager state and metrics

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Ashu Pachauri 2017-05-25 18:24:38 -07:00 committed by tedyu
parent 6c4980161b
commit 961337aadc
7 changed files with 132 additions and 48 deletions

View File

@ -41,7 +41,6 @@ public class MetricsSource implements BaseSource {
// tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
private int lastQueueSize = 0;
private long lastHFileRefsQueueSize = 0;
private String id;
@ -182,11 +181,12 @@ public class MetricsSource implements BaseSource {
/** Removes all metrics about this Source. */
public void clear() {
singleSourceSource.clear();
int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastTimeStamps.clear();
lastQueueSize = 0;
lastHFileRefsQueueSize = 0;
}

View File

@ -451,9 +451,7 @@ public class Replication extends WALActionsListener.Base implements
// get source
List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
for (ReplicationSourceInterface source : sources) {
if (source instanceof ReplicationSource) {
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
}
sourceMetricsList.add(source.getSourceMetrics());
}
// get old source

View File

@ -490,10 +490,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return sb.toString();
}
/**
* Get Replication Source Metrics
* @return sourceMetrics
*/
@Override
public MetricsSource getSourceMetrics() {
return this.metrics;
}

View File

@ -120,4 +120,11 @@ public interface ReplicationSourceInterface {
void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException;
/**
* Get the associated metrics.
*
* @return The metrics for this source.
*/
MetricsSource getSourceMetrics();
}

View File

@ -370,6 +370,20 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.oldsources;
}
/**
* Get the normal source for a given peer
* @param peerId
* @return the normal source for the give peer if it exists, otherwise null.
*/
public ReplicationSourceInterface getSource(String peerId) {
for (ReplicationSourceInterface source: getSources()) {
if (source.getPeerClusterId().equals(peerId)) {
return source;
}
}
return null;
}
@VisibleForTesting
List<String> getAllQueues() {
return replicationQueues.getAllQueues();
@ -549,9 +563,7 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public void closeRecoveredQueue(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
if (src instanceof ReplicationSource) {
((ReplicationSource) src).getSourceMetrics().clear();
}
src.getSourceMetrics().clear();
this.oldsources.remove(src);
deleteSource(src.getPeerClusterZnode(), false);
this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
@ -563,9 +575,7 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public void closeQueue(ReplicationSourceInterface src) {
LOG.info("Done with the queue " + src.getPeerClusterZnode());
if (src instanceof ReplicationSource) {
((ReplicationSource) src).getSourceMetrics().clear();
}
src.getSourceMetrics().clear();
this.sources.remove(src);
deleteSource(src.getPeerClusterZnode(), true);
this.walsById.remove(src.getPeerClusterZnode());
@ -615,10 +625,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
for (ReplicationSourceInterface toRemove : srcToRemove) {
toRemove.terminate(terminateMessage);
if (toRemove instanceof ReplicationSource) {
((ReplicationSource) toRemove).getSourceMetrics().clear();
}
this.sources.remove(toRemove);
closeQueue(toRemove);
}
deleteSource(id, true);
}

View File

@ -40,6 +40,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
ReplicationSourceManager manager;
String peerClusterId;
Path currentPath;
MetricsSource metrics;
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
@ -49,11 +50,13 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
this.manager = manager;
this.peerClusterId = peerClusterId;
this.metrics = metrics;
}
@Override
public void enqueueLog(Path log) {
this.currentPath = log;
metrics.incrSizeOfLogQueue();
}
@Override
@ -98,4 +101,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
throws ReplicationException {
return;
}
@Override
public MetricsSource getSourceMetrics() {
return metrics;
}
}

View File

@ -20,13 +20,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
@ -523,6 +524,9 @@ public class TestReplicationSourceManager {
@Test
public void testPeerRemovalCleanup() throws Exception{
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
final String peerId = "FakePeer";
final ReplicationPeerConfig peerConfig =
new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
try {
DummyServer server = new DummyServer();
final ReplicationQueues rq =
@ -535,42 +539,105 @@ public class TestReplicationSourceManager {
FailInitializeDummyReplicationSource.class.getName());
final ReplicationPeers rp = manager.getReplicationPeers();
// Set up the znode and ReplicationPeer for the fake peer
rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
// Wait for the peer to get created and connected
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return (rp.getPeer("FakePeer") != null);
}
});
// Don't wait for replication source to initialize, we know it won't.
addPeerAndWait(peerId, peerConfig, false);
// Make sure that the replication source was not initialized
List<ReplicationSourceInterface> sources = manager.getSources();
for (ReplicationSourceInterface source : sources) {
assertNotEquals("FakePeer", source.getPeerClusterId());
}
// Sanity check
assertNull(manager.getSource(peerId));
// Removing the peer should remove both the replication queue and the ReplicationPeer
manager.removePeer("FakePeer");
assertFalse(rq.getAllQueues().contains("FakePeer"));
assertNull(rp.getPeer("FakePeer"));
// Create a replication queue for the fake peer
rq.addLog(peerId, "FakeFile");
// Unregister peer, this should remove the peer and clear all queues associated with it
// Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
rp.removePeer("FakePeer");
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!rq.getAllQueues().contains("FakePeer"))
&& (rp.getPeer("FakePeer") == null)
&& (!peers.contains("FakePeer"));
}
});
removePeerAndWait(peerId);
assertFalse(rq.getAllQueues().contains(peerId));
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
removePeerAndWait(peerId);
}
}
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig =
new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
try {
addPeerAndWait(peerId, peerConfig, true);
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
// Retrieve the global replication metrics source
Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
f.setAccessible(true);
MetricsReplicationSourceSource globalSource =
(MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Removing the peer should reset the global metrics
removePeerAndWait(peerId);
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Adding the same peer back again should reset the single source metrics
addPeerAndWait(peerId, peerConfig, true);
source = manager.getSource(peerId);
assertNotNull(source);
assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
}
/**
* Add a peer and wait for it to initialize
* @param peerId
* @param peerConfig
* @param waitForSource Whether to wait for replication source to initialize
* @throws Exception
*/
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
rp.addPeer(peerId, peerConfig);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
if (waitForSource) {
return (manager.getSource(peerId) != null);
} else {
return (rp.getPeer(peerId) != null);
}
}
});
}
/**
* Remove a peer and wait for it to get cleaned up
* @param peerId
* @throws Exception
*/
private void removePeerAndWait(final String peerId) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
if (rp.getAllPeerIds().contains(peerId)) {
rp.removePeer(peerId);
}
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
&& (!peers.contains(peerId));
}
});
}
private WALEdit getBulkLoadWALEdit() {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);