HBASE-3640 [replication] Transferring queues shouldn't be done inline with RS startup

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1084785 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2011-03-23 22:23:54 +00:00
parent 5c78c1a4a7
commit 4b8b1c2ed8
2 changed files with 92 additions and 43 deletions

View File

@ -182,6 +182,7 @@ Release 0.90.2 - Unreleased
HBASE-3496 HFile CLI Improvements
HBASE-3596 [replication] Wait a few seconds before transferring queues
HBASE-3600 Update our jruby to 1.6.0
HBASE-3640 [replication] Transferring queues shouldn't be done inline with RS startup
Release 0.90.1 - February 9th, 2011

View File

@ -27,8 +27,13 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -78,6 +83,8 @@ public class ReplicationSourceManager {
private final Path oldLogDir;
// The number of ms that we wait before moving znodes, HBASE-3596
private final long sleepBeforeFailover;
// Homemade executor service for replication
private final ThreadPoolExecutor executor;
/**
* Creates a replication manager and sets the watch on all the other
@ -116,6 +123,17 @@ public class ReplicationSourceManager {
new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
this.zkHelper.listPeersIdsAndWatch();
this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
// It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time.
int nbWorkers = conf.getInt("replication.executor.workers", 1);
// use a short 100ms sleep since this could be done inline with a RS startup
// even if we fail, other region servers can take care of it
this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
100, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationExecutor-%d");
this.executor.setThreadFactory(tfb.build());
}
/**
@ -199,6 +217,7 @@ public class ReplicationSourceManager {
* Terminate the replication on this region server
*/
public void join() {
this.executor.shutdown();
if (this.sources.size() == 0) {
this.zkHelper.deleteOwnRSZNode();
}
@ -298,50 +317,12 @@ public class ReplicationSourceManager {
* @param rsZnode
*/
public void transferQueues(String rsZnode) {
// Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases.
NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
try {
Thread.sleep(this.sleepBeforeFailover);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
}
// We try to lock that rs' queue directory
if (this.stopper.isStopped()) {
LOG.info("Not transferring queue since we are shutting down");
return;
}
if (!this.zkHelper.lockOtherRS(rsZnode)) {
return;
}
LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
SortedMap<String, SortedSet<String>> newQueues =
this.zkHelper.copyQueuesFromRS(rsZnode);
this.zkHelper.deleteRsQueues(rsZnode);
if (newQueues == null || newQueues.size() == 0) {
return;
}
for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
String peerId = entry.getKey();
try {
ReplicationSourceInterface src = getReplicationSource(this.conf,
this.fs, this, this.stopper, this.replicating, peerId);
if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
}
this.oldsources.add(src);
for (String hlog : entry.getValue()) {
src.enqueueLog(new Path(this.oldLogDir, hlog));
}
// TODO set it to what's in ZK
src.setSourceEnabled(true);
src.startup();
} catch (IOException e) {
// TODO manage it
LOG.error("Failed creating a source", e);
}
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
LOG.info("Cancelling the transfer of " + rsZnode +
" because of " + ex.getMessage());
}
}
@ -524,6 +505,73 @@ public class ReplicationSourceManager {
}
}
/**
 * Worker that sets up new ReplicationSources to take over the replication
 * queues of a dead region server. Submitted to the manager's executor by
 * {@code transferQueues} rather than run inline with RS startup.
 * Implements Runnable (not Thread): instances are only ever handed to
 * {@code executor.execute(...)}, never started as threads themselves, so
 * extending Thread would allocate an unused Thread object per failover.
 */
class NodeFailoverWorker implements Runnable {

  // znode of the dead region server whose queues this worker claims
  private final String rsZnode;

  /**
   * @param rsZnode znode of the dead region server to fail over
   */
  public NodeFailoverWorker(String rsZnode) {
    this.rsZnode = rsZnode;
  }

  @Override
  public void run() {
    // Wait a bit before transferring the queues, we may be shutting down.
    // This sleep may not be enough in some cases.
    try {
      Thread.sleep(sleepBeforeFailover);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting before transferring a queue.");
      Thread.currentThread().interrupt();
    }
    // We try to lock that rs' queue directory
    if (stopper.isStopped()) {
      LOG.info("Not transferring queue since we are shutting down");
      return;
    }
    // Only one region server may claim a dead RS's queues; if we lose
    // the race, somebody else is handling it.
    if (!zkHelper.lockOtherRS(rsZnode)) {
      return;
    }
    LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
    SortedMap<String, SortedSet<String>> newQueues =
        zkHelper.copyQueuesFromRS(rsZnode);
    zkHelper.deleteRsQueues(rsZnode);
    if (newQueues == null || newQueues.size() == 0) {
      return;
    }
    // One recovered source per peer cluster found in the copied queues.
    for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
      String peerId = entry.getKey();
      try {
        ReplicationSourceInterface src = getReplicationSource(conf,
            fs, ReplicationSourceManager.this, stopper, replicating, peerId);
        if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
          src.terminate("Recovered queue doesn't belong to any current peer");
          break;
        }
        oldsources.add(src);
        for (String hlog : entry.getValue()) {
          src.enqueueLog(new Path(oldLogDir, hlog));
        }
        // TODO set it to what's in ZK
        src.setSourceEnabled(true);
        src.startup();
      } catch (IOException e) {
        // TODO manage it
        LOG.error("Failed creating a source", e);
      }
    }
  }
}
/**
* Get the directory where hlogs are archived
* @return the directory where hlogs are archived