diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index db9c505cbf4..e196588288c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -30,11 +30,12 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,6 +87,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Stoppable stopper;
   // All logs we are currently tracking
   private final Map<String, SortedSet<String>> hlogsById;
+  // Logs for recovered sources we are currently tracking
+  private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
   private final Configuration conf;
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
@@ -126,6 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationTracker = replicationTracker;
     this.stopper = stopper;
     this.hlogsById = new HashMap<String, SortedSet<String>>();
+    this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -177,20 +181,29 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param id id of the peer cluster
    * @param queueRecovered Whether this is a recovered queue
    */
-  public void cleanOldLogs(String key,
-                           String id,
-                           boolean queueRecovered) {
-    synchronized (this.hlogsById) {
-      SortedSet<String> hlogs = this.hlogsById.get(id);
-      if (queueRecovered || hlogs.first().equals(key)) {
-        return;
+  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+    if (queueRecovered) {
+      SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
+      if (hlogs != null && !hlogs.first().equals(key)) {
+        cleanOldLogs(hlogs, key, id);
       }
-      SortedSet<String> hlogSet = hlogs.headSet(key);
-      for (String hlog : hlogSet) {
-        this.replicationQueues.removeLog(id, hlog);
+    } else {
+      synchronized (this.hlogsById) {
+        SortedSet<String> hlogs = hlogsById.get(id);
+        if (!hlogs.first().equals(key)) {
+          cleanOldLogs(hlogs, key, id);
+        }
       }
-      hlogSet.clear();
     }
+  }
+
+  private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
+    SortedSet<String> hlogSet = hlogs.headSet(key);
+    LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
+    for (String hlog : hlogSet) {
+      this.replicationQueues.removeLog(id, hlog);
+    }
+    hlogSet.clear();
   }
 
   /**
@@ -285,6 +298,14 @@ public class ReplicationSourceManager implements ReplicationListener {
   protected Map<String, SortedSet<String>> getHLogs() {
     return Collections.unmodifiableMap(hlogsById);
   }
+
+  /**
+   * Get a copy of the hlogs of the recovered sources on this rs
+   * @return a sorted set of hlog names
+   */
+  protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
+    return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
+  }
 
   /**
    * Get a list of all the normal sources of this rs
@@ -303,7 +324,6 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   void preLogRoll(Path newLog) throws IOException {
-
     synchronized (this.hlogsById) {
       String name = newLog.getName();
       for (ReplicationSourceInterface source : this.sources) {
@@ -416,6 +436,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
     this.oldsources.remove(src);
     deleteSource(src.getPeerClusterZnode(), false);
+    this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
   }
 
   /**
@@ -563,10 +584,12 @@ public class ReplicationSourceManager implements ReplicationListener {
           break;
         }
         oldsources.add(src);
-        for (String hlog : entry.getValue()) {
+        SortedSet<String> hlogsSet = entry.getValue();
+        for (String hlog : hlogsSet) {
           src.enqueueLog(new Path(oldLogDir, hlog));
         }
         src.startup();
+        hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
       } catch (IOException e) {
         // TODO manage it
         LOG.error("Failed creating a source", e);
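Note on the refactored cleanup above: the new private `cleanOldLogs(SortedSet, String, String)` helper depends on `SortedSet.headSet(key)` returning a live view of the backing set, so `hlogSet.clear()` also drops the entries from `hlogsById` / `hlogsByIdRecoveredQueues`. A minimal standalone sketch of that JDK behavior (class and variable names here are illustrative, not part of the patch):

```java
import java.util.SortedSet;
import java.util.TreeSet;

public class HeadSetViewDemo {
  public static void main(String[] args) {
    // Log names sort lexicographically, so the oldest log is first().
    SortedSet<String> tracked = new TreeSet<String>();
    tracked.add("log1");
    tracked.add("log2");
    tracked.add("log3");

    // headSet(key) is a view of the entries strictly less than key.
    SortedSet<String> older = tracked.headSet("log3");
    System.out.println(older);   // [log1, log2]

    // Clearing the view removes those entries from the backing set as well,
    // which is what lets hlogSet.clear() shrink the tracked-log set.
    older.clear();
    System.out.println(tracked); // [log3]
  }
}
```

The `hlogs != null` guard in the recovered branch presumably covers the case where the recovered queue has already been drained and dropped from the map, since `ConcurrentHashMap.get` returns null for absent keys.
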
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 13a18ce9e24..f463f760321 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -80,7 +80,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public String getPeerClusterId() {
-    return peerClusterId;
+    String[] parts = peerClusterId.split("-", 2);
+    return parts.length != 1 ?
+        parts[0] : peerClusterId;
   }
 
   @Override
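The dummy-source change mirrors how a recovered queue is identified: its id carries the peer id plus the dead server's name (the new test below builds `"1-" + serverName`), so the dummy now strips everything after the first dash to report the real peer id. A quick illustration (the sample queue id is made up but follows that shape):

```java
public class PeerIdFromQueueIdDemo {
  public static void main(String[] args) {
    // Recovered queue id: "<peerId>-<dead server name>"; a plain "1" stays "1".
    String peerClusterId = "1-hostname1.example.org,60020,1234567890";
    String[] parts = peerClusterId.split("-", 2);
    String peerId = parts.length != 1 ? parts[0] : peerClusterId;
    System.out.println(peerId); // 1
  }
}
```
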
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 09fa0965350..99ad601df8a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -27,6 +27,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -55,10 +57,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -71,6 +75,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Sets;
+
 @Category(MediumTests.class)
 public class TestReplicationSourceManager {
 
@@ -138,14 +144,14 @@ public class TestReplicationSourceManager {
     ZKUtil.setData(zkw, "/hbase/replication/state",
       ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
     ZKClusterId.setClusterId(zkw, new ClusterId());
-
-    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
-    manager = replication.getReplicationManager();
     fs = FileSystem.get(conf);
     oldLogDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_OLDLOGDIR_NAME);
     logDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
+    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
     logName = HConstants.HREGION_LOGDIR_NAME;
 
     manager.addSource(slaveId);
@@ -274,6 +280,40 @@ public class TestReplicationSourceManager {
     assertEquals(1, populatedMap);
     server.abort("", null);
   }
+
+  @Test
+  public void testCleanupFailoverQueues() throws Exception {
+    final Server server = new DummyServer("hostname1.example.org");
+    ReplicationQueues rq =
+        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
+          server);
+    rq.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    SortedSet<String> files = new TreeSet<String>();
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      rq.addLog("1", file);
+    }
+    Server s1 = new DummyServer("dummyserver1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+    ReplicationPeers rp1 =
+        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rp1.init();
+    NodeFailoverWorker w1 =
+        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
+            new Long(1), new Long(2)));
+    w1.start();
+    w1.join(5000);
+    assertEquals(1, manager.getHlogsByIdRecoveredQueues().size());
+    String id = "1-" + server.getServerName().getServerName();
+    assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id));
+    manager.cleanOldLogs("log2", id, true);
+    // log1 should be deleted
+    assertEquals(Sets.newHashSet("log2"), manager.getHlogsByIdRecoveredQueues().get(id));
+  }
 
   @Test
   public void testNodeFailoverDeadServerParsing() throws Exception {
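Taken together, the patch gives recovered queues the same in-memory log tracking that normal sources already had: logs are registered per recovered-queue id at failover, trimmed by `cleanOldLogs(..., true)`, and dropped when the queue is drained. A rough standalone model of that lifecycle, under the assumption that log names sort oldest-first (all identifiers below are illustrative, not HBase API):

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

public class RecoveredQueueTrackingModel {
  // Mirrors hlogsByIdRecoveredQueues: recovered queue id -> tracked log names.
  private final Map<String, SortedSet<String>> logsById =
      new ConcurrentHashMap<String, SortedSet<String>>();

  // NodeFailoverWorker: adopt a dead server's queue and start tracking it.
  void adopt(String queueId, SortedSet<String> logs) {
    logsById.put(queueId, logs);
  }

  // cleanOldLogs(key, id, true): drop everything strictly older than key.
  void cleanOldLogs(String key, String queueId) {
    SortedSet<String> logs = logsById.get(queueId);
    if (logs != null && !logs.first().equals(key)) {
      logs.headSet(key).clear(); // view-backed removal, as in the patch
    }
  }

  // closeRecoveredQueue: the queue is drained, stop tracking it.
  void close(String queueId) {
    logsById.remove(queueId);
  }

  public static void main(String[] args) {
    RecoveredQueueTrackingModel m = new RecoveredQueueTrackingModel();
    SortedSet<String> logs = new TreeSet<String>();
    logs.add("log1");
    logs.add("log2");
    String id = "1-hostname1.example.org,60020,1234567890"; // made-up queue id
    m.adopt(id, logs);
    m.cleanOldLogs("log2", id);
    System.out.println(logs); // [log2] -- log1 cleaned, as the new test asserts
    m.close(id);
  }
}
```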