diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 969e8caff38..de3b7f64811 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; @@ -784,13 +783,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } private void updateLogPosition(long lastReadPosition) { - try { - manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition, - this.replicationQueueInfo.isQueueRecovered(), false); - lastLoggedPosition = lastReadPosition; - } catch (ReplicationSourceWithoutPeerException re) { - source.terminate("Replication peer is removed and source should terminate", re); - } + manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition, + this.replicationQueueInfo.isQueueRecovered(), false); + lastLoggedPosition = lastReadPosition; } public void startup() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a8e8e76e2a3..b0e32f8a4f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -189,12 +189,13 @@ public class ReplicationSourceManager implements ReplicationListener { * @param holdLogInZK if true then the log is retained in ZK */ public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position, - boolean queueRecovered, boolean holdLogInZK) throws ReplicationSourceWithoutPeerException { + boolean queueRecovered, boolean holdLogInZK) { String fileName = log.getName(); this.replicationQueues.setLogPosition(id, fileName, position); if (holdLogInZK) { return; } + cleanOldLogs(fileName, id, queueRecovered); } @@ -205,8 +206,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param id id of the peer cluster * @param queueRecovered Whether this is a recovered queue */ - public void cleanOldLogs(String key, String id, boolean queueRecovered) - throws ReplicationSourceWithoutPeerException { + public void cleanOldLogs(String key, String id, boolean queueRecovered) { String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key); if (queueRecovered) { Map> walsForPeer = walsByIdRecoveredQueues.get(id); @@ -218,7 +218,7 @@ public class ReplicationSourceManager implements ReplicationListener { } } else { synchronized (this.walsById) { - SortedSet wals = walsById.get(id).get(logPrefix); + SortedSet wals = getLogsWithPrefix(id, logPrefix); if (wals != null && !wals.first().equals(key)) { cleanOldLogs(wals, key, id); } @@ -226,16 +226,36 @@ public class ReplicationSourceManager implements ReplicationListener { } } - private void cleanOldLogs(SortedSet wals, String key, String id) - throws ReplicationSourceWithoutPeerException { + private void cleanOldLogs(SortedSet wals, String key, String id) { SortedSet walSet = wals.headSet(key); LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); - for (String wal : walSet) { - this.replicationQueues.removeLog(id, wal); + try { + for (String wal : walSet) { + this.replicationQueues.removeLog(id, wal); + } + } catch (ReplicationSourceWithoutPeerException rspe) { + // This means the source is running and replication peer have been removed + // We should call the removePeer workflow to terminate the source gracefully + LOG.warn("Replication peer " + id + " has been removed and source is still running", rspe); + String peerId = id; + if (peerId.contains("-")) { + peerId = peerId.split("-")[0]; + } + peerRemoved(peerId); } walSet.clear(); } + /** + * Get logs with log prefix for the given wal group + * @param walGroupId wal group ID + * @param logPrefix log prefix + * @return logs with the given prefix + */ + public SortedSet getLogsWithPrefix(String walGroupId, String logPrefix) { + return walsById.get(walGroupId).get(logPrefix); + } + /** * Adds a normal source per registered peer cluster and tries to process all * old region server wal queues @@ -579,7 +599,7 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Thie method first deletes all the recovered sources for the specified + * This method first deletes all the recovered sources for the specified * id, then deletes the normal source (deleting all related data in ZK). * @param id The id of the peer cluster */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java index 4a899178312..1fba87f12dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java @@ -16,11 +16,14 @@ package org.apache.hadoop.hbase.replication; public class ReplicationSourceDummyWithNoTermination extends ReplicationSourceDummy { - + volatile boolean firstTime = true; @Override public void terminate(String reason) { // This is to block the zk listener to close the queues // to simulate the znodes getting deleted without zk listener getting invoked - throw new RuntimeException(fakeExceptionMessage); + if (firstTime) { + firstTime = false; + throw new RuntimeException(fakeExceptionMessage); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index b0a2a8c62cf..e7ff58f0361 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -28,20 +28,22 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.times; - import com.google.common.collect.Lists; - +import com.google.common.collect.Sets; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.SortedSet; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; @@ -247,7 +250,6 @@ public class TestReplicationSource { public boolean evaluate() throws Exception { return future.isDone(); } - }); } @@ -277,7 +279,7 @@ public class TestReplicationSource { } private static final class Mocks { - private final ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + private ReplicationSourceManager manager = mock(ReplicationSourceManager.class); private final ReplicationQueues queues = mock(ReplicationQueues.class); private final ReplicationPeers peers = mock(ReplicationPeers.class); private final MetricsSource metrics = mock(MetricsSource.class); @@ -291,12 +293,32 @@ public class TestReplicationSource { when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed); } - // source manager throws the exception while cleaning logs - private void setReplicationSourceWithoutPeerException() - throws ReplicationSourceWithoutPeerException { - doThrow(new ReplicationSourceWithoutPeerException("No peer")).when(manager) + ReplicationSource createReplicationSourceAndManagerWithMocks(ReplicationEndpoint endpoint) + throws Exception { + ReplicationTracker tracker = mock(ReplicationTracker.class); + Server server = mock(Server.class); + FileSystem fs = mock(FileSystem.class); + UUID clusterId = UUID.randomUUID(); + String peerId = "testPeerClusterZnode"; + + manager = Mockito.spy(new ReplicationSourceManager( + queues, peers, tracker, conf, server, fs, logDir, oldLogDir, clusterId)); + + doCallRealMethod().when(manager).removePeer(Mockito.anyString()); + // Mock the failure during cleaning log with node already deleted + doThrow(new ReplicationSourceWithoutPeerException("Peer Removed")).when(queues) + .removeLog(anyString(), anyString()); + doCallRealMethod().when(manager) .logPositionAndCleanOldLogs(Mockito.anyObject(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean()); + final ReplicationSource source = new ReplicationSource(); + endpoint.init(context); + source.init(conf, FS, manager, queues, peers, mock(Stoppable.class), + peerId, clusterId, endpoint, metrics); + manager.getSources().add(source); + SortedSet walsWithPrefix = Sets.newTreeSet(Collections.singletonList("fake")); + doReturn(walsWithPrefix).when(manager).getLogsWithPrefix(anyString(), anyString()); + return source; } ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint, @@ -522,8 +544,7 @@ public class TestReplicationSource { */ @Test public void testReplicationSourceTerminationWhenNoZnodeForPeerAndQueues() throws Exception { - Mocks mocks = new Mocks(); - mocks.setReplicationSourceWithoutPeerException(); + final Mocks mocks = new Mocks(); // set table cfs to filter all cells out final TableName replicatedTable = TableName.valueOf("replicated_table"); final Map> cfs = @@ -543,7 +564,7 @@ public class TestReplicationSource { } }; - final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false); + final ReplicationSource source = mocks.createReplicationSourceAndManagerWithMocks(endpoint); source.run(); source.enqueueLog(log1); @@ -561,10 +582,9 @@ public class TestReplicationSource { } }); - // After that the source should be terminated + // And the source should be terminated Waiter.waitFor(conf, 20000, new Waiter.Predicate() { @Override public boolean evaluate() { - // wait until reader read all cells return !source.isSourceActive(); } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java similarity index 95% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java index ab4d19d9985..ec2facdda53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination; +import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer; import org.apache.hadoop.hbase.util.Bytes; @@ -51,10 +51,10 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.rules.TestName; -public abstract class TestReplicationSourceBase { +public abstract class TestReplicationSourceManagerBase { private static final Log LOG = - LogFactory.getLog(TestReplicationSourceBase.class); + LogFactory.getLog(TestReplicationSourceManagerBase.class); protected static Configuration conf; protected static HBaseTestingUtility utility; @@ -75,10 +75,12 @@ public abstract class TestReplicationSourceBase { protected static Path logDir; protected static DummyServer server; - @BeforeClass public static void setUpBeforeClass() throws Exception { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = HBaseConfiguration.create(); conf.set("replication.replicationsource.implementation", - ReplicationSourceDummyWithNoTermination.class.getCanonicalName()); + ReplicationSourceDummy.class.getCanonicalName()); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); conf.setLong("replication.sleep.before.failover", 2000); conf.setInt("replication.source.maxretriesmultiplier", 10); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java similarity index 88% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java index f0c18d30614..50c96cfe53f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import com.google.common.collect.Sets; - import java.io.IOException; import java.lang.reflect.Field; import java.net.URLEncoder; @@ -47,17 +46,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; @@ -72,91 +67,25 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @Category(MediumTests.class) -public class TestReplicationSourceManager extends TestReplicationSourceBase { - +public class TestReplicationSourceManagerManager extends TestReplicationSourceManagerBase { private static final Log LOG = - LogFactory.getLog(TestReplicationSourceManager.class); - private static final TableName test = - TableName.valueOf("test"); - private static final String slaveId = "1"; - private static CountDownLatch latch; + LogFactory.getLog(TestReplicationSourceManagerManager.class); private static List files = new ArrayList<>(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - - conf = HBaseConfiguration.create(); - conf.set("replication.replicationsource.implementation", - ReplicationSourceDummy.class.getCanonicalName()); - conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, - HConstants.REPLICATION_ENABLE_DEFAULT); - conf.setLong("replication.sleep.before.failover", 2000); - conf.setInt("replication.source.maxretriesmultiplier", 10); - utility = new HBaseTestingUtility(conf); - utility.startMiniZKCluster(); - - zkw = new ZooKeeperWatcher(conf, "test", null); - ZKUtil.createWithParents(zkw, "/hbase/replication"); - ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1"); - ZKUtil.setData(zkw, "/hbase/replication/peers/1", - Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" - + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); - ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); - ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", - ReplicationStateZKBase.ENABLED_ZNODE_BYTES); - ZKUtil.createWithParents(zkw, "/hbase/replication/state"); - ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES); - - ZKClusterId.setClusterId(zkw, new ClusterId()); - FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); - fs = FileSystem.get(conf); - oldLogDir = new Path(utility.getDataTestDir(), - HConstants.HREGION_OLDLOGDIR_NAME); - logDir = new Path(utility.getDataTestDir(), - HConstants.HREGION_LOGDIR_NAME); - server = new DummyServer(conf, "example.hostname.com", zkw); - replication = new Replication(server, fs, logDir, oldLogDir); - manager = replication.getReplicationManager(); - - manager.addSource(slaveId); - - htd = new HTableDescriptor(test); - HColumnDescriptor col = new HColumnDescriptor(f1); - col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - htd.addFamily(col); - col = new HColumnDescriptor(f2); - col.setScope(HConstants.REPLICATION_SCOPE_LOCAL); - htd.addFamily(col); - - hri = new HRegionInfo(htd.getTableName(), r1, r2); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - manager.join(); - utility.shutdownMiniCluster(); - } + private static CountDownLatch latch; @Test public void testLogRoll() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java index 095710d08fe..c8235480ef2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java @@ -21,25 +21,30 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException; +import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.zookeeper.ZKUtil; - import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @Category(MediumTests.class) -public class TestReplicationSourceWithoutReplicationZnodes extends TestReplicationSourceBase { +public class TestReplicationSourceWithoutReplicationZnodes + extends TestReplicationSourceManagerBase { + + @Before + public void removeExistingSourcesFromSourceManager() { + manager.getSources().clear(); + manager.getOldSources().clear(); + } /** * When the peer is removed, hbase remove the peer znodes and there is zk watcher @@ -47,39 +52,47 @@ public class TestReplicationSourceWithoutReplicationZnodes extends TestReplicati * or a race condition between source deleting the log znode and zk watcher * terminating the source, we might get the NoNode exception. In that case, the right * thing is to terminate the replication source. + * * @throws Exception throws exception */ @Test public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception { + String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); KeyValue kv = new KeyValue(r1, f1, r1); WALEdit edit = new WALEdit(); edit.add(kv); - - List listeners = new ArrayList<>(); - listeners.add(replication); - final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners, - URLEncoder.encode("regionserver:60020", "UTF8")); - final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); - manager.init(); - - final long txid = wal.append(htd, hri, - new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc), - edit, true); - wal.sync(txid); - - wal.rollWriter(); - ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1"); - ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1"); - - ReplicationException exceptionThrown = null; try { - manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - "1", 0, false, false); - } catch (ReplicationException e) { - exceptionThrown = e; - } + conf.set("replication.replicationsource.implementation", + ReplicationSourceDummyWithNoTermination.class.getCanonicalName()); + List listeners = new ArrayList<>(); + listeners.add(replication); + final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners, + URLEncoder.encode("regionserver:60020", "UTF8")); + final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); + manager.init(); - Assert.assertTrue(exceptionThrown instanceof ReplicationSourceWithoutPeerException); + final long txid = wal.append(htd, hri, + new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc), edit, + true); + wal.sync(txid); + + wal.rollWriter(); + ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1"); + ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" + server.getServerName() + "/1"); + + Assert.assertEquals("There should be exactly one source", + 1, manager.getSources().size()); + Assert.assertEquals("Replication source is not correct", + ReplicationSourceDummyWithNoTermination.class, + manager.getSources().get(0).getClass()); + manager + .logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false, + false); + Assert.assertTrue("Replication source should be terminated and removed", + manager.getSources().isEmpty()); + } finally { + conf.set("replication.replicationsource.implementation", replicationSourceImplName); + } } }