diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index fed1791ffec..5b3e541b696 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws KeeperException;
 
   /**
    * Get a list of all WALs in the given queue on the given region server.
   * @param serverName the server name of the region server that owns the queue
   * @param queueId a String that identifies the queue
   * @return a list of WALs, null if this region server is dead and has no outstanding queues
+  * @throws KeeperException zookeeper exception
   */
-  List<String> getLogsInQueue(String serverName, String queueId);
+  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
 
   /**
   * Get a list of all queues for the specified region server.
   * @param serverName the server name of the region server that owns the set of queues
   * @return a list of queueIds, null if this region server is not a replicator.
   */
-  List<String> getAllQueues(String serverName);
+  List<String> getAllQueues(String serverName) throws KeeperException;
+
+  /**
+   * Get the cversion of the replication rs node. This can be used for optimistic locking to get a
+   * consistent snapshot of the replication queues.
+   * @return cversion of the replication rs node
+   */
+  int getQueuesZNodeCversion() throws KeeperException;
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 43262a0d3a7..e1a6a495917 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 @InterfaceAudience.Private
 public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   }
 
   @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
+  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     znode = ZKUtil.joinZNode(znode, queueId);
     List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of wals for queueId=" + queueId
           + " and serverName=" + serverName, e);
+      throw e;
     }
     return result;
   }
 
   @Override
-  public List<String> getAllQueues(String serverName) {
+  public List<String> getAllQueues(String serverName) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      throw e;
     }
     return result;
   }
 
+  @Override public int getQueuesZNodeCversion() throws KeeperException {
+    try {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
+      return stat.getCversion();
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication rs node", e);
+      throw e;
+    }
+  }
 }
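Note (not part of the patch): a minimal sketch of how a caller might combine the new getQueuesZNodeCversion() with the existing read methods to take a consistent snapshot of every queued WAL. The helper class name, the retry cap, and the plain java.util collections are illustrative assumptions; ReplicationLogCleaner below applies the same optimistic-locking pattern, but without a retry cap.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.zookeeper.KeeperException;

// Illustrative helper, not part of the patch.
public final class QueueSnapshots {
  private QueueSnapshots() {
  }

  /** Returns the set of WALs referenced by any replication queue, retrying on concurrent changes. */
  public static Set<String> snapshotWals(ReplicationQueuesClient client) throws KeeperException {
    for (int retry = 0; retry < 10; retry++) { // retry cap is an arbitrary choice for the sketch
      int before = client.getQueuesZNodeCversion();
      Set<String> wals = new HashSet<String>();
      List<String> replicators = client.getListOfReplicators();
      if (replicators == null) {
        return wals; // nothing is replicating
      }
      for (String rs : replicators) {
        List<String> queueIds = client.getAllQueues(rs);
        if (queueIds == null) {
          continue; // the region server znode was removed while we were scanning
        }
        for (String queueId : queueIds) {
          List<String> logs = client.getLogsInQueue(rs, queueId);
          if (logs != null) {
            wals.addAll(logs);
          }
        }
      }
      // If no queue was claimed or removed while we read, the cversion of the rs node is
      // unchanged and the snapshot is consistent.
      if (before == client.getQueuesZNodeCversion()) {
        return wals;
      }
    }
    throw new IllegalStateException("Replication queues kept changing; giving up");
  }
}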
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 0535b4b22b4..97763e23585 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -334,7 +334,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
       }
-      // add delete op for dead rs
+      // add delete op for dead rs, this will update the cversion of the parent.
+      // The reader uses this cversion for optimistic locking to get a consistent
+      // snapshot of the queues.
       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
       if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
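Note (not part of the patch): the comment above relies on a ZooKeeper detail worth spelling out. A znode's cversion is bumped whenever one of its direct children is created or deleted, but not when a child's data changes, so deleting the dead region server's znode under the replication rs node is always visible to a reader that compares cversions before and after its scan. A standalone sketch against a local ZooKeeper; the connect string and the /demo paths are assumptions for the example.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative sketch, not part of the patch. Assumes a ZooKeeper server on localhost:2181
// and that /demo does not exist yet.
public class CversionDemo {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
    zk.create("/demo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    int before = zk.exists("/demo", false).getCversion();
    // Creating a child bumps the parent's cversion by one...
    zk.create("/demo/child", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    // ...and deleting it bumps the cversion again.
    zk.delete("/demo/child", -1);
    int after = zk.exists("/demo", false).getCversion();

    System.out.println(before + " -> " + after); // expect after == before + 2
    zk.close();
  }
}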
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 525b7ad9fe6..474f4975043 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,7 +63,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       return files;
     }
 
-    final Set<String> wals = loadWALsFromQueues();
+    final Set<String> wals;
+    try {
+      // WALs created concurrently with this call may not be included in the returned set,
+      // but they won't be deleted because they are not in the set of files being checked.
+      wals = loadWALsFromQueues();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+      return Collections.emptyList();
+    }
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   }
 
   /**
-   * Load all wals in all replication queues from ZK
+   * Load all wals in all replication queues from ZK. This method guarantees to return a
+   * snapshot that contains all WALs that were in zookeeper at the start of this call, even if
+   * there is a concurrent queue failover. However, some WALs created during the call may
+   * not be included.
    */
-  private Set<String> loadWALsFromQueues() {
-    List<String> rss = replicationQueues.getListOfReplicators();
-    if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
-      return ImmutableSet.of();
-    }
-    Set<String> wals = Sets.newHashSet();
-    for (String rs: rss) {
-      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
-      // if rs just died, this will be null
-      if (listOfPeers == null) {
-        continue;
+  private Set<String> loadWALsFromQueues() throws KeeperException {
+    for (int retry = 0; ; retry++) {
+      int v0 = replicationQueues.getQueuesZNodeCversion();
+      List<String> rss = replicationQueues.getListOfReplicators();
+      if (rss == null) {
+        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+        return ImmutableSet.of();
       }
-      for (String id : listOfPeers) {
-        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
-        if (peersWals != null) {
-          wals.addAll(peersWals);
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
+        }
+        for (String id : listOfPeers) {
+          List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
         }
       }
+      int v1 = replicationQueues.getQueuesZNodeCversion();
+      if (v0 == v1) {
+        return wals;
+      }
+      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+        v0, v1, retry));
     }
-    return wals;
   }
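Note (not part of the patch): ReplicationLogCleaner runs as one of the master's log-cleaner delegates. The sketch below shows the usual way that chain is configured; the property key hbase.master.logcleaner.plugins and the TimeToLiveLogCleaner class name are standard HBase settings recalled from memory rather than taken from this patch, so treat them as assumptions. With the change above, a transient ZooKeeper error makes getDeletableFiles() return an empty list for that run (every WAL is kept) instead of aborting the cleaner.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustrative sketch, not part of the patch: enabling the replication-aware log cleaner
// in the master's cleaner chain (property key assumed, see note above).
public class LogCleanerConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.master.logcleaner.plugins",
        "org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,"
            + "org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");
    System.out.println(conf.get("hbase.master.logcleaner.plugins"));
  }
}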
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index e560620fbde..f05ecebb21a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
-  public void testReplicationQueuesClient() throws ReplicationException {
+  public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
     rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9315f62dd47..571be2654a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -368,6 +369,38 @@ public class TestReplicationSourceManager {
     server.abort("", null);
   }
 
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+    repQueues.init(s0.getServerName().toString());
+    // populate some znodes in the peer znode
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // simulate queue transfer
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClient client =
+        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should have increased by 1 since a child node was deleted
+    assertEquals(v0 + 1, v1);
+
+    s0.abort("", null);
+  }
+
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;