HBASE-19342 fix TestTableBasedReplicationSourceManagerImpl#testRemovePeerMetricsCleanup

Chia-Ping Tsai 2017-11-28 18:06:38 +08:00
parent ed16667208
commit 8b6f305ac7
4 changed files with 115 additions and 27 deletions

ReplicationSourceManager.java

@@ -40,7 +40,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -305,6 +304,13 @@ public class ReplicationSourceManager implements ReplicationListener {
return src;
}
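// Exposes how many wal paths are currently tracked in latestPaths; tests use this to compute expected log queue sizes.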
@VisibleForTesting
int getSizeOfLatestPath() {
synchronized (latestPaths) {
return latestPaths.size();
}
}
/**
* Delete a complete queue of wals associated with a peer cluster
* @param peerId Id of the peer cluster queue of wals to delete

ReplicationSourceDummy.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,7 +45,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
Path currentPath;
MetricsSource metrics;
WALFileLengthProvider walFileLengthProvider;
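// Set to true once startup() is called, so tests can wait for this dummy source to actually start.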
AtomicBoolean startup = new AtomicBoolean(false);
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
@@ -70,7 +70,11 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void startup() {
startup.set(true);
}
public boolean isStartup() {
return startup.get();
}
@Override

TestReplicationSourceManager.java

@@ -39,7 +39,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -77,8 +77,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -117,6 +119,8 @@ public abstract class TestReplicationSourceManager {
protected static ReplicationSourceManager manager;
protected static ReplicationSourceManager managerOfCluster;
protected static ZKWatcher zkw;
protected static HTableDescriptor htd;
@@ -170,9 +174,14 @@
logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
managerOfCluster = getManagerFromCluster();
manager = replication.getReplicationManager();
manager.addSource(slaveId);
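// Wait until the slave peer's source has started, both on the cluster's manager (if a mini cluster is running) and on the test manager.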
if (managerOfCluster != null) {
waitPeer(slaveId, managerOfCluster, true);
}
waitPeer(slaveId, manager, true);
htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor(f1);
@@ -189,9 +198,25 @@
hri = new HRegionInfo(htd.getTableName(), r1, r2);
}
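// Picks the replication source manager owned by one of the mini cluster's region servers; returns null when no mini cluster was started.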
private static ReplicationSourceManager getManagerFromCluster() {
// TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
if (utility.getMiniHBaseCluster() == null) {
return null;
}
return utility.getMiniHBaseCluster().getRegionServerThreads()
.stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
.findAny()
.map(HRegionServer::getReplicationSourceService)
.map(r -> (Replication)r)
.map(Replication::getReplicationManager)
.get();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
manager.join();
if (manager != null) {
manager.join();
}
utility.shutdownMiniCluster();
}
@@ -213,6 +238,14 @@
public void tearDown() throws Exception {
LOG.info("End " + testName.getMethodName());
cleanLogDir();
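// Remove any peers left behind by the test, keeping only the default slave peer.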
List<String> ids = manager.getSources().stream()
.map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
for (String id : ids) {
if (slaveId.equals(id)) {
continue;
}
removePeerAndWait(id);
}
}
@Test
@@ -471,28 +504,50 @@
}
}
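// Pulls the private globalSourceSource field out of the slave source's MetricsSource via reflection.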
private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
ReplicationSourceInterface source = manager.getSource(slaveId);
// Retrieve the global replication metrics source
Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
f.setAccessible(true);
return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
}
private static long getSizeOfLatestPath() {
// If no mini cluster is running, there are no extra replication managers influencing the metrics.
if (utility.getMiniHBaseCluster() == null) {
return 0;
}
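// Sum the latestPaths sizes of all region servers' replication managers.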
return utility.getMiniHBaseCluster().getRegionServerThreads()
.stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
.map(HRegionServer::getReplicationSourceService)
.map(r -> (Replication)r)
.map(Replication::getReplicationManager)
.mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
.sum();
}
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
try {
MetricsReplicationSourceSource globalSource = getGlobalSource();
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final long sizeOfLatestPath = getSizeOfLatestPath();
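// Adding the peer enqueues the wals tracked in latestPaths, so the global log queue should grow by sizeOfLatestPath.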
addPeerAndWait(peerId, peerConfig, true);
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
// Retrieve the global replication metrics source
Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
f.setAccessible(true);
MetricsReplicationSourceSource globalSource =
(MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
assertEquals(1 + sizeOfSingleLogQueue,
source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+ globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Removing the peer should reset the global metrics
removePeerAndWait(peerId);
@@ -502,8 +557,9 @@
addPeerAndWait(peerId, peerConfig, true);
source = manager.getSource(peerId);
assertNotNull(source);
assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+ globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
@@ -520,13 +576,27 @@
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
rp.registerPeer(peerId, peerConfig);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
if (waitForSource) {
return (manager.getSource(peerId) != null);
} else {
return (rp.getConnectedPeer(peerId) != null);
waitPeer(peerId, manager, waitForSource);
if (managerOfCluster != null) {
waitPeer(peerId, managerOfCluster, waitForSource);
}
}
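// Waits until the given manager has a started source for the peer (when waitForSource) or at least sees the peer as connected.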
private static void waitPeer(final String peerId,
ReplicationSourceManager manager, final boolean waitForSource) {
ReplicationPeers rp = manager.getReplicationPeers();
Waiter.waitFor(conf, 20000, () -> {
if (waitForSource) {
ReplicationSourceInterface rs = manager.getSource(peerId);
if (rs == null) {
return false;
}
if (rs instanceof ReplicationSourceDummy) {
return ((ReplicationSourceDummy)rs).isStartup();
}
return true;
} else {
return (rp.getConnectedPeer(peerId) != null);
}
});
}
@@ -545,7 +615,8 @@
@Override public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
&& (!peers.contains(peerId));
&& (!peers.contains(peerId))
&& manager.getSource(peerId) == null;
}
});
}

TestTableBasedReplicationSourceManagerImpl.java

@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@@ -50,6 +53,10 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
utility = new HBaseTestingUtility(conf);
utility.startMiniCluster();
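// The table based queue implementations store state in the system 'replication' table, so wait for the master to finish initializing and for that table's regions to be assigned before setting up replication.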
Waiter.waitFor(conf, 3 * 1000,
() -> utility.getMiniHBaseCluster().getMaster().isInitialized());
utility.waitUntilAllRegionsAssigned(TableName.valueOf(
NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"));
setupZkAndReplication();
}