HBASE-8207 Replication could have data loss when machine name contains hyphen "-" (Jeffrey)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1462515 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
973d38db30
commit
1babc2abbc
|
@ -810,7 +810,10 @@ public class ReplicationZookeeper implements Closeable {
|
||||||
// check the logs queue for the old peer cluster
|
// check the logs queue for the old peer cluster
|
||||||
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
|
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
|
||||||
List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
|
List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
|
||||||
if (hlogs == null || hlogs.size() == 0) continue; // empty log queue.
|
if (hlogs == null || hlogs.size() == 0) {
|
||||||
|
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||||
|
continue; // empty log queue.
|
||||||
|
}
|
||||||
// create the new cluster znode
|
// create the new cluster znode
|
||||||
SortedSet<String> logQueue = new TreeSet<String>();
|
SortedSet<String> logQueue = new TreeSet<String>();
|
||||||
queues.put(newPeerId, logQueue);
|
queues.put(newPeerId, logQueue);
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.net.ConnectException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -124,7 +125,7 @@ public class ReplicationSource extends Thread
|
||||||
// Indicates if this queue is recovered (and will be deleted when depleted)
|
// Indicates if this queue is recovered (and will be deleted when depleted)
|
||||||
private boolean queueRecovered;
|
private boolean queueRecovered;
|
||||||
// List of all the dead region servers that had this queue (if recovered)
|
// List of all the dead region servers that had this queue (if recovered)
|
||||||
private String[] deadRegionServers;
|
private List<String> deadRegionServers = new ArrayList<String>();
|
||||||
// Maximum number of retries before taking bold actions
|
// Maximum number of retries before taking bold actions
|
||||||
private int maxRetriesMultiplier;
|
private int maxRetriesMultiplier;
|
||||||
// Socket timeouts require even bolder actions since we don't want to DDOS
|
// Socket timeouts require even bolder actions since we don't want to DDOS
|
||||||
|
@ -201,19 +202,83 @@ public class ReplicationSource extends Thread
|
||||||
|
|
||||||
// The passed znode will be either the id of the peer cluster or
|
// The passed znode will be either the id of the peer cluster or
|
||||||
// the handling story of that queue in the form of id-servername-*
|
// the handling story of that queue in the form of id-servername-*
|
||||||
private void checkIfQueueRecovered(String peerClusterZnode) {
|
//
|
||||||
String[] parts = peerClusterZnode.split("-");
|
// package access for testing
|
||||||
|
void checkIfQueueRecovered(String peerClusterZnode) {
|
||||||
|
String[] parts = peerClusterZnode.split("-", 2);
|
||||||
this.queueRecovered = parts.length != 1;
|
this.queueRecovered = parts.length != 1;
|
||||||
this.peerId = this.queueRecovered ?
|
this.peerId = this.queueRecovered ?
|
||||||
parts[0] : peerClusterZnode;
|
parts[0] : peerClusterZnode;
|
||||||
this.peerClusterZnode = peerClusterZnode;
|
this.peerClusterZnode = peerClusterZnode;
|
||||||
this.deadRegionServers = new String[parts.length-1];
|
|
||||||
// Extract all the places where we could find the hlogs
|
if (parts.length < 2) {
|
||||||
for (int i = 1; i < parts.length; i++) {
|
// not queue recovered situation
|
||||||
this.deadRegionServers[i-1] = parts[i];
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extract dead servers
|
||||||
|
extracDeadServersFromZNodeString(parts[1], this.deadRegionServers);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* for tests only
|
||||||
|
*/
|
||||||
|
List<String> getDeadRegionServers() {
|
||||||
|
return Collections.unmodifiableList(this.deadRegionServers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse dead server names from znode string servername can contain "-" such as
|
||||||
|
* "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
|
||||||
|
* cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
|
||||||
|
*/
|
||||||
|
private static void
|
||||||
|
extracDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
|
||||||
|
|
||||||
|
if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
|
||||||
|
|
||||||
|
// valid server name delimiter "-" has to be after "," in a server name
|
||||||
|
int seenCommaCnt = 0;
|
||||||
|
int startIndex = 0;
|
||||||
|
int len = deadServerListStr.length();
|
||||||
|
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
switch (deadServerListStr.charAt(i)) {
|
||||||
|
case ',':
|
||||||
|
seenCommaCnt += 1;
|
||||||
|
break;
|
||||||
|
case '-':
|
||||||
|
if(seenCommaCnt>=2) {
|
||||||
|
if (i > startIndex) {
|
||||||
|
String serverName = deadServerListStr.substring(startIndex, i);
|
||||||
|
if(ServerName.isFullServerName(serverName)){
|
||||||
|
result.add(serverName);
|
||||||
|
} else {
|
||||||
|
LOG.error("Found invalid server name:" + serverName);
|
||||||
|
}
|
||||||
|
startIndex = i + 1;
|
||||||
|
}
|
||||||
|
seenCommaCnt = 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// add tail
|
||||||
|
if(startIndex < len - 1){
|
||||||
|
String serverName = deadServerListStr.substring(startIndex, len);
|
||||||
|
if(ServerName.isFullServerName(serverName)){
|
||||||
|
result.add(serverName);
|
||||||
|
} else {
|
||||||
|
LOG.error("Found invalid server name at the end:" + serverName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("Found dead servers:" + result);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Select a number of peers at random using the ratio. Mininum 1.
|
* Select a number of peers at random using the ratio. Mininum 1.
|
||||||
*/
|
*/
|
||||||
|
@ -509,11 +574,10 @@ public class ReplicationSource extends Thread
|
||||||
// We didn't find the log in the archive directory, look if it still
|
// We didn't find the log in the archive directory, look if it still
|
||||||
// exists in the dead RS folder (there could be a chain of failures
|
// exists in the dead RS folder (there could be a chain of failures
|
||||||
// to look at)
|
// to look at)
|
||||||
LOG.info("NB dead servers : " + deadRegionServers.length);
|
LOG.info("NB dead servers : " + deadRegionServers.size());
|
||||||
for (int i = this.deadRegionServers.length - 1; i >= 0; i--) {
|
for (String curDeadServerName : deadRegionServers) {
|
||||||
|
|
||||||
Path deadRsDirectory =
|
Path deadRsDirectory =
|
||||||
new Path(manager.getLogDir().getParent(), this.deadRegionServers[i]);
|
new Path(manager.getLogDir().getParent(), curDeadServerName);
|
||||||
Path[] locs = new Path[] {
|
Path[] locs = new Path[] {
|
||||||
new Path(deadRsDirectory, currentPath.getName()),
|
new Path(deadRsDirectory, currentPath.getName()),
|
||||||
new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
|
new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hbase.replication.regionserver;
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -266,6 +267,51 @@ public class TestReplicationSourceManager {
|
||||||
server.abort("", null);
|
server.abort("", null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeFailoverDeadServerParsing() throws Exception {
|
||||||
|
LOG.debug("testNodeFailoverDeadServerParsing");
|
||||||
|
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
|
||||||
|
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
|
||||||
|
AtomicBoolean replicating = new AtomicBoolean(true);
|
||||||
|
ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
|
||||||
|
// populate some znodes in the peer znode
|
||||||
|
files.add("log1");
|
||||||
|
files.add("log2");
|
||||||
|
for (String file : files) {
|
||||||
|
rz.addLogToList(file, "1");
|
||||||
|
}
|
||||||
|
// create 3 DummyServers
|
||||||
|
Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
|
||||||
|
Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
|
||||||
|
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
|
||||||
|
|
||||||
|
// simulate three servers fail sequentially
|
||||||
|
ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true));
|
||||||
|
SortedMap<String, SortedSet<String>> testMap =
|
||||||
|
rz1.copyQueuesFromRSUsingMulti(server.getServerName().getServerName());
|
||||||
|
rz1.close();
|
||||||
|
ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true));
|
||||||
|
testMap = rz2.copyQueuesFromRSUsingMulti(s1.getServerName().getServerName());
|
||||||
|
rz2.close();
|
||||||
|
ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true));
|
||||||
|
testMap = rz3.copyQueuesFromRSUsingMulti(s2.getServerName().getServerName());
|
||||||
|
rz3.close();
|
||||||
|
|
||||||
|
ReplicationSource s = new ReplicationSource();
|
||||||
|
s.checkIfQueueRecovered(testMap.firstKey());
|
||||||
|
List<String> result = s.getDeadRegionServers();
|
||||||
|
|
||||||
|
// verify
|
||||||
|
assertTrue(result.contains(server.getServerName().getServerName()));
|
||||||
|
assertTrue(result.contains(s1.getServerName().getServerName()));
|
||||||
|
assertTrue(result.contains(s2.getServerName().getServerName()));
|
||||||
|
|
||||||
|
// close out the resources.
|
||||||
|
rz.close();
|
||||||
|
server.abort("", null);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static class DummyNodeFailoverWorker extends Thread {
|
static class DummyNodeFailoverWorker extends Thread {
|
||||||
private SortedMap<String, SortedSet<String>> logZnodesMap;
|
private SortedMap<String, SortedSet<String>> logZnodesMap;
|
||||||
Server server;
|
Server server;
|
||||||
|
@ -341,7 +387,7 @@ public class TestReplicationSourceManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerName getServerName() {
|
public ServerName getServerName() {
|
||||||
return new ServerName(hostname, 1234, -1L);
|
return new ServerName(hostname, 1234, 1L);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue