HBASE-19342 fix TestTableBasedReplicationSourceManagerImpl#testRemovePeerMetricsCleanup

This commit is contained in:
Chia-Ping Tsai 2017-11-28 18:06:38 +08:00
parent 0f33931b2a
commit b75510284f
4 changed files with 115 additions and 27 deletions

View File

@ -40,7 +40,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -305,6 +304,13 @@ public class ReplicationSourceManager implements ReplicationListener {
return src; return src;
} }
@VisibleForTesting
int getSizeOfLatestPath() {
synchronized (latestPaths) {
return latestPaths.size();
}
}
/** /**
* Delete a complete queue of wals associated with a peer cluster * Delete a complete queue of wals associated with a peer cluster
* @param peerId Id of the peer cluster queue of wals to delete * @param peerId Id of the peer cluster queue of wals to delete

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -45,7 +45,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
Path currentPath; Path currentPath;
MetricsSource metrics; MetricsSource metrics;
WALFileLengthProvider walFileLengthProvider; WALFileLengthProvider walFileLengthProvider;
AtomicBoolean startup = new AtomicBoolean(false);
@Override @Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId, ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
@ -70,7 +70,11 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override @Override
public void startup() { public void startup() {
startup.set(true);
}
public boolean isStartup() {
return startup.get();
} }
@Override @Override

View File

@ -39,7 +39,7 @@ import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -77,8 +77,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@ -117,6 +119,8 @@ public abstract class TestReplicationSourceManager {
protected static ReplicationSourceManager manager; protected static ReplicationSourceManager manager;
protected static ReplicationSourceManager managerOfCluster;
protected static ZKWatcher zkw; protected static ZKWatcher zkw;
protected static HTableDescriptor htd; protected static HTableDescriptor htd;
@ -170,9 +174,14 @@ public abstract class TestReplicationSourceManager {
logDir = new Path(utility.getDataTestDir(), logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME); HConstants.HREGION_LOGDIR_NAME);
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
managerOfCluster = getManagerFromCluster();
manager = replication.getReplicationManager();
manager.addSource(slaveId); manager.addSource(slaveId);
if (managerOfCluster != null) {
waitPeer(slaveId, managerOfCluster, true);
}
waitPeer(slaveId, manager, true);
htd = new HTableDescriptor(test); htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor(f1); HColumnDescriptor col = new HColumnDescriptor(f1);
@ -189,9 +198,25 @@ public abstract class TestReplicationSourceManager {
hri = new HRegionInfo(htd.getTableName(), r1, r2); hri = new HRegionInfo(htd.getTableName(), r1, r2);
} }
private static ReplicationSourceManager getManagerFromCluster() {
// TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
if (utility.getMiniHBaseCluster() == null) {
return null;
}
return utility.getMiniHBaseCluster().getRegionServerThreads()
.stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
.findAny()
.map(HRegionServer::getReplicationSourceService)
.map(r -> (Replication)r)
.map(Replication::getReplicationManager)
.get();
}
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
manager.join(); if (manager != null) {
manager.join();
}
utility.shutdownMiniCluster(); utility.shutdownMiniCluster();
} }
@ -213,6 +238,14 @@ public abstract class TestReplicationSourceManager {
public void tearDown() throws Exception { public void tearDown() throws Exception {
LOG.info("End " + testName.getMethodName()); LOG.info("End " + testName.getMethodName());
cleanLogDir(); cleanLogDir();
List<String> ids = manager.getSources().stream()
.map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
for (String id : ids) {
if (slaveId.equals(id)) {
continue;
}
removePeerAndWait(id);
}
} }
@Test @Test
@ -471,28 +504,50 @@ public abstract class TestReplicationSourceManager {
} }
} }
private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
ReplicationSourceInterface source = manager.getSource(slaveId);
// Retrieve the global replication metrics source
Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
f.setAccessible(true);
return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
}
private static long getSizeOfLatestPath() {
// If no mini cluster is running, there are extra replication manager influencing the metrics.
if (utility.getMiniHBaseCluster() == null) {
return 0;
}
return utility.getMiniHBaseCluster().getRegionServerThreads()
.stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
.map(HRegionServer::getReplicationSourceService)
.map(r -> (Replication)r)
.map(Replication::getReplicationManager)
.mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
.sum();
}
@Test @Test
public void testRemovePeerMetricsCleanup() throws Exception { public void testRemovePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer"; final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase"); .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
try { try {
MetricsReplicationSourceSource globalSource = getGlobalSource();
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final long sizeOfLatestPath = getSizeOfLatestPath();
addPeerAndWait(peerId, peerConfig, true); addPeerAndWait(peerId, peerConfig, true);
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId); ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check // Sanity check
assertNotNull(source); assertNotNull(source);
// Retrieve the global replication metrics source final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
f.setAccessible(true);
MetricsReplicationSourceSource globalSource =
(MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
// Enqueue log and check if metrics updated // Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc")); source.enqueueLog(new Path("abc"));
assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue()); assertEquals(1 + sizeOfSingleLogQueue,
assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+ globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Removing the peer should reset the global metrics // Removing the peer should reset the global metrics
removePeerAndWait(peerId); removePeerAndWait(peerId);
@ -502,8 +557,9 @@ public abstract class TestReplicationSourceManager {
addPeerAndWait(peerId, peerConfig, true); addPeerAndWait(peerId, peerConfig, true);
source = manager.getSource(peerId); source = manager.getSource(peerId);
assertNotNull(source); assertNotNull(source);
assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue()); assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+ globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
} finally { } finally {
removePeerAndWait(peerId); removePeerAndWait(peerId);
} }
@ -520,13 +576,27 @@ public abstract class TestReplicationSourceManager {
final boolean waitForSource) throws Exception { final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers(); final ReplicationPeers rp = manager.getReplicationPeers();
rp.registerPeer(peerId, peerConfig); rp.registerPeer(peerId, peerConfig);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { waitPeer(peerId, manager, waitForSource);
@Override public boolean evaluate() throws Exception { if (managerOfCluster != null) {
if (waitForSource) { waitPeer(peerId, managerOfCluster, waitForSource);
return (manager.getSource(peerId) != null); }
} else { }
return (rp.getConnectedPeer(peerId) != null);
private static void waitPeer(final String peerId,
ReplicationSourceManager manager, final boolean waitForSource) {
ReplicationPeers rp = manager.getReplicationPeers();
Waiter.waitFor(conf, 20000, () -> {
if (waitForSource) {
ReplicationSourceInterface rs = manager.getSource(peerId);
if (rs == null) {
return false;
} }
if (rs instanceof ReplicationSourceDummy) {
return ((ReplicationSourceDummy)rs).isStartup();
}
return true;
} else {
return (rp.getConnectedPeer(peerId) != null);
} }
}); });
} }
@ -545,7 +615,8 @@ public abstract class TestReplicationSourceManager {
@Override public boolean evaluate() throws Exception { @Override public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds(); List<String> peers = rp.getAllPeerIds();
return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null) return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
&& (!peers.contains(peerId)); && (!peers.contains(peerId))
&& manager.getSource(peerId) == null;
} }
}); });
} }

View File

@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@ -50,6 +53,10 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
utility = new HBaseTestingUtility(conf); utility = new HBaseTestingUtility(conf);
utility.startMiniCluster(); utility.startMiniCluster();
Waiter.waitFor(conf, 3 * 1000,
() -> utility.getMiniHBaseCluster().getMaster().isInitialized());
utility.waitUntilAllRegionsAssigned(TableName.valueOf(
NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"));
setupZkAndReplication(); setupZkAndReplication();
} }