From b75510284f8af2a70986b7ddf9174fe985d160a9 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 28 Nov 2017 18:06:38 +0800 Subject: [PATCH] HBASE-19342 fix TestTableBasedReplicationSourceManagerImpl#testRemovePeerMetricsCleanup --- .../ReplicationSourceManager.java | 10 +- .../replication/ReplicationSourceDummy.java | 8 +- .../TestReplicationSourceManager.java | 117 ++++++++++++++---- ...ableBasedReplicationSourceManagerImpl.java | 7 ++ 4 files changed, 115 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 45d7d94c238..3aa3843a238 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -40,7 +40,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import 
org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -305,6 +304,13 @@ public class ReplicationSourceManager implements ReplicationListener { return src; } + @VisibleForTesting + int getSizeOfLatestPath() { + synchronized (latestPaths) { + return latestPaths.size(); + } + } + /** * Delete a complete queue of wals associated with a peer cluster * @param peerId Id of the peer cluster queue of wals to delete diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index a12cebd15cd..7ea79f941c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.List; import java.util.UUID; - +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,7 +45,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { Path currentPath; MetricsSource metrics; WALFileLengthProvider walFileLengthProvider; - + AtomicBoolean startup = new AtomicBoolean(false); @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId, @@ -70,7 +70,11 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void startup() { + startup.set(true); + } + public boolean isStartup() { + return startup.get(); } @Override diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index a104d4f09bd..83dc636a392 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -39,7 +39,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CountDownLatch; - +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -77,8 +77,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import 
org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -117,6 +119,8 @@ public abstract class TestReplicationSourceManager { protected static ReplicationSourceManager manager; + protected static ReplicationSourceManager managerOfCluster; + protected static ZKWatcher zkw; protected static HTableDescriptor htd; @@ -170,9 +174,14 @@ public abstract class TestReplicationSourceManager { logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME); replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); - manager = replication.getReplicationManager(); + managerOfCluster = getManagerFromCluster(); + manager = replication.getReplicationManager(); manager.addSource(slaveId); + if (managerOfCluster != null) { + waitPeer(slaveId, managerOfCluster, true); + } + waitPeer(slaveId, manager, true); htd = new HTableDescriptor(test); HColumnDescriptor col = new HColumnDescriptor(f1); @@ -189,9 +198,25 @@ public abstract class TestReplicationSourceManager { hri = new HRegionInfo(htd.getTableName(), r1, r2); } + private static ReplicationSourceManager getManagerFromCluster() { + // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster. 
+ if (utility.getMiniHBaseCluster() == null) { + return null; + } + return utility.getMiniHBaseCluster().getRegionServerThreads() + .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) + .findAny() + .map(HRegionServer::getReplicationSourceService) + .map(r -> (Replication)r) + .map(Replication::getReplicationManager) + .get(); + } + @AfterClass public static void tearDownAfterClass() throws Exception { - manager.join(); + if (manager != null) { + manager.join(); + } utility.shutdownMiniCluster(); } @@ -213,6 +238,14 @@ public abstract class TestReplicationSourceManager { public void tearDown() throws Exception { LOG.info("End " + testName.getMethodName()); cleanLogDir(); + List ids = manager.getSources().stream() + .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList()); + for (String id : ids) { + if (slaveId.equals(id)) { + continue; + } + removePeerAndWait(id); + } } @Test @@ -471,28 +504,50 @@ public abstract class TestReplicationSourceManager { } } + private static MetricsReplicationSourceSource getGlobalSource() throws Exception { + ReplicationSourceInterface source = manager.getSource(slaveId); + // Retrieve the global replication metrics source + Field f = MetricsSource.class.getDeclaredField("globalSourceSource"); + f.setAccessible(true); + return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics()); + } + + private static long getSizeOfLatestPath() { + // Without a mini cluster there is no extra replication manager influencing the metrics. 
+ if (utility.getMiniHBaseCluster() == null) { + return 0; + } + return utility.getMiniHBaseCluster().getRegionServerThreads() + .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) + .map(HRegionServer::getReplicationSourceService) + .map(r -> (Replication)r) + .map(Replication::getReplicationManager) + .mapToLong(ReplicationSourceManager::getSizeOfLatestPath) + .sum(); + } + @Test public void testRemovePeerMetricsCleanup() throws Exception { final String peerId = "DummyPeer"; final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase"); try { + MetricsReplicationSourceSource globalSource = getGlobalSource(); + final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); + final long sizeOfLatestPath = getSizeOfLatestPath(); addPeerAndWait(peerId, peerConfig, true); - + assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, + globalSource.getSizeOfLogQueue()); ReplicationSourceInterface source = manager.getSource(peerId); // Sanity check assertNotNull(source); - // Retrieve the global replication metrics source - Field f = MetricsSource.class.getDeclaredField("globalSourceSource"); - f.setAccessible(true); - MetricsReplicationSourceSource globalSource = - (MetricsReplicationSourceSource)f.get(source.getSourceMetrics()); - int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); - + final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); // Enqueue log and check if metrics updated source.enqueueLog(new Path("abc")); - assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue()); - assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + assertEquals(1 + sizeOfSingleLogQueue, + source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); // Removing the peer should reset the 
global metrics removePeerAndWait(peerId); @@ -502,8 +557,9 @@ public abstract class TestReplicationSourceManager { addPeerAndWait(peerId, peerConfig, true); source = manager.getSource(peerId); assertNotNull(source); - assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue()); - assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); } finally { removePeerAndWait(peerId); } @@ -520,13 +576,27 @@ public abstract class TestReplicationSourceManager { final boolean waitForSource) throws Exception { final ReplicationPeers rp = manager.getReplicationPeers(); rp.registerPeer(peerId, peerConfig); - Waiter.waitFor(conf, 20000, new Waiter.Predicate() { - @Override public boolean evaluate() throws Exception { - if (waitForSource) { - return (manager.getSource(peerId) != null); - } else { - return (rp.getConnectedPeer(peerId) != null); + waitPeer(peerId, manager, waitForSource); + if (managerOfCluster != null) { + waitPeer(peerId, managerOfCluster, waitForSource); + } + } + + private static void waitPeer(final String peerId, + ReplicationSourceManager manager, final boolean waitForSource) { + ReplicationPeers rp = manager.getReplicationPeers(); + Waiter.waitFor(conf, 20000, () -> { + if (waitForSource) { + ReplicationSourceInterface rs = manager.getSource(peerId); + if (rs == null) { + return false; } + if (rs instanceof ReplicationSourceDummy) { + return ((ReplicationSourceDummy)rs).isStartup(); + } + return true; + } else { + return (rp.getConnectedPeer(peerId) != null); } }); } @@ -545,7 +615,8 @@ public abstract class TestReplicationSourceManager { @Override public boolean evaluate() throws Exception { List peers = rp.getAllPeerIds(); return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null) - && 
(!peers.contains(peerId)); + && (!peers.contains(peerId)) + && manager.getSource(peerId) == null; } }); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java index e606257af65..19457e2290f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; @@ -50,6 +53,10 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); utility = new HBaseTestingUtility(conf); utility.startMiniCluster(); + Waiter.waitFor(conf, 3 * 1000, + () -> utility.getMiniHBaseCluster().getMaster().isInitialized()); + utility.waitUntilAllRegionsAssigned(TableName.valueOf( + NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication")); setupZkAndReplication(); }