HBASE-12770 Don't transfer all the queued hlogs of a dead server to the same alive server
Signed-off-by: zhangduo <zhangduo@apache.org>
commit e5f9df1e23
parent 30d7eeaefe
ReplicationQueues.java

@@ -19,10 +19,10 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.SortedSet;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This provides an interface for maintaining a region server's replication queues. These queues
@@ -94,12 +94,25 @@ public interface ReplicationQueues {
   List<String> getAllQueues();
 
   /**
-   * Take ownership for the set of queues belonging to a dead region server.
+   * Get queueIds from a dead region server, whose queues have not been claimed by other region
+   * servers.
+   * @return an empty list if the queue node exists but has no children; null if the queue node
+   *         does not exist.
+   */
+  List<String> getUnClaimedQueueIds(String regionserver);
+
+  /**
+   * Take ownership of the queue identified by queueId that belongs to a dead region server.
    * @param regionserver the id of the dead region server
-   * @return A Map of the queues that have been claimed, including a Set of WALs in
-   *         each queue. Returns an empty map if no queues were failed-over.
+   * @param queueId the id of the queue
+   * @return the new peerId and a SortedSet of the WALs in its queue; null if there was no
+   *         unclaimed queue.
    */
-  Map<String, Set<String>> claimQueues(String regionserver);
+  Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
+
+  /**
+   * Remove the znode of the region server if its queue is empty.
+   * @param regionserver the id of the region server
+   */
+  void removeReplicatorIfQueueIsEmpty(String regionserver);
 
   /**
    * Get a list of all region servers that have outstanding replication queues. These servers could
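The interface now splits failover into an enumerate step and a per-queue claim step. Below is a minimal sketch of how a caller is meant to drive the three methods together, mirroring the ReplicationSourceManager hunk further down; the claimAll helper, the rq handle, the deadRS name and the omitted error handling are illustrative, not part of this commit:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.SortedSet;

    import org.apache.hadoop.hbase.replication.ReplicationQueues;
    import org.apache.hadoop.hbase.util.Pair;

    // Sketch only: claim a dead server's queues one by one instead of all at once.
    // 'rq' is an initialized ReplicationQueues for this server; 'deadRS' is the
    // dead server's znode name.
    Map<String, SortedSet<String>> claimAll(ReplicationQueues rq, String deadRS, Random rand) {
      Map<String, SortedSet<String>> owned = new HashMap<>();
      List<String> unclaimed = rq.getUnClaimedQueueIds(deadRS);
      while (unclaimed != null && !unclaimed.isEmpty()) {
        // A random pick lets workers on other live servers interleave their claims.
        String queueId = unclaimed.get(rand.nextInt(unclaimed.size()));
        Pair<String, SortedSet<String>> claimed = rq.claimQueue(deadRS, queueId);
        if (claimed != null) {
          // getFirst() is the renamed queue id (peerId-deadRS); getSecond() its WALs.
          owned.put(claimed.getFirst(), claimed.getSecond());
        }
        unclaimed = rq.getUnClaimedQueueIds(deadRS);
      }
      rq.removeReplicatorIfQueueIsEmpty(deadRS); // drop the dead server's empty znode
      return owned;
    }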
ReplicationQueuesZKImpl.java

@@ -19,11 +19,9 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -36,6 +34,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -179,21 +178,66 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
   }
 
   @Override
-  public Map<String, Set<String>> claimQueues(String regionserverZnode) {
-    Map<String, Set<String>> newQueues = new HashMap<>();
-    // check whether there is multi support. If yes, use it.
-    if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
-      LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
-      newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
-    } else {
-      LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
-      if (!lockOtherRS(regionserverZnode)) {
-        return newQueues;
+  public List<String> getUnClaimedQueueIds(String regionserver) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+    String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+    List<String> queues = null;
+    try {
+      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
+    }
+    if (queues != null) {
+      queues.remove(RS_LOCK_ZNODE);
+    }
+    return queues;
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
+    if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
+      LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
+      return moveQueueUsingMulti(regionserver, queueId);
+    } else {
+      LOG.info("Moving " + regionserver + "/" + queueId + "'s wals to my queue");
+      if (!lockOtherRS(regionserver)) {
+        LOG.info("Can not take the lock now");
+        return null;
+      }
+      Pair<String, SortedSet<String>> newQueues;
+      try {
+        newQueues = copyQueueFromLockedRS(regionserver, queueId);
+        removeQueueFromLockedRS(regionserver, queueId);
+      } finally {
+        unlockOtherRS(regionserver);
       }
-      newQueues = copyQueuesFromRS(regionserverZnode);
-      deleteAnotherRSQueues(regionserverZnode);
+      return newQueues;
+    }
+  }
+
+  private void removeQueueFromLockedRS(String znode, String peerId) {
+    String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+    String peerPath = ZKUtil.joinZNode(nodePath, peerId);
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper, peerPath);
+    } catch (KeeperException e) {
+      LOG.warn("Remove copied queue failed", e);
+    }
+  }
+
+  @Override
+  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+    String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+    try {
+      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
+      if (list != null && list.size() == 0) {
+        ZKUtil.deleteNode(this.zookeeper, rsPath);
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Got error while removing replicator", e);
     }
-    return newQueues;
   }
 
   @Override
@@ -276,36 +320,13 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
     return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
   }
 
-  /**
-   * Delete all the replication queues for a given region server.
-   * @param regionserverZnode The znode of the region server to delete.
-   */
-  private void deleteAnotherRSQueues(String regionserverZnode) {
-    String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
+  private void unlockOtherRS(String znode) {
+    String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
+    String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
     try {
-      List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
-      for (String cluster : clusters) {
-        // No need to delete, it will be deleted later.
-        if (cluster.equals(RS_LOCK_ZNODE)) {
-          continue;
-        }
-        String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
-        ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
-      }
-      // Finish cleaning up
-      ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
+      ZKUtil.deleteNode(this.zookeeper, p);
     } catch (KeeperException e) {
-      if (e instanceof KeeperException.NoNodeException
-          || e instanceof KeeperException.NotEmptyException) {
-        // Testing a special case where another region server was able to
-        // create a lock just after we deleted it, but then was also able to
-        // delete the RS znode before us or its lock znode is still there.
-        if (e.getPath().equals(fullpath)) {
-          return;
-        }
-      }
-      this.abortable.abort("Failed to delete replication queues for region server: "
-          + regionserverZnode, e);
+      this.abortable.abort("Remove lock failed", e);
     }
   }
 
@@ -313,38 +334,30 @@
    * It "atomically" copies all the wals queues from another region server and returns them all
    * sorted per peer cluster (appended with the dead server's znode).
    * @param znode pertaining to the region server to copy the queues from
-   * @return WAL queues sorted per peer cluster
    */
-  private Map<String, Set<String>> copyQueuesFromRSUsingMulti(String znode) {
-    Map<String, Set<String>> queues = new HashMap<>();
-    // hbase/replication/rs/deadrs
-    String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
-    List<String> peerIdsToProcess = null;
-    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
     try {
-      peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
-      if (peerIdsToProcess == null) return queues; // node already processed
-      for (String peerId : peerIdsToProcess) {
+      // hbase/replication/rs/deadrs
+      String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+      List<ZKUtilOp> listOfOps = new ArrayList<>();
       ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
       if (!peerExists(replicationQueueInfo.getPeerId())) {
         // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
         // this will cause the whole multi op fail.
         // NodeFailoverWorker will skip the orphaned queues.
-        LOG.warn("Peer " + peerId
-            + " didn't exist, will move its queue to avoid the failure of multi op");
+        LOG.warn("Peer " + peerId +
+            " didn't exist, will move its queue to avoid the failure of multi op");
       }
       String newPeerId = peerId + "-" + znode;
       String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
       // check the logs queue for the old peer cluster
       String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
       List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+      SortedSet<String> logQueue = new TreeSet<>();
       if (wals == null || wals.size() == 0) {
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-        continue; // empty log queue.
-      }
-      // create the new cluster znode
-      Set<String> logQueue = new HashSet<String>();
-      queues.put(newPeerId, logQueue);
-      ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
-      listOfOps.add(op);
-      // get the offset of the logs and set it to new znodes
+      } else {
+        // create the new cluster znode
+        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
+        listOfOps.add(op);
+        // get the offset of the logs and set it to new znodes
@@ -354,98 +367,86 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
         LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
         String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
         listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
-        // add ops for deleting
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
         logQueue.add(wal);
       }
       // add delete op for peer
       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+
+      if (LOG.isTraceEnabled())
+        LOG.trace(" The multi list size is: " + listOfOps.size());
       }
-      // add delete op for dead rs, this will update the cversion of the parent.
-      // The reader will make optimistic locking with this to get a consistent
-      // snapshot
-      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
-      if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-      if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. ");
+      if (LOG.isTraceEnabled())
+        LOG.trace("Atomically moved the dead regionserver logs. ");
+      return new Pair<>(newPeerId, logQueue);
     } catch (KeeperException e) {
       // Multi call failed; it looks like some other regionserver took away the logs.
       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-      queues.clear();
     } catch (InterruptedException e) {
       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-      queues.clear();
       Thread.currentThread().interrupt();
     }
-    return queues;
+    return null;
   }
 
   /**
-   * This methods copies all the wals queues from another region server and returns them all sorted
+   * This method moves all the wals queues from another region server and returns them all sorted
    * per peer cluster (appended with the dead server's znode)
    * @param znode server names to copy
-   * @return all wals for all peers of that cluster, null if an error occurred
+   * @return all wals for the peer of that cluster, null if an error occurred
    */
-  private Map<String, Set<String>> copyQueuesFromRS(String znode) {
+  private Pair<String, SortedSet<String>> copyQueueFromLockedRS(String znode, String peerId) {
     // TODO this method isn't atomic enough, we could start copying and then
     // TODO fail for some reason and we would end up with znodes we don't want.
-    Map<String, Set<String>> queues = new HashMap<>();
     try {
       String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
-      List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
-      // We have a lock znode in there, it will count as one.
-      if (clusters == null || clusters.size() <= 1) {
-        return queues;
+      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+      String clusterPath = ZKUtil.joinZNode(nodePath, peerId);
+      if (!peerExists(replicationQueueInfo.getPeerId())) {
+        LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
+        // Protection against moving orphaned queues
+        return null;
       }
-      // The lock isn't a peer cluster, remove it
-      clusters.remove(RS_LOCK_ZNODE);
-      for (String cluster : clusters) {
-        ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
-        if (!peerExists(replicationQueueInfo.getPeerId())) {
-          LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
-          // Protection against moving orphaned queues
-          continue;
-        }
-        // We add the name of the recovered RS to the new znode, we can even
-        // do that for queues that were recovered 10 times giving a znode like
-        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
-        String newCluster = cluster + "-" + znode;
-        String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
-        String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
-        List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
-        // That region server didn't have anything to replicate for this cluster
-        if (wals == null || wals.size() == 0) {
-          continue;
-        }
-        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
-            HConstants.EMPTY_BYTE_ARRAY);
-        Set<String> logQueue = new HashSet<String>();
-        queues.put(newCluster, logQueue);
-        for (String wal : wals) {
-          String z = ZKUtil.joinZNode(clusterPath, wal);
-          byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
-          long position = 0;
-          try {
-            position = ZKUtil.parseWALPositionFrom(positionBytes);
-          } catch (DeserializationException e) {
-            LOG.warn("Failed parse of wal position from the following znode: " + z
-                + ", Exception: " + e);
-          }
-          LOG.debug("Creating " + wal + " with data " + position);
-          String child = ZKUtil.joinZNode(newClusterZnode, wal);
-          // Position doesn't actually change, we are just deserializing it for
-          // logging, so just use the already serialized version
-          ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
-          logQueue.add(wal);
-        }
+      // We add the name of the recovered RS to the new znode, we can even
+      // do that for queues that were recovered 10 times giving a znode like
+      // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+      String newCluster = peerId + "-" + znode;
+      String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
+      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+      // That region server didn't have anything to replicate for this cluster
+      if (wals == null || wals.size() == 0) {
+        return null;
+      }
+      ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+          HConstants.EMPTY_BYTE_ARRAY);
+      SortedSet<String> logQueue = new TreeSet<>();
+      for (String wal : wals) {
+        String z = ZKUtil.joinZNode(clusterPath, wal);
+        byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+        long position = 0;
+        try {
+          position = ZKUtil.parseWALPositionFrom(positionBytes);
+        } catch (DeserializationException e) {
+          LOG.warn("Failed parse of wal position from the following znode: " + z
+              + ", Exception: " + e);
+        }
+        LOG.debug("Creating " + wal + " with data " + position);
+        String child = ZKUtil.joinZNode(newClusterZnode, wal);
+        // Position doesn't actually change, we are just deserializing it for
+        // logging, so just use the already serialized version
+        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, child, positionBytes);
+        logQueue.add(wal);
       }
+      return new Pair<>(newCluster, logQueue);
     } catch (KeeperException e) {
-      this.abortable.abort("Copy queues from rs", e);
+      LOG.warn("Got exception in copyQueueFromLockedRS: ", e);
    } catch (InterruptedException e) {
       LOG.warn(e);
       Thread.currentThread().interrupt();
     }
-    return queues;
+    return null;
   }
 
   /**
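Note that claimQueue deletes the claimed queue from the dead server's znode and removeReplicatorIfQueueIsEmpty finally deletes that znode itself, which bumps the cversion of the parent queues znode (the last test in this commit asserts it increases by one). Readers can lean on that counter for an optimistic, lock-free snapshot. A hedged sketch of that read pattern, assuming a ReplicationQueuesClient handle named client as in the tests:

    // Sketch only: optimistic snapshot over the replication queues.
    // Re-read if the queues znode's cversion changed while we were reading.
    int v0 = client.getQueuesZNodeCversion();
    List<String> replicators = client.getListOfReplicators();
    // ... walk each replicator's queues and logs here ...
    int v1 = client.getQueuesZNodeCversion();
    if (v0 != v1) {
      // A failover moved or deleted queues mid-read; retry the whole snapshot.
    }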
TableBasedReplicationQueuesImpl.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
@@ -48,6 +49,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 /**
  * This class provides an implementation of the ReplicationQueues interface using an HBase table
@@ -227,31 +230,54 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
     return getAllQueues(serverName);
   }
 
-  @Override
-  public Map<String, Set<String>> claimQueues(String regionserver) {
-    Map<String, Set<String>> queues = new HashMap<>();
+  @Override public List<String> getUnClaimedQueueIds(String regionserver) {
     if (isThisOurRegionServer(regionserver)) {
-      return queues;
+      return null;
     }
-    ResultScanner queuesToClaim = null;
-    try {
-      queuesToClaim = getQueuesBelongingToServer(regionserver);
+    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
+      List<String> res = new ArrayList<>();
       for (Result queue : queuesToClaim) {
+        String rowKey = Bytes.toString(queue.getRow());
+        res.add(rowKey);
+      }
+      return res.isEmpty() ? null : res;
+    } catch (IOException e) {
+      String errMsg = "Failed getUnClaimedQueueIds";
+      abortable.abort(errMsg, e);
+    }
+    return null;
+  }
+
+  @Override public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+    // Do nothing here
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+
+    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
+      for (Result queue : queuesToClaim) {
+        String rowKey = Bytes.toString(queue.getRow());
+        if (!rowKey.equals(queueId)) {
+          continue;
+        }
         if (attemptToClaimQueue(queue, regionserver)) {
-          String rowKey = Bytes.toString(queue.getRow());
           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
           if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
-            Set<String> sortedLogs = new HashSet<String>();
+            SortedSet<String> sortedLogs = new TreeSet<>();
             List<String> logs = getLogsInQueue(queue.getRow());
             for (String log : logs) {
               sortedLogs.add(log);
             }
-            queues.put(rowKey, sortedLogs);
             LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver);
+            return new Pair<>(rowKey, sortedLogs);
           } else {
             // Delete orphaned queues
             removeQueue(Bytes.toString(queue.getRow()));
-            LOG.info(serverName + " has deleted abandoned queue " + rowKey + " from " +
+            LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " +
               regionserver);
           }
         }
       }
@@ -259,13 +285,8 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
     } catch (IOException | KeeperException e) {
       String errMsg = "Failed claiming queues for regionserver=" + regionserver;
       abortable.abort(errMsg, e);
-      queues.clear();
-    } finally {
-      if (queuesToClaim != null) {
-        queuesToClaim.close();
-      }
     }
-    return queues;
+    return null;
   }
 
   @Override
ReplicationSourceManager.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 
 /**
@@ -647,10 +648,27 @@ public class ReplicationSourceManager implements ReplicationListener {
         LOG.info("Not transferring queue since we are shutting down");
         return;
       }
-      Map<String, Set<String>> newQueues = null;
-      newQueues = this.rq.claimQueues(rsZnode);
+      Map<String, Set<String>> newQueues = new HashMap<>();
+      List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
+      while (peers != null && !peers.isEmpty()) {
+        Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
+          peers.get(rand.nextInt(peers.size())));
+        long sleep = sleepBeforeFailover / 2;
+        if (peer != null) {
+          newQueues.put(peer.getFirst(), peer.getSecond());
+          sleep = sleepBeforeFailover;
+        }
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted while waiting before transferring a queue.");
+          Thread.currentThread().interrupt();
+        }
+        peers = rq.getUnClaimedQueueIds(rsZnode);
+      }
+      if (peers != null) {
+        rq.removeReplicatorIfQueueIsEmpty(rsZnode);
+      }
       // Copying over the failed queue is completed.
       if (newQueues.isEmpty()) {
         // We either didn't get the lock or the failed region server didn't have any outstanding
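The loop above is the heart of HBASE-12770: every live region server's NodeFailoverWorker runs it concurrently, each iteration claiming one randomly chosen queue and then sleeping, so the workers interleave and a dead server's queues scatter across the surviving servers instead of all moving to whichever single server won the old all-or-nothing claimQueues call.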
TestReplicationStateBasic.java

@@ -22,8 +22,6 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -124,7 +122,8 @@ public abstract class TestReplicationStateBasic {
     assertEquals(0, rq1.getAllQueues().size());
     assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
     assertNull(rq1.getLogsInQueue("bogus"));
-    assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
+    assertNull(rq1.getUnClaimedQueueIds(
+        ServerName.valueOf("bogus", 1234, -1L).toString()));
 
     rq1.setLogPosition("bogus", "bogus", 5L);
@@ -143,15 +142,21 @@
     assertEquals(1, rq2.getAllQueues().size());
     assertEquals(5, rq3.getAllQueues().size());
 
-    assertEquals(0, rq3.claimQueues(server1).size());
+    assertEquals(0, rq3.getUnClaimedQueueIds(server1).size());
+    rq3.removeReplicatorIfQueueIsEmpty(server1);
     assertEquals(2, rq3.getListOfReplicators().size());
 
-    Map<String, Set<String>> queues = rq2.claimQueues(server3);
+    List<String> queues = rq2.getUnClaimedQueueIds(server3);
     assertEquals(5, queues.size());
+    for (String queue : queues) {
+      rq2.claimQueue(server3, queue);
+    }
+    rq2.removeReplicatorIfQueueIsEmpty(server3);
     assertEquals(1, rq2.getListOfReplicators().size());
 
     // Try to claim our own queues
-    assertEquals(0, rq2.claimQueues(server2).size());
+    assertNull(rq2.getUnClaimedQueueIds(server2));
+    rq2.removeReplicatorIfQueueIsEmpty(server2);
 
     assertEquals(6, rq2.getAllQueues().size());
TestReplicationStateHBaseImpl.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,8 +41,6 @@ import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import static junit.framework.TestCase.assertNull;
 import static org.junit.Assert.assertEquals;
@@ -267,18 +266,19 @@ public class TestReplicationStateHBaseImpl {
     }
     try {
       // Test claiming queues
-      Map<String, Set<String>> claimedQueuesFromRq2 = rq1.claimQueues(server2);
+      List<String> claimedQueuesFromRq2 = rq1.getUnClaimedQueueIds(server2);
       // Check to make sure that list of peers with outstanding queues is decremented by one
       // after claimQueues
+      // Check to make sure that we claimed the proper number of queues
+      assertEquals(2, claimedQueuesFromRq2.size());
+      assertTrue(claimedQueuesFromRq2.contains("Queue1-" + server2));
+      assertTrue(claimedQueuesFromRq2.contains("Queue2-" + server2));
+      assertEquals(2, rq1.claimQueue(server2, "Queue1-" + server2).getSecond().size());
+      assertEquals(1, rq1.claimQueue(server2, "Queue2-" + server2).getSecond().size());
+      rq1.removeReplicatorIfQueueIsEmpty(server2);
       assertEquals(rq1.getListOfReplicators().size(), 2);
       assertEquals(rq2.getListOfReplicators().size(), 2);
       assertEquals(rq3.getListOfReplicators().size(), 2);
-      // Check to make sure that we claimed the proper number of queues
-      assertEquals(2, claimedQueuesFromRq2.size());
-      assertTrue(claimedQueuesFromRq2.containsKey("Queue1-" + server2));
-      assertTrue(claimedQueuesFromRq2.containsKey("Queue2-" + server2));
-      assertEquals(2, claimedQueuesFromRq2.get("Queue1-" + server2).size());
-      assertEquals(1, claimedQueuesFromRq2.get("Queue2-" + server2).size());
       assertEquals(5, rq1.getAllQueues().size());
       // Check that all the logs in the other queue were claimed
       assertEquals(2, rq1.getLogsInQueue("Queue1-" + server2).size());
@@ -294,7 +294,11 @@
       rq1.addLog("UnclaimableQueue", "WALLogFile1.1");
       rq1.addLog("UnclaimableQueue", "WALLogFile1.2");
       assertEquals(6, rq1.getAllQueues().size());
-      Map<String, Set<String>> claimedQueuesFromRq1 = rq3.claimQueues(server1);
+      List<String> claimedQueuesFromRq1 = rq3.getUnClaimedQueueIds(server1);
+      for (String queue : claimedQueuesFromRq1) {
+        rq3.claimQueue(server1, queue);
+      }
+      rq3.removeReplicatorIfQueueIsEmpty(server1);
       assertEquals(rq1.getListOfReplicators().size(), 1);
       assertEquals(rq2.getListOfReplicators().size(), 1);
       assertEquals(rq3.getListOfReplicators().size(), 1);
@@ -302,12 +306,12 @@
       // Replication Peers
       assertEquals(6, rq3.getAllQueues().size());
       // Test claiming non-existing queues
-      Map<String, Set<String>> noQueues = rq3.claimQueues("NotARealServer");
-      assertEquals(0, noQueues.size());
+      List<String> noQueues = rq3.getUnClaimedQueueIds("NotARealServer");
+      assertNull(noQueues);
       assertEquals(6, rq3.getAllQueues().size());
       // Test claiming own queues
-      noQueues = rq3.claimQueues(server3);
-      assertEquals(0, noQueues.size());
+      noQueues = rq3.getUnClaimedQueueIds(server3);
+      Assert.assertNull(noQueues);
       assertEquals(6, rq3.getAllQueues().size());
       // Check that rq3 still remain on list of replicators
       assertEquals(1, rq3.getListOfReplicators().size());
TestReplicationSourceManager.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -517,7 +518,12 @@ public abstract class TestReplicationSourceManager {
     @Override
     public void run() {
       try {
-        logZnodesMap = rq.claimQueues(deadRsZnode);
+        logZnodesMap = new HashMap<>();
+        List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
+        for (String queue : queues) {
+          Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
+          logZnodesMap.put(pair.getFirst(), pair.getSecond());
+        }
         server.abort("Done with testing", null);
       } catch (Exception e) {
         LOG.error("Got exception while running NodeFailoverWorker", e);
TestReplicationSourceManagerZkImpl.java

@@ -36,8 +36,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -87,22 +85,28 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
       ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
         s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
-    Map<String, Set<String>> testMap =
-      rq1.claimQueues(server.getServerName().getServerName());
+    String serverName = server.getServerName().getServerName();
+    List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
+    rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
+    rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
     ReplicationQueues rq2 =
       ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
         s2.getZooKeeper()));
     rq2.init(s2.getServerName().toString());
-    testMap = rq2.claimQueues(s1.getServerName().getServerName());
+    serverName = s1.getServerName().getServerName();
+    unclaimed = rq2.getUnClaimedQueueIds(serverName);
+    rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
+    rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
     ReplicationQueues rq3 =
      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
        s3.getZooKeeper()));
     rq3.init(s3.getServerName().toString());
-    testMap = rq3.claimQueues(s2.getServerName().getServerName());
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.keySet().iterator().next());
+    serverName = s2.getServerName().getServerName();
+    unclaimed = rq3.getUnClaimedQueueIds(serverName);
+    String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
+    rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
     List<String> result = replicationQueueInfo.getDeadRegionServers();
 
     // verify
     assertTrue(result.contains(server.getServerName().getServerName()));
     assertTrue(result.contains(s1.getServerName().getServerName()));
@@ -137,7 +141,11 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
       new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
 
     int v0 = client.getQueuesZNodeCversion();
-    rq1.claimQueues(s0.getServerName().getServerName());
+    List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName());
+    for (String queue : queues) {
+      rq1.claimQueue(s0.getServerName().getServerName(), queue);
+    }
+    rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName());
     int v1 = client.getQueuesZNodeCversion();
     // cversion should increase by 1 since a child node is deleted
     assertEquals(v0 + 1, v1);