HBASE-11442 ReplicationSourceManager doesn't cleanup the queues for recovered sources (Virag Kothari)
commit 7db2563c6a
parent 463d52d8cf
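Summary of the change: before this patch, cleanOldLogs() returned early whenever the queue was a recovered one, so logs that a recovered (failed-over) source had already replicated were never removed from its replication queue in ZooKeeper. The patch tracks recovered sources' logs in a new hlogsByIdRecoveredQueues map (a ConcurrentHashMap, presumably because it is updated from the NodeFailoverWorker thread rather than under the hlogsById lock), prunes that map as replication progresses, and drops its entry once a recovered queue is finished.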
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -30,11 +30,12 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,6 +87,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Stoppable stopper;
   // All logs we are currently tracking
   private final Map<String, SortedSet<String>> hlogsById;
+  // Logs for recovered sources we are currently tracking
+  private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
   private final Configuration conf;
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
@@ -126,6 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationTracker = replicationTracker;
     this.stopper = stopper;
     this.hlogsById = new HashMap<String, SortedSet<String>>();
+    this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -177,20 +181,29 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param id id of the peer cluster
    * @param queueRecovered Whether this is a recovered queue
    */
-  public void cleanOldLogs(String key,
-      String id,
-      boolean queueRecovered) {
-    synchronized (this.hlogsById) {
-      SortedSet<String> hlogs = this.hlogsById.get(id);
-      if (queueRecovered || hlogs.first().equals(key)) {
-        return;
+  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+    if (queueRecovered) {
+      SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
+      if (hlogs != null && !hlogs.first().equals(key)) {
+        cleanOldLogs(hlogs, key, id);
+      }
+    } else {
+      synchronized (this.hlogsById) {
+        SortedSet<String> hlogs = hlogsById.get(id);
+        if (!hlogs.first().equals(key)) {
+          cleanOldLogs(hlogs, key, id);
+        }
       }
-      SortedSet<String> hlogSet = hlogs.headSet(key);
-      for (String hlog : hlogSet) {
-        this.replicationQueues.removeLog(id, hlog);
-      }
-      hlogSet.clear();
     }
   }
 
+  private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
+    SortedSet<String> hlogSet = hlogs.headSet(key);
+    LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
+    for (String hlog : hlogSet) {
+      this.replicationQueues.removeLog(id, hlog);
+    }
+    hlogSet.clear();
+  }
+
   /**
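For context on the refactor above: both branches now delegate to the new private cleanOldLogs(SortedSet, key, id), which relies on SortedSet.headSet(key) returning a live view of every element that sorts before key, so clearing the view also prunes the backing set. A minimal standalone sketch of that idiom (the demo class and log names here are ours, not part of the patch):

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class HeadSetPruneDemo {
      public static void main(String[] args) {
        // Hlog names sort lexicographically, so a TreeSet keeps them in log order.
        SortedSet<String> hlogs = new TreeSet<String>();
        hlogs.add("log1");
        hlogs.add("log2");
        hlogs.add("log3");

        String key = "log3";                           // the log currently being replicated
        SortedSet<String> older = hlogs.headSet(key);  // live view: [log1, log2]
        System.out.println("Removing " + older.size() + " logs in the list: " + older);
        older.clear();                                 // removes them from hlogs as well

        System.out.println(hlogs);                     // prints [log3]
      }
    }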
@@ -285,6 +298,14 @@ public class ReplicationSourceManager implements ReplicationListener {
   protected Map<String, SortedSet<String>> getHLogs() {
     return Collections.unmodifiableMap(hlogsById);
   }
 
+  /**
+   * Get a copy of the hlogs of the recovered sources on this rs
+   * @return a sorted set of hlog names
+   */
+  protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
+    return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
+  }
+
   /**
    * Get a list of all the normal sources of this rs
@@ -303,7 +324,6 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   void preLogRoll(Path newLog) throws IOException {
-
     synchronized (this.hlogsById) {
       String name = newLog.getName();
       for (ReplicationSourceInterface source : this.sources) {
@@ -416,6 +436,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
       this.oldsources.remove(src);
       deleteSource(src.getPeerClusterZnode(), false);
+      this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
     }
 
   /**
@@ -563,10 +584,12 @@ public class ReplicationSourceManager implements ReplicationListener {
             break;
           }
           oldsources.add(src);
-          for (String hlog : entry.getValue()) {
+          SortedSet<String> hlogsSet = entry.getValue();
+          for (String hlog : hlogsSet) {
             src.enqueueLog(new Path(oldLogDir, hlog));
           }
           src.startup();
+          hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
         } catch (IOException e) {
           // TODO manage it
           LOG.error("Failed creating a source", e);
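Note the failover path above: instead of iterating entry.getValue() directly, it now keeps a reference to the recovered hlog set and registers it in hlogsByIdRecoveredQueues, so the queueRecovered branch of cleanOldLogs() has something to prune and the "Done with the recovered queue" path has an entry to remove.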
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -80,7 +80,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public String getPeerClusterId() {
-    return peerClusterId;
+    String[] parts = peerClusterId.split("-", 2);
+    return parts.length != 1 ?
+        parts[0] : peerClusterId;
   }
 
   @Override
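A recovered queue's id is the peer id with the dead server's name appended (the test below builds exactly "1-" + serverName), so the dummy source now splits on the first "-" to report just the peer cluster id.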
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -27,6 +27,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -55,10 +57,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -71,6 +75,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Sets;
+
 @Category(MediumTests.class)
 public class TestReplicationSourceManager {
 
@@ -138,14 +144,14 @@ public class TestReplicationSourceManager {
     ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
 
     ZKClusterId.setClusterId(zkw, new ClusterId());
 
-    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
-    manager = replication.getReplicationManager();
     fs = FileSystem.get(conf);
     oldLogDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_OLDLOGDIR_NAME);
     logDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
+    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
 
     logName = HConstants.HREGION_LOGDIR_NAME;
 
     manager.addSource(slaveId);
@@ -274,6 +280,40 @@ public class TestReplicationSourceManager {
     assertEquals(1, populatedMap);
     server.abort("", null);
   }
 
+  @Test
+  public void testCleanupFailoverQueues() throws Exception {
+    final Server server = new DummyServer("hostname1.example.org");
+    ReplicationQueues rq =
+        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
+          server);
+    rq.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    SortedSet<String> files = new TreeSet<String>();
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      rq.addLog("1", file);
+    }
+    Server s1 = new DummyServer("dummyserver1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+    ReplicationPeers rp1 =
+        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rp1.init();
+    NodeFailoverWorker w1 =
+        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
+            new Long(1), new Long(2)));
+    w1.start();
+    w1.join(5000);
+    assertEquals(1, manager.getHlogsByIdRecoveredQueues().size());
+    String id = "1-" + server.getServerName().getServerName();
+    assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id));
+    manager.cleanOldLogs("log2", id, true);
+    // log1 should be deleted
+    assertEquals(Sets.newHashSet("log2"), manager.getHlogsByIdRecoveredQueues().get(id));
+  }
+
   @Test
   public void testNodeFailoverDeadServerParsing() throws Exception {
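The new testCleanupFailoverQueues covers the fix end to end: it queues log1 and log2 for peer "1", runs a NodeFailoverWorker so the manager adopts them as a recovered queue, checks both logs appear under getHlogsByIdRecoveredQueues(), then calls cleanOldLogs("log2", id, true) and asserts that only log2 survives.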