HBASE-16135 PeerClusterZnode under rs of removed peer may never be deleted

zhangduo 2016-07-01 13:41:01 +08:00
parent d22c23c396
commit 6944a17ad4
5 changed files with 81 additions and 34 deletions
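
The bug, as visible in the hunks below: copyQueuesFromRSUsingMulti used to skip the queues of a peer that no longer existed, leaving those znodes as children of the dead regionserver's znode. The batched (multi) delete of that znode then failed, so the removed peer's queue znode was never cleaned up. The fix claims the orphaned queues anyway, so the multi op can succeed, and lets NodeFailoverWorker delete the claimed copies afterwards. A minimal sketch of the ZooKeeper constraint behind the failure, using the plain ZooKeeper client API; the helper class and arguments are hypothetical, not the HBase code:

import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical helper for illustration; the real logic lives in
// ReplicationQueuesZKImpl.copyQueuesFromRSUsingMulti.
public class DeadRsCleanupSketch {
  static void claimQueuesAndDeleteDeadRs(ZooKeeper zk, String deadRsZnode,
      List<Op> moveQueueOps) throws KeeperException, InterruptedException {
    List<Op> ops = new ArrayList<>(moveQueueOps);
    // The final op deletes the dead regionserver's znode itself. ZooKeeper refuses
    // to delete a znode that still has children, and multi() is all-or-nothing, so
    // a single skipped (orphaned) queue znode aborts the whole batch -- which is how
    // a removed peer's queue znode could survive forever before this change.
    ops.add(Op.delete(deadRsZnode, -1));
    zk.multi(ops);
  }
}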


@@ -316,9 +316,11 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
     for (String peerId : peerIdsToProcess) {
       ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
       if (!peerExists(replicationQueueInfo.getPeerId())) {
-        LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
-        // Protection against moving orphaned queues
-        continue;
+        // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
+        // this will cause the whole multi op fail.
+        // NodeFailoverWorker will skip the orphaned queues.
+        LOG.warn("Peer " + peerId
+            + " didn't exist, will move its queue to avoid the failure of multi op");
       }
       String newPeerId = peerId + "-" + znode;
       String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
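
The move keeps the usual adopted-queue naming, so a claimed queue keeps its peer id as a prefix. A rough illustration of the resulting path, assuming the default /hbase/replication/rs layout and made-up server names:

// Illustration only; ZKUtil.joinZNode performs the same "/" join.
public class AdoptedQueueNameSketch {
  public static void main(String[] args) {
    String peerId = "2";                                   // peer that no longer exists
    String deadRsZnode = "hostname0.example.org,16020,1";  // znode of the dead regionserver
    String myQueuesZnode = "/hbase/replication/rs/hostname1.example.org,16020,2";

    String newPeerId = peerId + "-" + deadRsZnode;
    String newPeerZnode = myQueuesZnode + "/" + newPeerId;
    System.out.println(newPeerZnode);
    // /hbase/replication/rs/hostname1.example.org,16020,2/2-hostname0.example.org,16020,1
    // NodeFailoverWorker later parses the leading "2", sees the peer is gone,
    // and deletes this claimed queue (see the removeQueue call added below).
  }
}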


@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;

 import java.io.IOException;
@@ -344,6 +345,11 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.oldsources;
   }

+  @VisibleForTesting
+  List<String> getAllQueues() {
+    return replicationQueues.getAllQueues();
+  }
+
   void preLogRoll(Path newLog) throws IOException {
     recordLog(newLog);
     String logName = newLog.getName();
@@ -371,8 +377,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     String logName = logPath.getName();
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
     // update replication queues on ZK
-    synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
-      // the to-be-removed peer
+    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
+    synchronized (replicationPeers) {
       for (String id : replicationPeers.getConnectedPeerIds()) {
         try {
           this.replicationQueues.addLog(id, logName);
@@ -528,24 +534,28 @@ public class ReplicationSourceManager implements ReplicationListener {
         + sources.size() + " and another "
         + oldsources.size() + " that were recovered");
     String terminateMessage = "Replication stream was removed by a user";
-    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
     List<ReplicationSourceInterface> oldSourcesToDelete =
         new ArrayList<ReplicationSourceInterface>();
-    // First close all the recovered sources for this peer
-    for (ReplicationSourceInterface src : oldsources) {
-      if (id.equals(src.getPeerClusterId())) {
-        oldSourcesToDelete.add(src);
-      }
-    }
-    for (ReplicationSourceInterface src : oldSourcesToDelete) {
-      src.terminate(terminateMessage);
-      closeRecoveredQueue((src));
-    }
+    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
+    // see NodeFailoverWorker.run
+    synchronized (oldsources) {
+      // First close all the recovered sources for this peer
+      for (ReplicationSourceInterface src : oldsources) {
+        if (id.equals(src.getPeerClusterId())) {
+          oldSourcesToDelete.add(src);
+        }
+      }
+      for (ReplicationSourceInterface src : oldSourcesToDelete) {
+        src.terminate(terminateMessage);
+        closeRecoveredQueue(src);
+      }
+    }
     LOG.info("Number of deleted recovered sources for " + id + ": "
         + oldSourcesToDelete.size());
     // Now look for the one on this cluster
-    synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
-      // for the to-be-removed peer
+    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
+    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
+    synchronized (this.replicationPeers) {
       for (ReplicationSourceInterface src : this.sources) {
         if (id.equals(src.getPeerClusterId())) {
           srcToRemove.add(src);
@@ -603,9 +613,12 @@ public class ReplicationSourceManager implements ReplicationListener {
     private final UUID clusterId;

     /**
-     *
      * @param rsZnode
      */
+    public NodeFailoverWorker(String rsZnode) {
+      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
+    }
+
     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
         final ReplicationPeers replicationPeers, final UUID clusterId) {
       super("Failover-for-"+rsZnode);
@@ -661,6 +674,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         }
         if (peer == null || peerConfig == null) {
           LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
+          replicationQueues.removeQueue(peerId);
           continue;
         }
         // track sources in walsByIdRecoveredQueues
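
This removeQueue call is the other half of the ReplicationQueuesZKImpl change above: the orphaned queue is claimed so the dead regionserver's znode can be deleted in one multi op, and once the worker confirms the peer no longer exists it drops the claimed copy here instead of leaving it behind.
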
@@ -680,15 +694,20 @@ public class ReplicationSourceManager implements ReplicationListener {
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                 server, peerId, this.clusterId, peerConfig, peer);
-          if (!this.rp.getConnectedPeerIds().contains((src.getPeerClusterId()))) {
-            src.terminate("Recovered queue doesn't belong to any current peer");
-            break;
+          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
+          // see removePeer
+          synchronized (oldsources) {
+            if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) {
+              src.terminate("Recovered queue doesn't belong to any current peer");
+              closeRecoveredQueue(src);
+              continue;
+            }
+            oldsources.add(src);
+            for (String wal : walsSet) {
+              src.enqueueLog(new Path(oldLogDir, wal));
+            }
+            src.startup();
           }
-          oldsources.add(src);
-          for (String wal : walsSet) {
-            src.enqueueLog(new Path(oldLogDir, wal));
-          }
-          src.startup();
         } catch (IOException e) {
           // TODO manage it
           LOG.error("Failed creating a source", e);
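
Both removePeer (above) and NodeFailoverWorker.run now synchronize on oldsources, closing the window in which a recovered source could be registered for a peer that is being removed concurrently. A stripped-down sketch of the pattern, with simplified stand-in types rather than the real HBase classes:

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified stand-ins for ReplicationSourceInterface and the manager state;
// only the locking pattern mirrors the change above.
public class RecoveredSourceRaceSketch {
  static class Source {
    final String peerId;
    Source(String peerId) { this.peerId = peerId; }
    void startup() {}
    void terminate(String reason) {}
  }

  private final List<Source> oldsources = new ArrayList<>();

  // NodeFailoverWorker side: check-then-add happens under the lock, so a peer
  // removal can no longer slip in between the check and the add.
  void claimRecoveredQueue(Source src, Set<String> connectedPeerIds) {
    synchronized (oldsources) {
      if (!connectedPeerIds.contains(src.peerId)) {
        src.terminate("Recovered queue doesn't belong to any current peer");
        return;
      }
      oldsources.add(src);
      src.startup();
    }
  }

  // removePeer side: terminate-and-forget also runs under the same lock.
  void removePeer(String peerId) {
    synchronized (oldsources) {
      oldsources.removeIf(src -> {
        if (peerId.equals(src.peerId)) {
          src.terminate("Replication stream was removed by a user");
          return true;
        }
        return false;
      });
    }
  }
}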


@@ -84,8 +84,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;

 import com.google.common.collect.Sets;
@@ -187,15 +189,24 @@ public abstract class TestReplicationSourceManager {
     utility.shutdownMiniCluster();
   }

-  @Before
-  public void setUp() throws Exception {
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
     fs.delete(logDir, true);
     fs.delete(oldLogDir, true);
   }

+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
   @After
   public void tearDown() throws Exception {
-    setUp();
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
   }

   @Test
@@ -274,7 +285,6 @@ public abstract class TestReplicationSourceManager {

   @Test
   public void testClaimQueues() throws Exception {
-    LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);

     final Server server = new DummyServer("hostname0.example.org");
@@ -354,6 +364,27 @@ public abstract class TestReplicationSourceManager {
     assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
   }

+  @Test
+  public void testCleanupUnknownPeerZNode() throws Exception {
+    final Server server = new DummyServer("hostname2.example.org");
+    ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
+        new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
+    rq.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    // add log to an unknown peer
+    String group = "testgroup";
+    rq.addLog("2", group + ".log1");
+    rq.addLog("2", group + ".log2");
+
+    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
+    w1.run();
+
+    // The log of the unknown peer should be removed from zk
+    for (String peer : manager.getAllQueues()) {
+      assertTrue(peer.startsWith("1"));
+    }
+  }
+
   @Test
   public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
     NavigableMap<byte[], Integer> scope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
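
The new test exercises the whole path: it enqueues two WALs under a peer id ("2") that was never registered, runs NodeFailoverWorker for that server, and then uses the new getAllQueues() hook to assert that every remaining queue name starts with "1", the peer id used elsewhere in this test class. In other words, the claimed-but-orphaned queue really is deleted instead of lingering as an undeletable znode.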


@ -64,7 +64,6 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
// Tests the naming convention of adopted queues for ReplicationQueuesZkImpl // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
@Test @Test
public void testNodeFailoverDeadServerParsing() throws Exception { public void testNodeFailoverDeadServerParsing() throws Exception {
LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
ReplicationQueues repQueues = ReplicationQueues repQueues =
@@ -114,8 +113,6 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {

   @Test
   public void testFailoverDeadServerCversionChange() throws Exception {
-    LOG.debug("testFailoverDeadServerCversionChange");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server s0 = new DummyServer("cversion-change0.example.org");
     ReplicationQueues repQueues =


@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
 import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;