HBASE-16135 PeerClusterZnode under rs of removed peer may never be deleted
This commit is contained in:
parent
24fd547292
commit
721c5b7128
|
@ -311,9 +311,11 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
|||
for (String peerId : peerIdsToProcess) {
|
||||
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
|
||||
if (!peerExists(replicationQueueInfo.getPeerId())) {
|
||||
LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
|
||||
// Protection against moving orphaned queues
|
||||
continue;
|
||||
// the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
|
||||
// this will cause the whole multi op fail.
|
||||
// NodeFailoverWorker will skip the orphaned queues.
|
||||
LOG.warn("Peer " + peerId
|
||||
+ " didn't exist, will move its queue to avoid the failure of multi op");
|
||||
}
|
||||
String newPeerId = peerId + "-" + znode;
|
||||
String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -63,8 +66,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationTracker;
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* This class is responsible to manage all the replication
|
||||
* sources. There are two classes of sources:
|
||||
|
@ -356,6 +357,11 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
return this.oldsources;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<String> getAllQueues() {
|
||||
return replicationQueues.getAllQueues();
|
||||
}
|
||||
|
||||
void preLogRoll(Path newLog) throws IOException {
|
||||
recordLog(newLog);
|
||||
String logName = newLog.getName();
|
||||
|
@ -540,24 +546,28 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
+ sources.size() + " and another "
|
||||
+ oldsources.size() + " that were recovered");
|
||||
String terminateMessage = "Replication stream was removed by a user";
|
||||
List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
|
||||
List<ReplicationSourceInterface> oldSourcesToDelete =
|
||||
new ArrayList<ReplicationSourceInterface>();
|
||||
// First close all the recovered sources for this peer
|
||||
for (ReplicationSourceInterface src : oldsources) {
|
||||
if (id.equals(src.getPeerClusterId())) {
|
||||
oldSourcesToDelete.add(src);
|
||||
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
|
||||
// see NodeFailoverWorker.run
|
||||
synchronized (oldsources) {
|
||||
// First close all the recovered sources for this peer
|
||||
for (ReplicationSourceInterface src : oldsources) {
|
||||
if (id.equals(src.getPeerClusterId())) {
|
||||
oldSourcesToDelete.add(src);
|
||||
}
|
||||
}
|
||||
for (ReplicationSourceInterface src : oldSourcesToDelete) {
|
||||
src.terminate(terminateMessage);
|
||||
closeRecoveredQueue(src);
|
||||
}
|
||||
}
|
||||
for (ReplicationSourceInterface src : oldSourcesToDelete) {
|
||||
src.terminate(terminateMessage);
|
||||
closeRecoveredQueue((src));
|
||||
}
|
||||
LOG.info("Number of deleted recovered sources for " + id + ": "
|
||||
+ oldSourcesToDelete.size());
|
||||
// Now look for the one on this cluster
|
||||
synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
|
||||
// for the to-be-removed peer
|
||||
List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
|
||||
// synchronize on replicationPeers to avoid adding source for the to-be-removed peer
|
||||
synchronized (this.replicationPeers) {
|
||||
for (ReplicationSourceInterface src : this.sources) {
|
||||
if (id.equals(src.getPeerClusterId())) {
|
||||
srcToRemove.add(src);
|
||||
|
@ -615,9 +625,12 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
private final UUID clusterId;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param rsZnode
|
||||
*/
|
||||
public NodeFailoverWorker(String rsZnode) {
|
||||
this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
|
||||
}
|
||||
|
||||
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
|
||||
final ReplicationPeers replicationPeers, final UUID clusterId) {
|
||||
super("Failover-for-"+rsZnode);
|
||||
|
@ -673,6 +686,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
if (peer == null || peerConfig == null) {
|
||||
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
|
||||
replicationQueues.removeQueue(peerId);
|
||||
continue;
|
||||
}
|
||||
// track sources in walsByIdRecoveredQueues
|
||||
|
@ -692,15 +706,20 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
ReplicationSourceInterface src =
|
||||
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
|
||||
server, peerId, this.clusterId, peerConfig, peer);
|
||||
if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
|
||||
src.terminate("Recovered queue doesn't belong to any current peer");
|
||||
break;
|
||||
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
|
||||
// see removePeer
|
||||
synchronized (oldsources) {
|
||||
if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
|
||||
src.terminate("Recovered queue doesn't belong to any current peer");
|
||||
closeRecoveredQueue(src);
|
||||
continue;
|
||||
}
|
||||
oldsources.add(src);
|
||||
for (String wal : walsSet) {
|
||||
src.enqueueLog(new Path(oldLogDir, wal));
|
||||
}
|
||||
src.startup();
|
||||
}
|
||||
oldsources.add(src);
|
||||
for (String wal : walsSet) {
|
||||
src.enqueueLog(new Path(oldLogDir, wal));
|
||||
}
|
||||
src.startup();
|
||||
} catch (IOException e) {
|
||||
// TODO manage it
|
||||
LOG.error("Failed creating a source", e);
|
||||
|
|
|
@ -23,6 +23,8 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.ArrayList;
|
||||
|
@ -84,10 +86,10 @@ import org.junit.After;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestReplicationSourceManager {
|
||||
|
@ -186,15 +188,24 @@ public class TestReplicationSourceManager {
|
|||
utility.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
private void cleanLogDir() throws IOException {
|
||||
fs.delete(logDir, true);
|
||||
fs.delete(oldLogDir, true);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
LOG.info("Start " + testName.getMethodName());
|
||||
cleanLogDir();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
setUp();
|
||||
LOG.info("End " + testName.getMethodName());
|
||||
cleanLogDir();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -268,7 +279,6 @@ public class TestReplicationSourceManager {
|
|||
|
||||
@Test
|
||||
public void testClaimQueues() throws Exception {
|
||||
LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
|
||||
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
|
||||
final Server server = new DummyServer("hostname0.example.org");
|
||||
ReplicationQueues rq =
|
||||
|
@ -425,6 +435,28 @@ public class TestReplicationSourceManager {
|
|||
s0.abort("", null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanupUnknownPeerZNode() throws Exception {
|
||||
final Server server = new DummyServer("hostname2.example.org");
|
||||
ReplicationQueues rq =
|
||||
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
|
||||
server);
|
||||
rq.init(server.getServerName().toString());
|
||||
// populate some znodes in the peer znode
|
||||
// add log to an unknown peer
|
||||
String group = "testgroup";
|
||||
rq.addLog("2", group + ".log1");
|
||||
rq.addLog("2", group + ".log2");
|
||||
|
||||
NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
|
||||
w1.run();
|
||||
|
||||
// The log of the unknown peer should be removed from zk
|
||||
for (String peer : manager.getAllQueues()) {
|
||||
assertTrue(peer.startsWith("1"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
|
||||
// 1. Create wal key
|
||||
|
|
Loading…
Reference in New Issue