HBASE-7568 [replication] Create an interface for replication queues

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1464279 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-04-04 03:51:32 +00:00
parent 05defbdce3
commit e7bebd4706
14 changed files with 1093 additions and 457 deletions

View File

@ -0,0 +1,109 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
* keep track of the HLogs that still need to be replicated to remote clusters.
*/
public interface ReplicationQueues {
/**
* Initialize the region server replication queue interface.
* @param serverName The server name of the region server that owns the replication queues this
* interface manages.
*/
public void init(String serverName);
/**
* Remove a replication queue.
* @param queueId a String that identifies the queue.
*/
public void removeQueue(String queueId);
/**
* Add a new HLog file to the given queue. If the queue does not exist it is created.
* @param queueId a String that identifies the queue.
* @param filename name of the HLog
* @throws KeeperException
*/
public void addLog(String queueId, String filename) throws KeeperException;
/**
* Remove an HLog file from the given queue.
* @param queueId a String that identifies the queue.
* @param filename name of the HLog
*/
public void removeLog(String queueId, String filename);
/**
* Set the current position for a specific HLog in a given queue.
* @param queueId a String that identifies the queue
* @param filename name of the HLog
* @param position the current position in the file
*/
public void setLogPosition(String queueId, String filename, long position);
/**
* Get the current position for a specific HLog in a given queue.
* @param queueId a String that identifies the queue
* @param filename name of the HLog
* @return the current position in the file
*/
public long getLogPosition(String queueId, String filename) throws KeeperException;
/**
* Remove all replication queues for this region server.
*/
public void removeAllQueues();
/**
* Get a list of all HLogs in the given queue.
* @param queueId a String that identifies the queue
* @return a list of HLogs, null if this region server is dead and has no outstanding queues
*/
public List<String> getLogsInQueue(String queueId);
/**
* Get a list of all queues for this region server.
* @return a list of queueIds, null if this region server is dead and has no outstanding queues
*/
public List<String> getAllQueues();
/**
* Take ownership for the set of queues belonging to a dead region server.
* @param regionserver the id of the dead region server
* @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in
* each queue. Returns an empty map if no queues were failed-over.
*/
public SortedMap<String, SortedSet<String>> claimQueues(String regionserver);
/**
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
*/
public List<String> getListOfReplicators();
}

View File

@ -0,0 +1,50 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
/**
* This provides an interface for clients of replication to view replication queues. These queues
* keep track of the HLogs that still need to be replicated to remote clusters.
*/
public interface ReplicationQueuesClient {
/**
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
*/
public List<String> getListOfReplicators();
/**
* Get a list of all HLogs in the given queue on the given region server.
* @param serverName the server name of the region server that owns the queue
* @param queueId a String that identifies the queue
* @return a list of HLogs, null if this region server is dead and has no outstanding queues
*/
public List<String> getLogsInQueue(String serverName, String queueId);
/**
* Get a list of all queues for the specified region server.
* @param serverName the server name of the region server that owns the set of queues
* @return a list of queueIds, null if this region server is not a replicator.
*/
public List<String> getAllQueues(String serverName);
}

View File

@ -0,0 +1,64 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
ReplicationQueuesClient {
public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
Abortable abortable) throws KeeperException {
super(zk, conf, abortable);
ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
}
@Override
public List<String> getLogsInQueue(String serverName, String queueId) {
String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
znode = ZKUtil.joinZNode(znode, queueId);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId
+ " and serverName=" + serverName, e);
}
return result;
}
@Override
public List<String> getAllQueues(String serverName) {
String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
}
return result;
}
}

View File

@ -0,0 +1,403 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.InvalidProtocolBufferException;
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
/** Znode containing all replication queues for this region server. */
private String myQueuesZnode;
/** Name of znode we use to lock during failover */
private final static String RS_LOCK_ZNODE = "lock";
private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable)
throws KeeperException {
super(zk, conf, abortable);
}
@Override
public void init(String serverName) {
this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
}
@Override
public void removeQueue(String queueId) {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
} catch (KeeperException e) {
this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
}
}
@Override
public void addLog(String queueId, String filename) throws KeeperException {
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.createWithParents(this.zookeeper, znode);
}
@Override
public void removeLog(String queueId, String filename) {
try {
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.deleteNode(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to remove hlog from queue (queueId=" + queueId + ", filename="
+ filename + ")", e);
}
}
@Override
public void setLogPosition(String queueId, String filename, long position) {
try {
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename);
// Why serialize String of Long and not Long as bytes?
ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
} catch (KeeperException e) {
this.abortable.abort("Failed to write replication hlog position (filename=" + filename
+ ", position=" + position + ")", e);
}
}
@Override
public long getLogPosition(String queueId, String filename) throws KeeperException {
String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
String znode = ZKUtil.joinZNode(clusterZnode, filename);
byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
try {
return parseHLogPositionFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
+ "znode content, continuing.");
}
// if we can not parse the position, start at the beginning of the hlog file
// again
return 0;
}
@Override
public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
// check whether there is multi support. If yes, use it.
if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
LOG.info("Atomically moving " + regionserverZnode + "'s hlogs to my queue");
newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
} else {
LOG.info("Moving " + regionserverZnode + "'s hlogs to my queue");
if (!lockOtherRS(regionserverZnode)) {
return newQueues;
}
newQueues = copyQueuesFromRS(regionserverZnode);
deleteAnotherRSQueues(regionserverZnode);
}
return newQueues;
}
@Override
public void removeAllQueues() {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
} catch (KeeperException e) {
// if the znode is already expired, don't bother going further
if (e instanceof KeeperException.SessionExpiredException) {
return;
}
this.abortable.abort("Failed to delete replication queues for region server: "
+ this.myQueuesZnode, e);
}
}
@Override
public List<String> getLogsInQueue(String queueId) {
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId, e);
}
return result;
}
@Override
public List<String> getAllQueues() {
List<String> listOfQueues = null;
try {
listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get a list of queues for region server: "
+ this.myQueuesZnode, e);
}
return listOfQueues;
}
/**
* Try to set a lock in another region server's znode.
* @param znode the server names of the other server
* @return true if the lock was acquired, false in every other cases
*/
private boolean lockOtherRS(String znode) {
try {
String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
if (parent.equals(this.myQueuesZnode)) {
LOG.warn("Won't lock because this is us, we're dead!");
return false;
}
String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
} catch (KeeperException e) {
// This exception will pop up if the znode under which we're trying to
// create the lock is already deleted by another region server, meaning
// that the transfer already occurred.
// NoNode => transfer is done and znodes are already deleted
// NodeExists => lock znode already created by another RS
if (e instanceof KeeperException.NoNodeException
|| e instanceof KeeperException.NodeExistsException) {
LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
+ e.getMessage());
} else {
LOG.info("Failed lock other rs", e);
}
return false;
}
return true;
}
/**
* Delete all the replication queues for a given region server.
* @param regionserverZnode The znode of the region server to delete.
*/
private void deleteAnotherRSQueues(String regionserverZnode) {
String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
try {
List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
for (String cluster : clusters) {
// No need to delete, it will be deleted later.
if (cluster.equals(RS_LOCK_ZNODE)) {
continue;
}
String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
}
// Finish cleaning up
ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
} catch (KeeperException e) {
if (e instanceof KeeperException.NoNodeException
|| e instanceof KeeperException.NotEmptyException) {
// Testing a special case where another region server was able to
// create a lock just after we deleted it, but then was also able to
// delete the RS znode before us or its lock znode is still there.
if (e.getPath().equals(fullpath)) {
return;
}
}
this.abortable.abort("Failed to delete replication queues for region server: "
+ regionserverZnode, e);
}
}
/**
* It "atomically" copies all the hlogs queues from another region server and returns them all
* sorted per peer cluster (appended with the dead server's znode).
* @param znode pertaining to the region server to copy the queues from
* @return HLog queues sorted per peer cluster
*/
private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
// hbase/replication/rs/deadrs
String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
List<String> peerIdsToProcess = null;
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
try {
peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
if (peerIdsToProcess == null) return queues; // node already processed
for (String peerId : peerIdsToProcess) {
String newPeerId = peerId + "-" + znode;
String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
// check the logs queue for the old peer cluster
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
if (hlogs == null || hlogs.size() == 0) {
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
continue; // empty log queue.
}
// create the new cluster znode
SortedSet<String> logQueue = new TreeSet<String>();
queues.put(newPeerId, logQueue);
ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
listOfOps.add(op);
// get the offset of the logs and set it to new znodes
for (String hlog : hlogs) {
String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
// add ops for deleting
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
logQueue.add(hlog);
}
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
}
// add delete op for dead rs
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
LOG.debug(" The multi list size is: " + listOfOps.size());
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
LOG.info("Atomically moved the dead regionserver logs. ");
} catch (KeeperException e) {
// Multi call failed; it looks like some other regionserver took away the logs.
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
queues.clear();
}
return queues;
}
/**
* This methods copies all the hlogs queues from another region server and returns them all sorted
* per peer cluster (appended with the dead server's znode)
* @param znode server names to copy
* @return all hlogs for all peers of that cluster, null if an error occurred
*/
private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
// TODO this method isn't atomic enough, we could start copying and then
// TODO fail for some reason and we would end up with znodes we don't want.
SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
try {
String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
// We have a lock znode in there, it will count as one.
if (clusters == null || clusters.size() <= 1) {
return queues;
}
// The lock isn't a peer cluster, remove it
clusters.remove(RS_LOCK_ZNODE);
for (String cluster : clusters) {
// We add the name of the recovered RS to the new znode, we can even
// do that for queues that were recovered 10 times giving a znode like
// number-startcode-number-otherstartcode-number-anotherstartcode-etc
String newCluster = cluster + "-" + znode;
String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
// That region server didn't have anything to replicate for this cluster
if (hlogs == null || hlogs.size() == 0) {
continue;
}
ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
HConstants.EMPTY_BYTE_ARRAY);
SortedSet<String> logQueue = new TreeSet<String>();
queues.put(newCluster, logQueue);
for (String hlog : hlogs) {
String z = ZKUtil.joinZNode(clusterPath, hlog);
byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
long position = 0;
try {
position = parseHLogPositionFrom(positionBytes);
} catch (DeserializationException e) {
LOG.warn("Failed parse of hlog position from the following znode: " + z);
}
LOG.debug("Creating " + hlog + " with data " + position);
String child = ZKUtil.joinZNode(newClusterZnode, hlog);
// Position doesn't actually change, we are just deserializing it for
// logging, so just use the already serialized version
ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
logQueue.add(hlog);
}
}
} catch (KeeperException e) {
this.abortable.abort("Copy queues from rs", e);
}
return queues;
}
/**
* @param lockOwner
* @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
* for use as content of an replication lock during region server fail over.
*/
static byte[] lockToByteArray(final String lockOwner) {
byte[] bytes =
ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
/**
* @param position
* @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable
* for use as content of an hlog position in a replication queue.
*/
static byte[] toByteArray(final long position) {
byte[] bytes =
ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build()
.toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
/**
* @param bytes - Content of a HLog position znode.
* @return long - The current HLog position.
* @throws DeserializationException
*/
private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
ZooKeeperProtos.ReplicationHLogPosition position;
try {
position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return position.getPosition();
} else {
if (bytes.length > 0) {
return Bytes.toLong(bytes);
}
return 0;
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -37,34 +38,37 @@ import java.util.concurrent.atomic.AtomicBoolean;
* ReplicationStateImpl is responsible for maintaining the replication state
* znode.
*/
public class ReplicationStateImpl implements ReplicationStateInterface {
public class ReplicationStateImpl extends ReplicationStateZKBase implements
ReplicationStateInterface {
private ReplicationStateTracker stateTracker;
private final String stateZnode;
private final ZooKeeperWatcher zookeeper;
private final Abortable abortable;
private final ReplicationStateTracker stateTracker;
private final AtomicBoolean replicating;
private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode,
public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
final Abortable abortable, final AtomicBoolean replicating) {
this.zookeeper = zk;
this.stateZnode = stateZnode;
this.abortable = abortable;
super(zk, conf, abortable);
this.replicating = replicating;
// Set a tracker on replicationStateNode
this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode,
this.abortable);
this.stateTracker =
new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
stateTracker.start();
readReplicationStateZnode();
}
public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
final Abortable abortable) {
this(zk, conf, abortable, new AtomicBoolean());
}
@Override
public boolean getState() throws KeeperException {
return getReplication();
}
@Override
public void setState(boolean newState) throws KeeperException {
setReplicating(newState);
}
@ -110,10 +114,10 @@ public class ReplicationStateImpl implements ReplicationStateInterface {
* @param newState
*/
private void setReplicating(boolean newState) throws KeeperException {
ZKUtil.createWithParents(this.zookeeper, this.stateZnode);
ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
: ReplicationZookeeper.DISABLED_ZNODE_BYTES;
ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes);
ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
}
/**
@ -143,7 +147,7 @@ public class ReplicationStateImpl implements ReplicationStateInterface {
this.replicating.set(getReplication());
LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
} catch (KeeperException e) {
this.abortable.abort("Failed getting data on from " + this.stateZnode, e);
this.abortable.abort("Failed getting data on from " + this.stateZNode, e);
}
}

View File

@ -0,0 +1,81 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* This is a base class for maintaining replication state in zookeeper.
*/
public abstract class ReplicationStateZKBase {
/**
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
* cluster.
*/
protected final String peerStateNodeName;
/** The name of the znode that contains the replication status of the local cluster. */
protected final String stateZNode;
/** The name of the base znode that contains all replication state. */
protected final String replicationZNode;
/** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
protected final String peersZNode;
/** The name of the znode that contains all replication queues */
protected final String queuesZNode;
/** The cluster key of the local cluster */
protected final String ourClusterKey;
protected final ZooKeeperWatcher zookeeper;
protected final Configuration conf;
protected final Abortable abortable;
public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
Abortable abortable) {
this.zookeeper = zookeeper;
this.conf = conf;
this.abortable = abortable;
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
String stateZNodeName = conf.get("zookeeper.znode.replication.state", "state");
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.stateZNode = ZKUtil.joinZNode(replicationZNode, stateZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
}
public List<String> getListOfReplicators() {
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of replicators", e);
}
return result;
}
}

View File

@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -51,7 +49,6 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@ -88,8 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ReplicationZookeeper implements Closeable {
private static final Log LOG =
LogFactory.getLog(ReplicationZookeeper.class);
// Name of znode we use to lock when failover
private final static String RS_LOCK_ZNODE = "lock";
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
@ -114,6 +109,7 @@ public class ReplicationZookeeper implements Closeable {
// Abortable
private Abortable abortable;
private final ReplicationStateInterface replicationState;
private final ReplicationQueues replicationQueues;
/**
* ZNode content if enabled state.
@ -139,8 +135,10 @@ public class ReplicationZookeeper implements Closeable {
this.conf = conf;
this.zookeeper = zk;
setZNodes(abortable);
this.replicationState =
new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean());
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
// TODO This interface is no longer used by anyone using this constructor. When this class goes
// away, we will no longer have this null initialization business
this.replicationQueues = null;
}
/**
@ -149,7 +147,7 @@ public class ReplicationZookeeper implements Closeable {
* @param server
* @param replicating atomic boolean to start/stop replication
* @throws IOException
* @throws KeeperException
* @throws KeeperException
*/
public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
throws IOException, KeeperException {
@ -158,13 +156,14 @@ public class ReplicationZookeeper implements Closeable {
this.conf = server.getConfiguration();
setZNodes(server);
this.replicationState =
new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating);
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
this.peerClusters = new HashMap<String, ReplicationPeer>();
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
this.replicationQueues.init(server.getServerName().toString());
connectExistingPeers();
}
@ -432,32 +431,6 @@ public class ReplicationZookeeper implements Closeable {
return ProtobufUtil.prependPBMagic(bytes);
}
/**
* @param position
* @return Serialized protobuf of <code>position</code> with pb magic prefix
* prepended suitable for use as content of an hlog position in a
* replication queue.
*/
static byte[] toByteArray(
final long position) {
byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
.build().toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
/**
* @param lockOwner
* @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
* prepended suitable for use as content of an replication lock during
* region server fail over.
*/
static byte[] lockToByteArray(
final String lockOwner) {
byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
.toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
/**
* @param bytes Content of a peer znode.
* @return ClusterKey parsed from the passed bytes.
@ -503,58 +476,6 @@ public class ReplicationZookeeper implements Closeable {
}
}
/**
* @param bytes - Content of a HLog position znode.
* @return long - The current HLog position.
* @throws DeserializationException
*/
static long parseHLogPositionFrom(
final byte[] bytes) throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition
.newBuilder();
ZooKeeperProtos.ReplicationHLogPosition position;
try {
position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return position.getPosition();
} else {
if (bytes.length > 0) {
return Bytes.toLong(bytes);
}
return 0;
}
}
/**
* @param bytes - Content of a lock znode.
* @return String - The owner of the lock.
* @throws DeserializationException
*/
static String parseLockOwnerFrom(
final byte[] bytes) throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
.newBuilder();
ZooKeeperProtos.ReplicationLock lock;
try {
lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return lock.getLockOwner();
} else {
if (bytes.length > 0) {
return Bytes.toString(bytes);
}
return "";
}
}
private boolean peerExists(String id) throws KeeperException {
return ZKUtil.checkExists(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
@ -624,10 +545,6 @@ public class ReplicationZookeeper implements Closeable {
return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
}
private String getRepStateNode() {
return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
}
/**
* Get the replication status of this cluster. If the state znode doesn't exist it will also
* create it and set it true.
@ -652,11 +569,8 @@ public class ReplicationZookeeper implements Closeable {
* @param filename name of the hlog's znode
* @param peerId name of the cluster's znode
*/
public void addLogToList(String filename, String peerId)
throws KeeperException {
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.createWithParents(this.zookeeper, znode);
public void addLogToList(String filename, String peerId) throws KeeperException {
this.replicationQueues.addLog(peerId, filename);
}
/**
@ -665,13 +579,7 @@ public class ReplicationZookeeper implements Closeable {
* @param clusterId name of the cluster's znode
*/
public void removeLogFromList(String filename, String clusterId) {
try {
String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.deleteNode(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed remove from list", e);
}
this.replicationQueues.removeLog(clusterId, filename);
}
/**
@ -679,18 +587,9 @@ public class ReplicationZookeeper implements Closeable {
* @param filename filename name of the hlog's znode
* @param clusterId clusterId name of the cluster's znode
* @param position the position in the file
* @throws IOException
*/
public void writeReplicationStatus(String filename, String clusterId,
long position) {
try {
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
// Why serialize String of Long and note Long as bytes?
ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
} catch (KeeperException e) {
this.abortable.abort("Writing replication status", e);
}
public void writeReplicationStatus(String filename, String clusterId, long position) {
this.replicationQueues.setLogPosition(clusterId, filename, position);
}
/**
@ -709,202 +608,15 @@ public class ReplicationZookeeper implements Closeable {
return result;
}
/**
* Get the list of the replicators that have queues, they can be alive, dead
* or simply from a previous run
* @return a list of server names
*/
public List<String> getListOfReplicators() {
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
} catch (KeeperException e) {
this.abortable.abort("Get list of replicators", e);
}
return result;
}
/**
* Get the list of peer clusters for the specified server names
* @param rs server names of the rs
* @return a list of peer cluster
* Take ownership for the set of queues belonging to a dead region server.
* @param regionserver the id of the dead region server
* @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in
* each queue.
*/
public List<String> getListPeersForRS(String rs) {
String znode = ZKUtil.joinZNode(rsZNode, rs);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Get list of peers for rs", e);
}
return result;
}
/**
* Get the list of hlogs for the specified region server and peer cluster
* @param rs server names of the rs
* @param id peer cluster
* @return a list of hlogs
*/
public List<String> getListHLogsForPeerForRS(String rs, String id) {
String znode = ZKUtil.joinZNode(rsZNode, rs);
znode = ZKUtil.joinZNode(znode, id);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Get list of hlogs for peer", e);
}
return result;
}
/**
* Try to set a lock in another server's znode.
* @param znode the server names of the other server
* @return true if the lock was acquired, false in every other cases
*/
public boolean lockOtherRS(String znode) {
try {
String parent = ZKUtil.joinZNode(this.rsZNode, znode);
if (parent.equals(rsServerNameZnode)) {
LOG.warn("Won't lock because this is us, we're dead!");
return false;
}
String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode));
} catch (KeeperException e) {
// This exception will pop up if the znode under which we're trying to
// create the lock is already deleted by another region server, meaning
// that the transfer already occurred.
// NoNode => transfer is done and znodes are already deleted
// NodeExists => lock znode already created by another RS
if (e instanceof KeeperException.NoNodeException ||
e instanceof KeeperException.NodeExistsException) {
LOG.info("Won't transfer the queue," +
" another RS took care of it because of: " + e.getMessage());
} else {
LOG.info("Failed lock other rs", e);
}
return false;
}
return true;
}
/**
* It "atomically" copies all the hlogs queues from another region server and returns them all
* sorted per peer cluster (appended with the dead server's znode).
* @param znode
* @return HLog queues sorted per peer cluster
*/
public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs
List<String> peerIdsToProcess = null;
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
try {
peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
if (peerIdsToProcess == null) return queues; // node already processed
for (String peerId : peerIdsToProcess) {
String newPeerId = peerId + "-" + znode;
String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
// check the logs queue for the old peer cluster
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
if (hlogs == null || hlogs.size() == 0) {
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
continue; // empty log queue.
}
// create the new cluster znode
SortedSet<String> logQueue = new TreeSet<String>();
queues.put(newPeerId, logQueue);
ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
listOfOps.add(op);
// get the offset of the logs and set it to new znodes
for (String hlog : hlogs) {
String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
// add ops for deleting
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
logQueue.add(hlog);
}
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
}
// add delete op for dead rs
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
LOG.debug(" The multi list size is: " + listOfOps.size());
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
LOG.info("Atomically moved the dead regionserver logs. ");
} catch (KeeperException e) {
// Multi call failed; it looks like some other regionserver took away the logs.
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
queues.clear();
}
return queues;
}
/**
* This methods copies all the hlogs queues from another region server
* and returns them all sorted per peer cluster (appended with the dead
* server's znode)
* @param znode server names to copy
* @return all hlogs for all peers of that cluster, null if an error occurred
*/
public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
// TODO this method isn't atomic enough, we could start copying and then
// TODO fail for some reason and we would end up with znodes we don't want.
SortedMap<String,SortedSet<String>> queues =
new TreeMap<String,SortedSet<String>>();
try {
String nodePath = ZKUtil.joinZNode(rsZNode, znode);
List<String> clusters =
ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
// We have a lock znode in there, it will count as one.
if (clusters == null || clusters.size() <= 1) {
return queues;
}
// The lock isn't a peer cluster, remove it
clusters.remove(RS_LOCK_ZNODE);
for (String cluster : clusters) {
// We add the name of the recovered RS to the new znode, we can even
// do that for queues that were recovered 10 times giving a znode like
// number-startcode-number-otherstartcode-number-anotherstartcode-etc
String newCluster = cluster+"-"+znode;
String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
// That region server didn't have anything to replicate for this cluster
if (hlogs == null || hlogs.size() == 0) {
continue;
}
ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
HConstants.EMPTY_BYTE_ARRAY);
SortedSet<String> logQueue = new TreeSet<String>();
queues.put(newCluster, logQueue);
for (String hlog : hlogs) {
String z = ZKUtil.joinZNode(clusterPath, hlog);
byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
long position = 0;
try {
position = parseHLogPositionFrom(positionBytes);
} catch (DeserializationException e) {
LOG.warn("Failed parse of hlog position from the following znode: " + z);
}
LOG.debug("Creating " + hlog + " with data " + position);
String child = ZKUtil.joinZNode(newClusterZnode, hlog);
// Position doesn't actually change, we are just deserializing it for
// logging, so just use the already serialized version
ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
logQueue.add(hlog);
}
}
} catch (KeeperException e) {
this.abortable.abort("Copy queues from rs", e);
}
return queues;
public SortedMap<String, SortedSet<String>> claimQueues(String regionserver) {
return this.replicationQueues.claimQueues(regionserver);
}
/**
@ -912,48 +624,10 @@ public class ReplicationZookeeper implements Closeable {
* @param peerZnode znode of the peer cluster queue of hlogs to delete
*/
public void deleteSource(String peerZnode, boolean closeConnection) {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
if (closeConnection) {
this.peerClusters.get(peerZnode).getZkw().close();
this.peerClusters.remove(peerZnode);
}
} catch (KeeperException e) {
this.abortable.abort("Failed delete of " + peerZnode, e);
}
}
/**
* Recursive deletion of all znodes in specified rs' znode
* @param znode
*/
public void deleteRsQueues(String znode) {
String fullpath = ZKUtil.joinZNode(rsZNode, znode);
try {
List<String> clusters =
ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
for (String cluster : clusters) {
// We'll delete it later
if (cluster.equals(RS_LOCK_ZNODE)) {
continue;
}
String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
}
// Finish cleaning up
ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
} catch (KeeperException e) {
if (e instanceof KeeperException.NoNodeException ||
e instanceof KeeperException.NotEmptyException) {
// Testing a special case where another region server was able to
// create a lock just after we deleted it, but then was also able to
// delete the RS znode before us or its lock znode is still there.
if (e.getPath().equals(fullpath)) {
return;
}
}
this.abortable.abort("Failed delete of " + znode, e);
this.replicationQueues.removeQueue(peerZnode);
if (closeConnection) {
this.peerClusters.get(peerZnode).getZkw().close();
this.peerClusters.remove(peerZnode);
}
}
@ -961,16 +635,7 @@ public class ReplicationZookeeper implements Closeable {
* Delete this cluster's queues
*/
public void deleteOwnRSZNode() {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper,
this.rsServerNameZnode);
} catch (KeeperException e) {
// if the znode is already expired, don't bother going further
if (e instanceof KeeperException.SessionExpiredException) {
return;
}
this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
}
this.replicationQueues.removeAllQueues();
}
/**
@ -978,22 +643,10 @@ public class ReplicationZookeeper implements Closeable {
* @param peerId znode of the peer cluster
* @param hlog name of the hlog
* @return the position in that hlog
* @throws KeeperException
* @throws KeeperException
*/
public long getHLogRepPosition(String peerId, String hlog)
throws KeeperException {
String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
String znode = ZKUtil.joinZNode(clusterZnode, hlog);
byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
try {
return parseHLogPositionFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed parse of HLogPosition for peerId=" + peerId + " and hlog=" + hlog
+ "znode content, continuing.");
}
// if we can not parse the position, start at the beginning of the hlog file
// again
return 0;
public long getHLogRepPosition(String peerId, String hlog) throws KeeperException {
return this.replicationQueues.getLogPosition(peerId, hlog);
}
/**
@ -1051,7 +704,7 @@ public class ReplicationZookeeper implements Closeable {
public Map<String, ReplicationPeer> getPeerClusters() {
return this.peerClusters;
}
/**
* Determine if a ZK path points to a peer node.
* @param path path to be checked

View File

@ -27,7 +27,10 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationStateImpl;
import org.apache.hadoop.hbase.replication.ReplicationStateInterface;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -43,8 +46,10 @@ import java.util.Set;
@InterfaceAudience.Private
public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
private ReplicationZookeeper zkHelper;
private Set<String> hlogs = new HashSet<String>();
private ZooKeeperWatcher zkw;
private ReplicationQueuesClient replicationQueues;
private ReplicationStateInterface replicationState;
private final Set<String> hlogs = new HashSet<String>();
private boolean stopped = false;
private boolean aborted;
@ -53,7 +58,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
public boolean isLogDeletable(Path filePath) {
try {
if (!zkHelper.getReplication()) {
if (!replicationState.getState()) {
return false;
}
} catch (KeeperException e) {
@ -89,20 +94,20 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
private boolean refreshHLogsAndSearch(String searchedLog) {
this.hlogs.clear();
final boolean lookForLog = searchedLog != null;
List<String> rss = zkHelper.getListOfReplicators();
List<String> rss = replicationQueues.getListOfReplicators();
if (rss == null) {
LOG.debug("Didn't find any region server that replicates, deleting: " +
searchedLog);
return false;
}
for (String rs: rss) {
List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
List<String> listOfPeers = replicationQueues.getAllQueues(rs);
// if rs just died, this will be null
if (listOfPeers == null) {
continue;
}
for (String id : listOfPeers) {
List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
if (peersHlogs != null) {
this.hlogs.addAll(peersHlogs);
}
@ -128,8 +133,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
Configuration conf = new Configuration(config);
super.setConf(conf);
try {
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.zkHelper = new ReplicationZookeeper(this, conf, zkw);
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
this.replicationState = new ReplicationStateImpl(zkw, conf, this);
} catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
@ -143,9 +149,17 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
public void stop(String why) {
if (this.stopped) return;
this.stopped = true;
if (this.zkHelper != null) {
LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher());
this.zkHelper.getZookeeperWatcher().close();
if (this.zkw != null) {
LOG.info("Stopping " + this.zkw);
this.zkw.close();
}
if (this.replicationState != null) {
LOG.info("Stopping " + this.replicationState);
try {
this.replicationState.close();
} catch (IOException e) {
LOG.error("Error while stopping " + this.replicationState, e);
}
}
// Not sure why we're deleting a connection that we never acquired or used
HConnectionManager.deleteConnection(this.getConf());

View File

@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes;
@ -64,6 +66,7 @@ public class Replication implements WALActionsListener,
private ReplicationSourceManager replicationManager;
private final AtomicBoolean replicating = new AtomicBoolean(true);
private ReplicationZookeeper zkHelper;
private ReplicationQueues replicationQueues;
private Configuration conf;
private ReplicationSink replicationSink;
// Hosting server
@ -104,18 +107,23 @@ public class Replication implements WALActionsListener,
if (replication) {
try {
this.zkHelper = new ReplicationZookeeper(server, this.replicating);
this.replicationQueues =
new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server);
this.replicationQueues.init(this.server.getServerName().toString());
} catch (KeeperException ke) {
throw new IOException("Failed replication handler create " +
"(replicating=" + this.replicating, ke);
}
this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
this.replicating, logDir, oldLogDir);
this.replicationManager =
new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs,
this.replicating, logDir, oldLogDir);
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
} else {
this.replicationManager = null;
this.zkHelper = null;
this.replicationQueues = null;
}
}

View File

@ -41,8 +41,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -73,6 +73,7 @@ public class ReplicationSourceManager {
private final AtomicBoolean replicating;
// Helper for zookeeper
private final ReplicationZookeeper zkHelper;
private final ReplicationQueues replicationQueues;
// All about stopping
private final Stoppable stopper;
// All logs we are currently tracking
@ -91,14 +92,14 @@ public class ReplicationSourceManager {
private final long sleepBeforeFailover;
// Homemade executer service for replication
private final ThreadPoolExecutor executor;
private final Random rand;
/**
* Creates a replication manager and sets the watch on all the other
* registered region servers
* Creates a replication manager and sets the watch on all the other registered region servers
* @param zkHelper the zk helper for replication
* @param replicationQueues the interface for manipulating replication queues
* @param conf the configuration to use
* @param stopper the stopper object for this region server
* @param fs the file system to use
@ -107,15 +108,13 @@ public class ReplicationSourceManager {
* @param oldLogDir the directory where old logs are archived
*/
public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
final Configuration conf,
final Stoppable stopper,
final FileSystem fs,
final AtomicBoolean replicating,
final Path logDir,
final Path oldLogDir) {
final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper,
final FileSystem fs, final AtomicBoolean replicating, final Path logDir,
final Path oldLogDir) {
this.sources = new ArrayList<ReplicationSourceInterface>();
this.replicating = replicating;
this.zkHelper = zkHelper;
this.replicationQueues = replicationQueues;
this.stopper = stopper;
this.hlogsById = new HashMap<String, SortedSet<String>>();
this.oldsources = new ArrayList<ReplicationSourceInterface>();
@ -181,7 +180,7 @@ public class ReplicationSourceManager {
for (String id : this.zkHelper.getPeerClusters().keySet()) {
addSource(id);
}
List<String> currentReplicators = this.zkHelper.getListOfReplicators();
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
if (currentReplicators == null || currentReplicators.size() == 0) {
return;
}
@ -350,13 +349,12 @@ public class ReplicationSourceManager {
* It creates one old source for any type of source of the old rs.
* @param rsZnode
*/
public void transferQueues(String rsZnode) {
private void transferQueues(String rsZnode) {
NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
LOG.info("Cancelling the transfer of " + rsZnode +
" because of " + ex.getMessage());
LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
}
}
@ -589,20 +587,12 @@ public class ReplicationSourceManager {
}
SortedMap<String, SortedSet<String>> newQueues = null;
// check whether there is multi support. If yes, use it.
if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue");
newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode);
} else {
LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
if (!zkHelper.lockOtherRS(rsZnode)) {
return;
}
newQueues = zkHelper.copyQueuesFromRS(rsZnode);
zkHelper.deleteRsQueues(rsZnode);
}
// process of copying over the failed queue is completed.
newQueues = zkHelper.claimQueues(rsZnode);
// Copying over the failed queue is completed.
if (newQueues.isEmpty()) {
// We either didn't get the lock or the failed region server didn't have any outstanding
// HLogs to replicate, so we are done.
return;
}

View File

@ -17,15 +17,10 @@
*/
package org.apache.hadoop.hbase.client.replication;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -49,9 +44,7 @@ public class TestReplicationAdmin {
private final String ID_SECOND = "2";
private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
private static ReplicationSourceManager manager;
private static ReplicationAdmin admin;
private static AtomicBoolean replicating = new AtomicBoolean(true);
/**
* @throws java.lang.Exception
@ -62,19 +55,6 @@ public class TestReplicationAdmin {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
admin = new ReplicationAdmin(conf);
Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
Path logDir = new Path(TEST_UTIL.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
manager = new ReplicationSourceManager(admin.getReplicationZk(), conf,
// The following stopper never stops so that we can respond
// to zk notification
new Stoppable() {
@Override
public void stop(String why) {}
@Override
public boolean isStopped() {return false;}
}, FileSystem.get(conf), replicating, logDir, oldLogDir);
}
/**
@ -84,7 +64,6 @@ public class TestReplicationAdmin {
*/
@Test
public void testAddRemovePeer() throws Exception {
assertEquals(0, manager.getSources().size());
// Add a valid peer
admin.addPeer(ID_ONE, KEY_ONE);
// try adding the same (fails)

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.*;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import org.apache.hadoop.hbase.ServerName;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
/**
* White box testing for replication state interfaces. Implementations should extend this class, and
* initialize the interfaces properly.
*/
public abstract class TestReplicationStateBasic {
protected ReplicationQueues rq1;
protected ReplicationQueues rq2;
protected ReplicationQueues rq3;
protected ReplicationQueuesClient rqc;
protected String server1 = new ServerName("hostname1.example.org", 1234, -1L).toString();
protected String server2 = new ServerName("hostname2.example.org", 1234, -1L).toString();
protected String server3 = new ServerName("hostname3.example.org", 1234, -1L).toString();
@Test
public void testReplicationQueuesClient() throws KeeperException {
// Test methods with empty state
assertEquals(0, rqc.getListOfReplicators().size());
assertNull(rqc.getLogsInQueue(server1, "qId1"));
assertNull(rqc.getAllQueues(server1));
/*
* Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
* server2: zero queues
*/
rq1.init(server1);
rq2.init(server2);
rq1.addLog("qId1", "trash");
rq1.removeLog("qId1", "trash");
rq1.addLog("qId2", "filename1");
rq1.addLog("qId3", "filename2");
rq1.addLog("qId3", "filename3");
rq2.addLog("trash", "trash");
rq2.removeQueue("trash");
List<String> reps = rqc.getListOfReplicators();
assertEquals(2, reps.size());
assertTrue(server1, reps.contains(server1));
assertTrue(server2, reps.contains(server2));
assertNull(rqc.getLogsInQueue("bogus", "bogus"));
assertNull(rqc.getLogsInQueue(server1, "bogus"));
assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
assertNull(rqc.getAllQueues("bogus"));
assertEquals(0, rqc.getAllQueues(server2).size());
List<String> list = rqc.getAllQueues(server1);
assertEquals(3, list.size());
assertTrue(list.contains("qId2"));
assertTrue(list.contains("qId3"));
}
@Test
public void testReplicationQueues() throws KeeperException {
rq1.init(server1);
rq2.init(server2);
rq3.init(server3);
// Zero queues or replicators exist
assertEquals(0, rq1.getListOfReplicators().size());
rq1.removeQueue("bogus");
rq1.removeLog("bogus", "bogus");
rq1.removeAllQueues();
assertNull(rq1.getAllQueues());
// TODO fix NPE if getting a log position on a file that does not exist
// assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
assertNull(rq1.getLogsInQueue("bogus"));
assertEquals(0, rq1.claimQueues(new ServerName("bogus", 1234, -1L).toString()).size());
// TODO test setting a log position on a bogus file
// rq1.setLogPosition("bogus", "bogus", 5L);
populateQueues();
assertEquals(3, rq1.getListOfReplicators().size());
assertEquals(0, rq2.getLogsInQueue("qId1").size());
assertEquals(5, rq3.getLogsInQueue("qId5").size());
assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
rq3.setLogPosition("qId5", "filename4", 354L);
assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
assertEquals(5, rq3.getLogsInQueue("qId5").size());
assertEquals(0, rq2.getLogsInQueue("qId1").size());
assertEquals(0, rq1.getAllQueues().size());
assertEquals(1, rq2.getAllQueues().size());
assertEquals(5, rq3.getAllQueues().size());
assertEquals(0, rq3.claimQueues(server1).size());
assertEquals(2, rq3.getListOfReplicators().size());
SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
assertEquals(5, queues.size());
assertEquals(1, rq2.getListOfReplicators().size());
// TODO test claimQueues on yourself
// rq2.claimQueues(server2);
assertEquals(6, rq2.getAllQueues().size());
rq2.removeAllQueues();
assertEquals(0, rq2.getListOfReplicators().size());
}
/*
* three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
* 3, 4, 5 log files respectively
*/
protected void populateQueues() throws KeeperException {
rq1.addLog("trash", "trash");
rq1.removeQueue("trash");
rq2.addLog("qId1", "trash");
rq2.removeLog("qId1", "trash");
for (int i = 1; i < 6; i++) {
for (int j = 0; j < i; j++) {
rq3.addLog("qId" + i, "filename" + j);
}
}
}
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
private static Configuration conf;
private static HBaseTestingUtility utility;
private static ZooKeeperWatcher zkw;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility();
utility.startMiniZKCluster();
conf = utility.getConfiguration();
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
}
@Before
public void setUp() throws KeeperException {
DummyServer ds1 = new DummyServer(server1);
DummyServer ds2 = new DummyServer(server2);
DummyServer ds3 = new DummyServer(server3);
rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1);
rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2);
rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3);
rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1);
}
@After
public void tearDown() throws KeeperException, IOException {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
utility.shutdownMiniZKCluster();
}
static class DummyServer implements Server {
private String serverName;
private boolean isAborted = false;
private boolean isStopped = false;
public DummyServer(String serverName) {
this.serverName = serverName;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public CatalogTracker getCatalogTracker() {
return null;
}
@Override
public ServerName getServerName() {
return new ServerName(this.serverName);
}
@Override
public void abort(String why, Throwable e) {
this.isAborted = true;
}
@Override
public boolean isAborted() {
return this.isAborted;
}
@Override
public void stop(String why) {
this.isStopped = true;
}
@Override
public boolean isStopped() {
return this.isStopped;
}
}
}

View File

@ -225,7 +225,7 @@ public class TestReplicationSourceManager {
}
@Test
public void testNodeFailoverWorkerCopyQueuesFromRSUsingMulti() throws Exception {
public void testClaimQueues() throws Exception {
LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("hostname0.example.org");
@ -288,13 +288,13 @@ public class TestReplicationSourceManager {
// simulate three servers fail sequentially
ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true));
SortedMap<String, SortedSet<String>> testMap =
rz1.copyQueuesFromRSUsingMulti(server.getServerName().getServerName());
rz1.claimQueues(server.getServerName().getServerName());
rz1.close();
ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true));
testMap = rz2.copyQueuesFromRSUsingMulti(s1.getServerName().getServerName());
testMap = rz2.claimQueues(s1.getServerName().getServerName());
rz2.close();
ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true));
testMap = rz3.copyQueuesFromRSUsingMulti(s2.getServerName().getServerName());
testMap = rz3.claimQueues(s2.getServerName().getServerName());
rz3.close();
ReplicationSource s = new ReplicationSource();
@ -327,7 +327,7 @@ public class TestReplicationSourceManager {
@Override
public void run() {
try {
logZnodesMap = rz.copyQueuesFromRSUsingMulti(deadRsZnode);
logZnodesMap = rz.claimQueues(deadRsZnode);
rz.close();
server.abort("Done with testing", null);
} catch (Exception e) {