HBASE-8099 ReplicationZookeeper.copyQueuesFromRSUsingMulti should not return any queues if it failed to execute. (Himanshu and LarsH)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1456520 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5a099d8f56
commit
68463933e4
|
@ -803,7 +803,7 @@ public class ReplicationZookeeper implements Closeable {
|
|||
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
|
||||
try {
|
||||
peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
|
||||
if (peerIdsToProcess == null) return null; // node already processed
|
||||
if (peerIdsToProcess == null) return queues; // node already processed
|
||||
for (String peerId : peerIdsToProcess) {
|
||||
String newPeerId = peerId + "-" + znode;
|
||||
String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
|
||||
|
@ -838,6 +838,7 @@ public class ReplicationZookeeper implements Closeable {
|
|||
} catch (KeeperException e) {
|
||||
// Multi call failed; it looks like some other regionserver took away the logs.
|
||||
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
|
||||
queues.clear();
|
||||
}
|
||||
return queues;
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.SortedMap;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
@ -90,6 +91,9 @@ public class ReplicationSourceManager {
|
|||
private final long sleepBeforeFailover;
|
||||
// Homemade executer service for replication
|
||||
private final ThreadPoolExecutor executor;
|
||||
|
||||
private final Random rand;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a replication manager and sets the watch on all the other
|
||||
|
@ -136,6 +140,7 @@ public class ReplicationSourceManager {
|
|||
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
|
||||
tfb.setNameFormat("ReplicationExecutor-%d");
|
||||
this.executor.setThreadFactory(tfb.build());
|
||||
this.rand = new Random();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -572,7 +577,7 @@ public class ReplicationSourceManager {
|
|||
// Wait a bit before transferring the queues, we may be shutting down.
|
||||
// This sleep may not be enough in some cases.
|
||||
try {
|
||||
Thread.sleep(sleepBeforeFailover);
|
||||
Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted while waiting before transferring a queue.");
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -597,7 +602,7 @@ public class ReplicationSourceManager {
|
|||
zkHelper.deleteRsQueues(rsZnode);
|
||||
}
|
||||
// process of copying over the failed queue is completed.
|
||||
if (newQueues.size() == 0) {
|
||||
if (newQueues.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue