diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index a3635e4da9b..baea74f09ba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -316,9 +316,11 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R for (String peerId : peerIdsToProcess) { ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); if (!peerExists(replicationQueueInfo.getPeerId())) { - LOG.warn("Peer " + peerId + " didn't exist, skipping the replay"); - // Protection against moving orphaned queues - continue; + // the orphaned queues must be moved, otherwise the delete op of dead rs will fail, + // this will cause the whole multi op to fail. + // NodeFailoverWorker will skip the orphaned queues.
+ LOG.warn("Peer " + peerId + + " didn't exist, will move its queue to avoid the failure of multi op"); } String newPeerId = peerId + "-" + znode; String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 07ee46a5cde..143d6e28fef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; @@ -344,6 +345,11 @@ public class ReplicationSourceManager implements ReplicationListener { return this.oldsources; } + @VisibleForTesting + List getAllQueues() { + return replicationQueues.getAllQueues(); + } + void preLogRoll(Path newLog) throws IOException { recordLog(newLog); String logName = newLog.getName(); @@ -371,8 +377,8 @@ public class ReplicationSourceManager implements ReplicationListener { String logName = logPath.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); // update replication queues on ZK - synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for - // the to-be-removed peer + // synchronize on replicationPeers to avoid adding source for the to-be-removed peer + synchronized (replicationPeers) { for (String id : replicationPeers.getConnectedPeerIds()) { try { this.replicationQueues.addLog(id, logName); @@ -528,24 +534,28 @@ public class ReplicationSourceManager implements ReplicationListener { + sources.size() + " and another " + oldsources.size() + " that 
were recovered"); String terminateMessage = "Replication stream was removed by a user"; - List srcToRemove = new ArrayList(); List oldSourcesToDelete = new ArrayList(); - // First close all the recovered sources for this peer - for (ReplicationSourceInterface src : oldsources) { - if (id.equals(src.getPeerClusterId())) { - oldSourcesToDelete.add(src); + // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer + // see NodeFailoverWorker.run + synchronized (oldsources) { + // First close all the recovered sources for this peer + for (ReplicationSourceInterface src : oldsources) { + if (id.equals(src.getPeerClusterId())) { + oldSourcesToDelete.add(src); + } + } + for (ReplicationSourceInterface src : oldSourcesToDelete) { + src.terminate(terminateMessage); + closeRecoveredQueue(src); } - } - for (ReplicationSourceInterface src : oldSourcesToDelete) { - src.terminate(terminateMessage); - closeRecoveredQueue((src)); } LOG.info("Number of deleted recovered sources for " + id + ": " + oldSourcesToDelete.size()); // Now look for the one on this cluster - synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source - // for the to-be-removed peer + List srcToRemove = new ArrayList(); + // synchronize on replicationPeers to avoid adding source for the to-be-removed peer + synchronized (this.replicationPeers) { for (ReplicationSourceInterface src : this.sources) { if (id.equals(src.getPeerClusterId())) { srcToRemove.add(src); @@ -603,9 +613,12 @@ public class ReplicationSourceManager implements ReplicationListener { private final UUID clusterId; /** - * * @param rsZnode */ + public NodeFailoverWorker(String rsZnode) { + this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId); + } + public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final UUID clusterId) { super("Failover-for-"+rsZnode); @@ -661,6 
+674,7 @@ public class ReplicationSourceManager implements ReplicationListener { } if (peer == null || peerConfig == null) { LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode); + replicationQueues.removeQueue(peerId); continue; } // track sources in walsByIdRecoveredQueues @@ -680,15 +694,20 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, server, peerId, this.clusterId, peerConfig, peer); - if (!this.rp.getConnectedPeerIds().contains((src.getPeerClusterId()))) { - src.terminate("Recovered queue doesn't belong to any current peer"); - break; + // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer + // see removePeer + synchronized (oldsources) { + if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) { + src.terminate("Recovered queue doesn't belong to any current peer"); + closeRecoveredQueue(src); + continue; + } + oldsources.add(src); + for (String wal : walsSet) { + src.enqueueLog(new Path(oldLogDir, wal)); + } + src.startup(); } - oldsources.add(src); - for (String wal : walsSet) { - src.enqueueLog(new Path(oldLogDir, wal)); - } - src.startup(); } catch (IOException e) { // TODO manage it LOG.error("Failed creating a source", e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index f60982efe0d..4442bbbfb3e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -84,8 +84,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import 
org.junit.AfterClass; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import com.google.common.collect.Sets; @@ -187,15 +189,24 @@ public abstract class TestReplicationSourceManager { utility.shutdownMiniCluster(); } - @Before - public void setUp() throws Exception { + @Rule + public TestName testName = new TestName(); + + private void cleanLogDir() throws IOException { fs.delete(logDir, true); fs.delete(oldLogDir, true); } + @Before + public void setUp() throws Exception { + LOG.info("Start " + testName.getMethodName()); + cleanLogDir(); + } + @After public void tearDown() throws Exception { - setUp(); + LOG.info("End " + testName.getMethodName()); + cleanLogDir(); } @Test @@ -274,7 +285,6 @@ public abstract class TestReplicationSourceManager { @Test public void testClaimQueues() throws Exception { - LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); @@ -354,6 +364,27 @@ public abstract class TestReplicationSourceManager { assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); } + @Test + public void testCleanupUnknownPeerZNode() throws Exception { + final Server server = new DummyServer("hostname2.example.org"); + ReplicationQueues rq = ReplicationFactory.getReplicationQueues( + new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); + rq.init(server.getServerName().toString()); + // populate some znodes in the peer znode + // add log to an unknown peer + String group = "testgroup"; + rq.addLog("2", group + ".log1"); + rq.addLog("2", group + ".log2"); + + NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName()); + w1.run(); + + // The log of the unknown peer should be removed from zk + for (String peer : 
manager.getAllQueues()) { + assertTrue(peer.startsWith("1")); + } + } + @Test public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { NavigableMap scope = new TreeMap(Bytes.BYTES_COMPARATOR); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index f47e986f83b..75ed8357821 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -64,7 +64,6 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl @Test public void testNodeFailoverDeadServerParsing() throws Exception { - LOG.debug("testNodeFailoverDeadServerParsing"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); ReplicationQueues repQueues = @@ -114,8 +113,6 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan @Test public void testFailoverDeadServerCversionChange() throws Exception { - LOG.debug("testFailoverDeadServerCversionChange"); - conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server s0 = new DummyServer("cversion-change0.example.org"); ReplicationQueues repQueues = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java index 20f5cc8cde4..e606257af65 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; @@ -28,7 +27,6 @@ import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; - import org.junit.BeforeClass; import org.junit.experimental.categories.Category;