diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index e2b28a5dd44..f8ed492db63 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -810,7 +810,10 @@ public class ReplicationZookeeper implements Closeable { // check the logs queue for the old peer cluster String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); - if (hlogs == null || hlogs.size() == 0) continue; // empty log queue. + if (hlogs == null || hlogs.size() == 0) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + continue; // empty log queue: nothing to copy, its znode deletion was queued above. + } // create the new cluster znode SortedSet logQueue = new TreeSet(); queues.put(newPeerId, logQueue); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 7ceb637babd..5b319060408 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -25,6 +25,7 @@ import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -124,7 +125,7 @@ public class ReplicationSource extends Thread // Indicates if this queue is recovered (and will be deleted when depleted) private boolean queueRecovered; // List of all the dead region servers that had this queue (if recovered) - private String[] 
deadRegionServers; + private List deadRegionServers = new ArrayList(); // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS @@ -201,19 +202,83 @@ public class ReplicationSource extends Thread // The passed znode will be either the id of the peer cluster or // the handling story of that queue in the form of id-servername-* - private void checkIfQueueRecovered(String peerClusterZnode) { - String[] parts = peerClusterZnode.split("-"); + // + // package-private (not private) so that tests can call it + void checkIfQueueRecovered(String peerClusterZnode) { + String[] parts = peerClusterZnode.split("-", 2); this.queueRecovered = parts.length != 1; this.peerId = this.queueRecovered ? parts[0] : peerClusterZnode; this.peerClusterZnode = peerClusterZnode; - this.deadRegionServers = new String[parts.length-1]; - // Extract all the places where we could find the hlogs - for (int i = 1; i < parts.length; i++) { - this.deadRegionServers[i-1] = parts[i]; + + if (parts.length < 2) { + // znode has no "-": not a recovered queue, so there are no dead servers to parse + return; } + + // everything after the peer id is the chain of dead server names + extracDeadServersFromZNodeString(parts[1], this.deadRegionServers); + } + + /** + * For tests only: returns a read-only view of the parsed dead region server names. + */ + List getDeadRegionServers() { + return Collections.unmodifiableList(this.deadRegionServers); } + /** + * Parses dead server names from the znode string. A server name can itself contain "-", such as + * "ip-10-46-221-101.ec2.internal", so some "-" must be skipped while parsing names like the + * following: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-(next server name)-... 
+ */ + private static void + extracDeadServersFromZNodeString(String deadServerListStr, List result) { + + if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return; + + // a "-" separates two server names only after two "," were seen in the current name + int seenCommaCnt = 0; + int startIndex = 0; + int len = deadServerListStr.length(); + + for (int i = 0; i < len; i++) { + switch (deadServerListStr.charAt(i)) { + case ',': + seenCommaCnt += 1; + break; + case '-': + if(seenCommaCnt>=2) { + if (i > startIndex) { + String serverName = deadServerListStr.substring(startIndex, i); + if(ServerName.isFullServerName(serverName)){ + result.add(serverName); + } else { + LOG.error("Found invalid server name:" + serverName); + } + startIndex = i + 1; + } + seenCommaCnt = 0; + } + break; + default: + break; + } + } + + // add the tail: whatever remains after the last delimiter + if(startIndex < len - 1){ + String serverName = deadServerListStr.substring(startIndex, len); + if(ServerName.isFullServerName(serverName)){ + result.add(serverName); + } else { + LOG.error("Found invalid server name at the end:" + serverName); + } + } + + LOG.debug("Found dead servers:" + result); + } + /** * Select a number of peers at random using the ratio. Mininum 1. 
*/ @@ -509,11 +574,10 @@ public class ReplicationSource extends Thread // We didn't find the log in the archive directory, look if it still // exists in the dead RS folder (there could be a chain of failures // to look at) - LOG.info("NB dead servers : " + deadRegionServers.length); - for (int i = this.deadRegionServers.length - 1; i >= 0; i--) { - + LOG.info("NB dead servers : " + deadRegionServers.size()); + for (String curDeadServerName : deadRegionServers) { Path deadRsDirectory = - new Path(manager.getLogDir().getParent(), this.deadRegionServers[i]); + new Path(manager.getLogDir().getParent(), curDeadServerName); Path[] locs = new Path[] { new Path(deadRsDirectory, currentPath.getName()), new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index ca90772614f..87fdbdc82a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.net.URLEncoder; import java.util.ArrayList; @@ -266,6 +267,51 @@ public class TestReplicationSourceManager { server.abort("", null); } + @Test + public void testNodeFailoverDeadServerParsing() throws Exception { + LOG.debug("testNodeFailoverDeadServerParsing"); + conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); + final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); + AtomicBoolean replicating = new AtomicBoolean(true); + ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating); + // populate some znodes 
in the peer znode + files.add("log1"); + files.add("log2"); + for (String file : files) { + rz.addLogToList(file, "1"); + } + // create 3 DummyServers whose hostnames contain "-" + Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); + Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); + Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); + + // simulate three servers failing sequentially, each copying the queues of the previous one + ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true)); + SortedMap> testMap = + rz1.copyQueuesFromRSUsingMulti(server.getServerName().getServerName()); + rz1.close(); + ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true)); + testMap = rz2.copyQueuesFromRSUsingMulti(s1.getServerName().getServerName()); + rz2.close(); + ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true)); + testMap = rz3.copyQueuesFromRSUsingMulti(s2.getServerName().getServerName()); + rz3.close(); + + ReplicationSource s = new ReplicationSource(); + s.checkIfQueueRecovered(testMap.firstKey()); + List result = s.getDeadRegionServers(); + + // verify that every dead server in the chain was parsed from the queue znode name + assertTrue(result.contains(server.getServerName().getServerName())); + assertTrue(result.contains(s1.getServerName().getServerName())); + assertTrue(result.contains(s2.getServerName().getServerName())); + + // close out the resources. + rz.close(); + server.abort("", null); + } + + static class DummyNodeFailoverWorker extends Thread { private SortedMap> logZnodesMap; Server server; @@ -341,7 +387,7 @@ public class TestReplicationSourceManager { @Override public ServerName getServerName() { - return new ServerName(hostname, 1234, -1L); + return new ServerName(hostname, 1234, 1L); } @Override