HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)

This commit is contained in:
Andrew Purtell 2015-08-07 15:08:29 -07:00
parent 832a61513b
commit 68cb53d151
6 changed files with 106 additions and 27 deletions

View File

@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/** /**
* This provides an interface for clients of replication to view replication queues. These queues * This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
* Get a list of all region servers that have outstanding replication queues. These servers could * Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster. * be alive, dead or from a previous run of the cluster.
* @return a list of server names * @return a list of server names
* @throws KeeperException zookeeper exception
*/ */
List<String> getListOfReplicators(); List<String> getListOfReplicators() throws KeeperException;
/** /**
* Get a list of all WALs in the given queue on the given region server. * Get a list of all WALs in the given queue on the given region server.
* @param serverName the server name of the region server that owns the queue * @param serverName the server name of the region server that owns the queue
* @param queueId a String that identifies the queue * @param queueId a String that identifies the queue
* @return a list of WALs, null if this region server is dead and has no outstanding queues * @return a list of WALs, null if this region server is dead and has no outstanding queues
* @throws KeeperException zookeeper exception
*/ */
List<String> getLogsInQueue(String serverName, String queueId); List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
/** /**
* Get a list of all queues for the specified region server. * Get a list of all queues for the specified region server.
* @param serverName the server name of the region server that owns the set of queues * @param serverName the server name of the region server that owns the set of queues
* @return a list of queueIds, null if this region server is not a replicator. * @return a list of queueIds, null if this region server is not a replicator.
*/ */
List<String> getAllQueues(String serverName); List<String> getAllQueues(String serverName) throws KeeperException;
/**
* Get the cversion of replication rs node. This can be used as optimistic locking to get a
* consistent snapshot of the replication queues.
* @return cversion of replication rs node
*/
int getQueuesZNodeCversion() throws KeeperException;
} }

View File

@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
} }
@Override @Override
public List<String> getLogsInQueue(String serverName, String queueId) { public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
znode = ZKUtil.joinZNode(znode, queueId); znode = ZKUtil.joinZNode(znode, queueId);
List<String> result = null; List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
} catch (KeeperException e) { } catch (KeeperException e) {
this.abortable.abort("Failed to get list of wals for queueId=" + queueId this.abortable.abort("Failed to get list of wals for queueId=" + queueId
+ " and serverName=" + serverName, e); + " and serverName=" + serverName, e);
throw e;
} }
return result; return result;
} }
@Override @Override
public List<String> getAllQueues(String serverName) { public List<String> getAllQueues(String serverName) throws KeeperException {
String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
List<String> result = null; List<String> result = null;
try { try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) { } catch (KeeperException e) {
this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e); this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
throw e;
} }
return result; return result;
} }
@Override public int getQueuesZNodeCversion() throws KeeperException {
try {
Stat stat = new Stat();
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
return stat.getCversion();
} catch (KeeperException e) {
this.abortable.abort("Failed to get stat of replication rs node", e);
throw e;
}
}
} }

View File

@@ -334,7 +334,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
// add delete op for peer // add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
} }
// add delete op for dead rs // add delete op for dead rs, this will update the cversion of the parent.
// The reader will use optimistic locking with this to get a consistent // The reader will use optimistic locking with this to get a consistent
// snapshot
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size()); if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

View File

@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.zookeeper.KeeperException;
/** /**
* Implementation of a log cleaner that checks if a log is still scheduled for * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,7 +63,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
return files; return files;
} }
final Set<String> wals = loadWALsFromQueues(); final Set<String> wals;
try {
// The concurrently created new WALs may not be included in the return list,
// but they won't be deleted because they're not in the checking set.
wals = loadWALsFromQueues();
} catch (KeeperException e) {
LOG.warn("Failed to read zookeeper, skipping checking deletable files");
return Collections.emptyList();
}
return Iterables.filter(files, new Predicate<FileStatus>() { return Iterables.filter(files, new Predicate<FileStatus>() {
@Override @Override
public boolean apply(FileStatus file) { public boolean apply(FileStatus file) {
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
} }
/** /**
* Load all wals in all replication queues from ZK * Load all wals in all replication queues from ZK. This method guarantees to return a
* snapshot which contains all WALs in the zookeeper at the start of this call even if there
* is concurrent queue failover. However, some newly created WALs during the call may
* not be included.
*/ */
private Set<String> loadWALsFromQueues() { private Set<String> loadWALsFromQueues() throws KeeperException {
List<String> rss = replicationQueues.getListOfReplicators(); int v0 = replicationQueues.getQueuesZNodeCversion();
if (rss == null) { for (int retry = 0; ; retry++) {
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); List<String> rss = replicationQueues.getListOfReplicators();
return ImmutableSet.of(); if (rss == null) {
} LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
Set<String> wals = Sets.newHashSet(); return ImmutableSet.of();
for (String rs: rss) {
List<String> listOfPeers = replicationQueues.getAllQueues(rs);
// if rs just died, this will be null
if (listOfPeers == null) {
continue;
} }
for (String id : listOfPeers) { Set<String> wals = Sets.newHashSet();
List<String> peersWals = replicationQueues.getLogsInQueue(rs, id); for (String rs : rss) {
if (peersWals != null) { List<String> listOfPeers = replicationQueues.getAllQueues(rs);
wals.addAll(peersWals); // if rs just died, this will be null
if (listOfPeers == null) {
continue;
}
for (String id : listOfPeers) {
List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
if (peersWals != null) {
wals.addAll(peersWals);
}
} }
} }
int v1 = replicationQueues.getQueuesZNodeCversion();
if (v0 == v1) {
return wals;
}
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
v0, v1, retry));
} }
return wals;
} }
@Override @Override

View File

@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
} }
@Test @Test
public void testReplicationQueuesClient() throws ReplicationException { public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
rqc.init(); rqc.init();
// Test methods with empty state // Test methods with empty state
assertEquals(0, rqc.getListOfReplicators().size()); assertEquals(0, rqc.getListOfReplicators().size());

View File

@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -367,6 +368,38 @@ public class TestReplicationSourceManager {
server.abort("", null); server.abort("", null);
} }
@Test
public void testFailoverDeadServerCversionChange() throws Exception {
LOG.debug("testFailoverDeadServerCversionChange");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server s0 = new DummyServer("cversion-change0.example.org");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
repQueues.init(s0.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
repQueues.addLog("1", file);
}
// simulate queue transfer
Server s1 = new DummyServer("cversion-change1.example.org");
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
rq1.init(s1.getServerName().toString());
ReplicationQueuesClient client =
ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
int v0 = client.getQueuesZNodeCversion();
rq1.claimQueues(s0.getServerName().getServerName());
int v1 = client.getQueuesZNodeCversion();
// cversion should be increased by 1 since a child node is deleted
assertEquals(v0 + 1, v1);
s0.abort("", null);
}
static class DummyNodeFailoverWorker extends Thread { static class DummyNodeFailoverWorker extends Thread {
private SortedMap<String, SortedSet<String>> logZnodesMap; private SortedMap<String, SortedSet<String>> logZnodesMap;