HBASE-8439 [replication] Remove ReplicationZookeeper class

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1507497 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-07-26 22:42:35 +00:00
parent 50df1e2fd7
commit ff5fdb671f
23 changed files with 906 additions and 798 deletions

View File

@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -59,8 +61,9 @@ import java.util.Map;
public class ReplicationAdmin implements Closeable { public class ReplicationAdmin implements Closeable {
private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class); private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
private final ReplicationZookeeper replicationZk;
private final HConnection connection; private final HConnection connection;
private final ReplicationQueuesClient replicationQueuesClient;
private final ReplicationPeers replicationPeers;
/** /**
* Constructor that creates a connection to the local ZooKeeper ensemble. * Constructor that creates a connection to the local ZooKeeper ensemble.
@ -76,7 +79,12 @@ public class ReplicationAdmin implements Closeable {
this.connection = HConnectionManager.getConnection(conf); this.connection = HConnectionManager.getConnection(conf);
ZooKeeperWatcher zkw = createZooKeeperWatcher(); ZooKeeperWatcher zkw = createZooKeeperWatcher();
try { try {
this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw); this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
this.replicationPeers.init();
this.replicationQueuesClient =
ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
this.replicationQueuesClient.init();
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Unable setup the ZooKeeper connection", e); throw new IOException("Unable setup the ZooKeeper connection", e);
} }
@ -109,7 +117,7 @@ public class ReplicationAdmin implements Closeable {
* multi-slave isn't supported yet. * multi-slave isn't supported yet.
*/ */
public void addPeer(String id, String clusterKey) throws IOException { public void addPeer(String id, String clusterKey) throws IOException {
this.replicationZk.addPeer(id, clusterKey); this.replicationPeers.addPeer(id, clusterKey);
} }
/** /**
@ -117,7 +125,7 @@ public class ReplicationAdmin implements Closeable {
* @param id a short that identifies the cluster * @param id a short that identifies the cluster
*/ */
public void removePeer(String id) throws IOException { public void removePeer(String id) throws IOException {
this.replicationZk.removePeer(id); this.replicationPeers.removePeer(id);
} }
/** /**
@ -125,7 +133,7 @@ public class ReplicationAdmin implements Closeable {
* @param id a short that identifies the cluster * @param id a short that identifies the cluster
*/ */
public void enablePeer(String id) throws IOException { public void enablePeer(String id) throws IOException {
this.replicationZk.enablePeer(id); this.replicationPeers.enablePeer(id);
} }
/** /**
@ -133,7 +141,7 @@ public class ReplicationAdmin implements Closeable {
* @param id a short that identifies the cluster * @param id a short that identifies the cluster
*/ */
public void disablePeer(String id) throws IOException { public void disablePeer(String id) throws IOException {
this.replicationZk.disablePeer(id); this.replicationPeers.disablePeer(id);
} }
/** /**
@ -141,7 +149,7 @@ public class ReplicationAdmin implements Closeable {
* @return number of slave clusters * @return number of slave clusters
*/ */
public int getPeersCount() { public int getPeersCount() {
return this.replicationZk.listPeersIdsAndWatch().size(); return this.replicationPeers.getAllPeerIds().size();
} }
/** /**
@ -149,15 +157,7 @@ public class ReplicationAdmin implements Closeable {
* @return A map of peer ids to peer cluster keys * @return A map of peer ids to peer cluster keys
*/ */
public Map<String, String> listPeers() { public Map<String, String> listPeers() {
return this.replicationZk.listPeers(); return this.replicationPeers.getAllPeerClusterKeys();
}
/**
* Get the ZK-support tool created and used by this object for replication.
* @return the ZK-support tool
*/
ReplicationZookeeper getReplicationZk() {
return replicationZk;
} }
@Override @Override

View File

@ -0,0 +1,51 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* A factory class for instantiating replication objects that deal with replication state.
*/
public class ReplicationFactory {
public static ReplicationQueues getReplicationQueues(final ZooKeeperWatcher zk,
Configuration conf, Abortable abortable) {
return new ReplicationQueuesZKImpl(zk, conf, abortable);
}
public static ReplicationQueuesClient getReplicationQueuesClient(final ZooKeeperWatcher zk,
Configuration conf, Abortable abortable) {
return new ReplicationQueuesClientZKImpl(zk, conf, abortable);
}
public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
Abortable abortable) {
return new ReplicationPeersZKImpl(zk, conf, abortable);
}
public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
Stoppable stopper) {
return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper);
}
}

View File

@ -0,0 +1,51 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* The replication listener interface can be implemented if a class needs to subscribe to events
* generated by the ReplicationTracker. These events include things like addition/deletion of peer
* clusters or failure of a local region server. To receive events, the class also needs to register
* itself with a Replication Tracker.
*/
@InterfaceAudience.Private
public interface ReplicationListener {
/**
* A region server has been removed from the local cluster
* @param regionServer the removed region server
*/
public void regionServerRemoved(String regionServer);
/**
* A peer cluster has been removed (i.e. unregistered) from replication.
* @param peerId The peer id of the cluster that has been removed
*/
public void peerRemoved(String peerId);
/**
* The list of registered peer clusters has changed.
* @param peerIds A list of all currently registered peer clusters
*/
public void peerListChanged(List<String> peerIds);
}

View File

@ -20,12 +20,19 @@ package org.apache.hadoop.hbase.replication;
import java.util.List; import java.util.List;
import org.apache.zookeeper.KeeperException;
/** /**
* This provides an interface for clients of replication to view replication queues. These queues * This provides an interface for clients of replication to view replication queues. These queues
* keep track of the HLogs that still need to be replicated to remote clusters. * keep track of the HLogs that still need to be replicated to remote clusters.
*/ */
public interface ReplicationQueuesClient { public interface ReplicationQueuesClient {
/**
* Initialize the replication queue client interface.
*/
public void init() throws KeeperException;
/** /**
* Get a list of all region servers that have outstanding replication queues. These servers could * Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster. * be alive, dead or from a previous run of the cluster.

View File

@ -30,8 +30,12 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
ReplicationQueuesClient { ReplicationQueuesClient {
public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf, public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
Abortable abortable) throws KeeperException { Abortable abortable) {
super(zk, conf, abortable); super(zk, conf, abortable);
}
@Override
public void init() throws KeeperException {
ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
} }

View File

@ -95,7 +95,16 @@ public abstract class ReplicationStateZKBase {
return ProtobufUtil.prependPBMagic(bytes); return ProtobufUtil.prependPBMagic(bytes);
} }
public boolean peerExists(String id) throws KeeperException { protected boolean peerExists(String id) throws KeeperException {
return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0; return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
} }
/**
* Determine if a ZK path points to a peer node.
* @param path path to be checked
* @return true if the path points to a peer node, otherwise false
*/
protected boolean isPeerPath(String path) {
return path.split("/").length == peersZNode.split("/").length + 1;
}
} }

View File

@ -0,0 +1,49 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.List;
/**
* This is the interface for a Replication Tracker. A replication tracker provides the facility to
* subscribe and track events that reflect a change in replication state. These events are used by
* the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues
* and queue failover. These events are defined in the ReplicationListener interface. If a class
* would like to listen to replication events it must implement the ReplicationListener interface
* and register itself with a Replication Tracker.
*/
@InterfaceAudience.Private
public interface ReplicationTracker {
/**
* Register a replication listener to receive replication events.
* @param listener
*/
public void registerListener(ReplicationListener listener);
public void removeListener(ReplicationListener listener);
/**
* Returns a list of other live region servers in the cluster.
* @return
*/
public List<String> getListOfRegionServers();
}

View File

@ -0,0 +1,249 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* This class is a Zookeeper implementation of the ReplicationTracker interface. This class is
* responsible for handling replication events that are defined in the ReplicationListener
* interface.
*/
public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class);
// All about stopping
private final Stoppable stopper;
// listeners to be notified
private final List<ReplicationListener> listeners =
new CopyOnWriteArrayList<ReplicationListener>();
// List of all the other region servers in this cluster
private final ArrayList<String> otherRegionServers = new ArrayList<String>();
private final ReplicationPeers replicationPeers;
public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper,
final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
Stoppable stopper) {
super(zookeeper, conf, abortable);
this.replicationPeers = replicationPeers;
this.stopper = stopper;
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
}
@Override
public void registerListener(ReplicationListener listener) {
listeners.add(listener);
}
@Override
public void removeListener(ReplicationListener listener) {
listeners.remove(listener);
}
/**
* Return a snapshot of the current region servers.
*/
@Override
public List<String> getListOfRegionServers() {
refreshOtherRegionServersList();
List<String> list = null;
synchronized (otherRegionServers) {
list = new ArrayList<String>(otherRegionServers);
}
return list;
}
/**
* Watcher used to be notified of the other region server's death in the local cluster. It
* initiates the process to transfer the queues if it is able to grab the lock.
*/
public class OtherRegionServerWatcher extends ZooKeeperListener {
/**
* Construct a ZooKeeper event listener.
*/
public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
super(watcher);
}
/**
* Called when a new node has been created.
* @param path full path of the new node
*/
public void nodeCreated(String path) {
refreshListIfRightPath(path);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
public void nodeDeleted(String path) {
if (stopper.isStopped()) {
return;
}
boolean cont = refreshListIfRightPath(path);
if (!cont) {
return;
}
LOG.info(path + " znode expired, triggering replicatorRemoved event");
for (ReplicationListener rl : listeners) {
rl.regionServerRemoved(getZNodeName(path));
}
}
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
public void nodeChildrenChanged(String path) {
if (stopper.isStopped()) {
return;
}
refreshListIfRightPath(path);
}
private boolean refreshListIfRightPath(String path) {
if (!path.startsWith(this.watcher.rsZNode)) {
return false;
}
return refreshOtherRegionServersList();
}
}
/**
* Watcher used to follow the creation and deletion of peer clusters.
*/
public class PeersWatcher extends ZooKeeperListener {
/**
* Construct a ZooKeeper event listener.
*/
public PeersWatcher(ZooKeeperWatcher watcher) {
super(watcher);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
public void nodeDeleted(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
if (isPeerPath(path)) {
String id = getZNodeName(path);
LOG.info(path + " znode expired, triggering peerRemoved event");
for (ReplicationListener rl : listeners) {
rl.peerRemoved(id);
}
}
}
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
public void nodeChildrenChanged(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
LOG.info(path + " znode expired, triggering peerListChanged event");
for (ReplicationListener rl : listeners) {
rl.peerListChanged(peers);
}
}
}
/**
* Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also
* reset the watches.
* @param path path to check against
* @return A list of peers' identifiers if the event concerns this watcher, else null.
*/
private List<String> refreshPeersList(String path) {
if (!path.startsWith(getPeersZNode())) {
return null;
}
return this.replicationPeers.getAllPeerIds();
}
private String getPeersZNode() {
return this.peersZNode;
}
/**
* Extracts the znode name of a peer cluster from a ZK path
* @param fullPath Path to extract the id from
* @return the id or an empty string if path is invalid
*/
private String getZNodeName(String fullPath) {
String[] parts = fullPath.split("/");
return parts.length > 0 ? parts[parts.length - 1] : "";
}
/**
* Reads the list of region servers from ZK and atomically clears our local view of it and
* replaces it with the updated list.
* @return true if the local list of the other region servers was updated with the ZK data (even
* if it was empty), false if the data was missing in ZK
*/
private boolean refreshOtherRegionServersList() {
List<String> newRsList = getRegisteredRegionServers();
if (newRsList == null) {
return false;
} else {
synchronized (otherRegionServers) {
otherRegionServers.clear();
otherRegionServers.addAll(newRsList);
}
}
return true;
}
/**
* Get a list of all the other region servers in this cluster and set a watch
* @return a list of server nanes
*/
private List<String> getRegisteredRegionServers() {
List<String> result = null;
try {
result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.rsZNode);
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
return result;
}
}

View File

@ -1,366 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class serves as a helper for all things related to zookeeper in
* replication.
* <p/>
* The layout looks something like this under zookeeper.znode.parent for the
* master cluster:
* <p/>
*
* <pre>
* replication/
* state {contains true or false}
* clusterId {contains a byte}
* peers/
* 1/ {contains a full cluster address}
* peer-state {contains ENABLED or DISABLED}
* 2/
* ...
* rs/ {lists all RS that replicate}
* startcode1/ {lists all peer clusters}
* 1/ {lists hlogs to process}
* 10.10.1.76%3A53488.123456789 {contains nothing or a position}
* 10.10.1.76%3A53488.123456790
* ...
* 2/
* ...
* startcode2/
* ...
* </pre>
*/
@InterfaceAudience.Private
public class ReplicationZookeeper extends ReplicationStateZKBase {
private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class);
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
private String peersZNode;
private final Configuration conf;
// Abortable
private Abortable abortable;
private final ReplicationPeers replicationPeers;
private final ReplicationQueues replicationQueues;
/**
* Constructor used by clients of replication (like master and HBase clients)
* @param conf conf to use
* @param zk zk connection to use
* @throws IOException
*/
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
final ZooKeeperWatcher zk) throws KeeperException, IOException {
super(zk, conf, abortable);
this.conf = conf;
this.zookeeper = zk;
setZNodes(abortable);
// TODO This interface is no longer used by anyone using this constructor. When this class goes
// away, we will no longer have this null initialization business
this.replicationQueues = null;
this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, abortable);
this.replicationPeers.init();
}
/**
* Constructor used by region servers, connects to the peer cluster right away.
*
* @param server
* @throws IOException
* @throws KeeperException
*/
public ReplicationZookeeper(final Server server)
throws IOException, KeeperException {
super(server.getZooKeeper(), server.getConfiguration(), server);
this.abortable = server;
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
setZNodes(server);
this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
this.replicationQueues.init(server.getServerName().toString());
this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server);
this.replicationPeers.init();
}
private void setZNodes(Abortable abortable) throws KeeperException {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
}
/**
* List this cluster's peers' IDs
* @return list of all peers' identifiers
*/
public List<String> listPeersIdsAndWatch() {
return this.replicationPeers.getAllPeerIds();
}
/**
* Map of this cluster's peers for display.
* @return A map of peer ids to peer cluster keys
*/
public Map<String, String> listPeers() {
return this.replicationPeers.getAllPeerClusterKeys();
}
/**
* Returns all region servers from given peer
*
* @param peerClusterId (byte) the cluster to interrogate
* @return addresses of all region servers
*/
public List<ServerName> getSlavesAddresses(String peerClusterId) {
return this.replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
}
/**
* This method connects this cluster to another one and registers it
* in this region server's replication znode
* @param peerId id of the peer cluster
* @throws KeeperException
*/
public boolean connectToPeer(String peerId) throws IOException, KeeperException {
return this.replicationPeers.connectToPeer(peerId);
}
/**
* Remove the peer from zookeeper. which will trigger the watchers on every
* region server and close their sources
* @param id
* @throws IllegalArgumentException Thrown when the peer doesn't exist
*/
public void removePeer(String id) throws IOException {
this.replicationPeers.removePeer(id);
}
/**
* Add a new peer to this cluster
* @param id peer's identifier
* @param clusterKey ZK ensemble's addresses, client port and root znode
* @throws IllegalArgumentException Thrown when the peer doesn't exist
* @throws IllegalStateException Thrown when a peer already exists, since
* multi-slave isn't supported yet.
*/
public void addPeer(String id, String clusterKey) throws IOException {
this.replicationPeers.addPeer(id, clusterKey);
}
/**
* Enable replication to the peer
*
* @param id peer's identifier
* @throws IllegalArgumentException
* Thrown when the peer doesn't exist
*/
public void enablePeer(String id) throws IOException {
this.replicationPeers.enablePeer(id);
}
/**
* Disable replication to the peer
*
* @param id peer's identifier
* @throws IllegalArgumentException
* Thrown when the peer doesn't exist
*/
public void disablePeer(String id) throws IOException {
this.replicationPeers.disablePeer(id);
}
/**
* Check whether the peer is enabled or not. This method checks the atomic
* boolean of ReplicationPeer locally.
*
* @param id peer identifier
* @return true if the peer is enabled, otherwise false
* @throws IllegalArgumentException
* Thrown when the peer doesn't exist
*/
public boolean getPeerEnabled(String id) {
return this.replicationPeers.getStatusOfConnectedPeer(id);
}
/**
* Add a new log to the list of hlogs in zookeeper
* @param filename name of the hlog's znode
* @param peerId name of the cluster's znode
*/
public void addLogToList(String filename, String peerId) throws KeeperException {
this.replicationQueues.addLog(peerId, filename);
}
/**
* Remove a log from the list of hlogs in zookeeper
* @param filename name of the hlog's znode
* @param clusterId name of the cluster's znode
*/
public void removeLogFromList(String filename, String clusterId) {
this.replicationQueues.removeLog(clusterId, filename);
}
/**
* Set the current position of the specified cluster in the current hlog
* @param filename filename name of the hlog's znode
* @param clusterId clusterId name of the cluster's znode
* @param position the position in the file
*/
public void writeReplicationStatus(String filename, String clusterId, long position) {
this.replicationQueues.setLogPosition(clusterId, filename, position);
}
/**
* Get a list of all the other region servers in this cluster
* and set a watch
* @return a list of server nanes
*/
public List<String> getRegisteredRegionServers() {
List<String> result = null;
try {
result = ZKUtil.listChildrenAndWatchThem(
this.zookeeper, this.zookeeper.rsZNode);
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
return result;
}
/**
* Take ownership for the set of queues belonging to a dead region server.
* @param regionserver the id of the dead region server
* @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in
* each queue.
*/
public SortedMap<String, SortedSet<String>> claimQueues(String regionserver) {
return this.replicationQueues.claimQueues(regionserver);
}
/**
* Delete a complete queue of hlogs
* @param peerZnode znode of the peer cluster queue of hlogs to delete
*/
public void deleteSource(String peerZnode, boolean closeConnection) {
this.replicationQueues.removeQueue(peerZnode);
if (closeConnection) {
this.replicationPeers.disconnectFromPeer(peerZnode);
}
}
/**
* Delete this cluster's queues
*/
public void deleteOwnRSZNode() {
this.replicationQueues.removeAllQueues();
}
/**
* Get the position of the specified hlog in the specified peer znode
* @param peerId znode of the peer cluster
* @param hlog name of the hlog
* @return the position in that hlog
* @throws KeeperException
*/
public long getHLogRepPosition(String peerId, String hlog) throws KeeperException {
return this.replicationQueues.getLogPosition(peerId, hlog);
}
/**
* Returns the UUID of the provided peer id. Should a connection loss or session
* expiration happen, the ZK handler will be reopened once and if it still doesn't
* work then it will bail and return null.
* @param peerId the peer's ID that will be converted into a UUID
* @return a UUID or null if there's a ZK connection issue
*/
public UUID getPeerUUID(String peerId) {
return this.replicationPeers.getPeerUUID(peerId);
}
public void registerRegionServerListener(ZooKeeperListener listener) {
this.zookeeper.registerListener(listener);
}
/**
* Get a map of all peer clusters
* @return map of peer cluster keyed by id
*/
public Set<String> getPeerClusters() {
return this.replicationPeers.getConnectedPeers();
}
/**
* Determine if a ZK path points to a peer node.
* @param path path to be checked
* @return true if the path points to a peer node, otherwise false
*/
public boolean isPeerPath(String path) {
return path.split("/").length == peersZNode.split("/").length + 1;
}
/**
* Extracts the znode name of a peer cluster from a ZK path
* @param fullPath Path to extract the id from
* @return the id or an empty string if path is invalid
*/
public static String getZNodeName(String fullPath) {
String[] parts = fullPath.split("/");
return parts.length > 0 ? parts[parts.length-1] : "";
}
/**
* Get this cluster's zk connection
* @return zk connection
*/
public ZooKeeperWatcher getZookeeperWatcher() {
return this.zookeeper;
}
/**
* Get the full path to the peers' znode
* @return path to peers in zk
*/
public String getPeersZNode() {
return peersZNode;
}
}

View File

@ -36,8 +36,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
@ -45,8 +43,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

View File

@ -36,10 +36,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
@ -118,7 +118,8 @@ public class VerifyReplication {
@Override public void abort(String why, Throwable e) {} @Override public void abort(String why, Throwable e) {}
@Override public boolean isAborted() {return false;} @Override public boolean isAborted() {return false;}
}); });
ReplicationPeers rp = new ReplicationPeersZKImpl(localZKW, conf, localZKW); ReplicationPeers rp =
ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
rp.init(); rp.init();
Configuration peerConf = rp.getPeerConf(peerId); Configuration peerConf = rp.getPeerConf(peerId);
if (peerConf == null) { if (peerConf == null) {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -122,7 +123,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
super.setConf(conf); super.setConf(conf);
try { try {
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this); this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
this.replicationQueues.init();
} catch (KeeperException e) { } catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e); LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) { } catch (IOException e) {

View File

@ -22,10 +22,10 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -45,11 +45,13 @@ import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
@ -66,8 +68,9 @@ public class Replication implements WALActionsListener,
LogFactory.getLog(Replication.class); LogFactory.getLog(Replication.class);
private boolean replication; private boolean replication;
private ReplicationSourceManager replicationManager; private ReplicationSourceManager replicationManager;
private ReplicationZookeeper zkHelper;
private ReplicationQueues replicationQueues; private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
private ReplicationTracker replicationTracker;
private Configuration conf; private Configuration conf;
private ReplicationSink replicationSink; private ReplicationSink replicationSink;
// Hosting server // Hosting server
@ -107,23 +110,35 @@ public class Replication implements WALActionsListener,
.build()); .build());
if (replication) { if (replication) {
try { try {
this.zkHelper = new ReplicationZookeeper(server);
this.replicationQueues = this.replicationQueues =
new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server); ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
this.replicationQueues.init(this.server.getServerName().toString()); this.replicationQueues.init(this.server.getServerName().toString());
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
this.replicationPeers.init();
this.replicationTracker =
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
this.conf, this.server, this.server);
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException("Failed replication handler create", ke); throw new IOException("Failed replication handler create", ke);
} }
UUID clusterId = null;
try {
clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
this.replicationManager = this.replicationManager =
new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, logDir, new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
oldLogDir); conf, this.server, fs, logDir, oldLogDir, clusterId);
this.statsThreadPeriod = this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
} else { } else {
this.replicationManager = null; this.replicationManager = null;
this.zkHelper = null;
this.replicationQueues = null; this.replicationQueues = null;
this.replicationPeers = null;
this.replicationTracker = null;
} }
} }

View File

@ -56,11 +56,11 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -86,8 +86,8 @@ public class ReplicationSource extends Thread
// container of entries to replicate // container of entries to replicate
private HLog.Entry[] entriesArray; private HLog.Entry[] entriesArray;
private HConnection conn; private HConnection conn;
// Helper class for zookeeper private ReplicationQueues replicationQueues;
private ReplicationZookeeper zkHelper; private ReplicationPeers replicationPeers;
private Configuration conf; private Configuration conf;
// ratio of region servers to chose from a slave cluster // ratio of region servers to chose from a slave cluster
private float ratio; private float ratio;
@ -151,12 +151,10 @@ public class ReplicationSource extends Thread
* @param peerClusterZnode the name of our znode * @param peerClusterZnode the name of our znode
* @throws IOException * @throws IOException
*/ */
public void init(final Configuration conf, public void init(final Configuration conf, final FileSystem fs,
final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationSourceManager manager, final ReplicationPeers replicationPeers, final Stoppable stopper,
final Stoppable stopper, final String peerClusterZnode, final UUID clusterId) throws IOException {
final String peerClusterZnode)
throws IOException {
this.stopper = stopper; this.stopper = stopper;
this.conf = conf; this.conf = conf;
this.replicationQueueSizeCapacity = this.replicationQueueSizeCapacity =
@ -178,7 +176,8 @@ public class ReplicationSource extends Thread
// replication and make replication specific settings such as compression or codec to use // replication and make replication specific settings such as compression or codec to use
// passing Cells. // passing Cells.
this.conn = HConnectionManager.getConnection(conf); this.conn = HConnectionManager.getConnection(conf);
this.zkHelper = manager.getRepZkWrapper(); this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
this.currentPeers = new ArrayList<ServerName>(); this.currentPeers = new ArrayList<ServerName>();
this.random = new Random(); this.random = new Random();
@ -188,11 +187,8 @@ public class ReplicationSource extends Thread
this.fs = fs; this.fs = fs;
this.metrics = new MetricsSource(peerClusterZnode); this.metrics = new MetricsSource(peerClusterZnode);
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
try { this.clusterId = clusterId;
this.clusterId = ZKClusterId.getUUIDForCluster(zkHelper.getZookeeperWatcher());
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
this.peerClusterZnode = peerClusterZnode; this.peerClusterZnode = peerClusterZnode;
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us // ReplicationQueueInfo parses the peerId out of the znode for us
@ -204,7 +200,7 @@ public class ReplicationSource extends Thread
*/ */
private void chooseSinks() { private void chooseSinks() {
this.currentPeers.clear(); this.currentPeers.clear();
List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId); List<ServerName> addresses = this.replicationPeers.getRegionServersOfConnectedPeer(this.peerId);
Set<ServerName> setOfAddr = new HashSet<ServerName>(); Set<ServerName> setOfAddr = new HashSet<ServerName>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio)); int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.debug("Getting " + nbPeers + LOG.debug("Getting " + nbPeers +
@ -238,7 +234,7 @@ public class ReplicationSource extends Thread
int sleepMultiplier = 1; int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread // delay this until we are in an asynchronous thread
while (this.peerClusterId == null) { while (this.peerClusterId == null) {
this.peerClusterId = zkHelper.getPeerUUID(this.peerId); this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
if (this.peerClusterId == null) { if (this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++; sleepMultiplier++;
@ -254,8 +250,8 @@ public class ReplicationSource extends Thread
// normally has a position (unless the RS failed between 2 logs) // normally has a position (unless the RS failed between 2 logs)
if (this.replicationQueueInfo.isQueueRecovered()) { if (this.replicationQueueInfo.isQueueRecovered()) {
try { try {
this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition( this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
this.peerClusterZnode, this.queue.peek().getName())); this.queue.peek().getName()));
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() + LOG.trace("Recovered queue started with log " + this.queue.peek() +
" at position " + this.repLogReader.getPosition()); " at position " + this.repLogReader.getPosition());
@ -736,7 +732,7 @@ public class ReplicationSource extends Thread
* @return true if the peer is enabled, otherwise false * @return true if the peer is enabled, otherwise false
*/ */
protected boolean isPeerEnabled() { protected boolean isPeerEnabled() {
return this.zkHelper.getPeerEnabled(this.peerId); return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
} }
/** /**

View File

@ -19,12 +19,15 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
/** /**
* Interface that defines a replication source * Interface that defines a replication source
@ -41,13 +44,10 @@ public interface ReplicationSourceInterface {
* @param peerClusterId the id of the peer cluster * @param peerClusterId the id of the peer cluster
* @throws IOException * @throws IOException
*/ */
void init( public void init(final Configuration conf, final FileSystem fs,
final Configuration conf, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final FileSystem fs, final ReplicationPeers replicationPeers, final Stoppable stopper,
final ReplicationSourceManager manager, final String peerClusterZnode, final UUID clusterId) throws IOException;
final Stoppable stopper,
final String peerClusterId
) throws IOException;
/** /**
* Add a log to the list of logs to replicate * Add a log to the list of logs to replicate

View File

@ -29,11 +29,11 @@ import java.util.Random;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -42,10 +42,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -60,18 +60,23 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* When a region server dies, this class uses a watcher to get notified and it * When a region server dies, this class uses a watcher to get notified and it
* tries to grab a lock in order to transfer all the queues in a local * tries to grab a lock in order to transfer all the queues in a local
* old source. * old source.
*
* This class implements the ReplicationListener interface so that it can track changes in
* replication state.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationSourceManager { public class ReplicationSourceManager implements ReplicationListener {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(ReplicationSourceManager.class); LogFactory.getLog(ReplicationSourceManager.class);
// List of all the sources that read this RS's logs // List of all the sources that read this RS's logs
private final List<ReplicationSourceInterface> sources; private final List<ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs // List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources; private final List<ReplicationSourceInterface> oldsources;
// Helper for zookeeper
private final ReplicationZookeeper zkHelper;
private final ReplicationQueues replicationQueues; private final ReplicationQueues replicationQueues;
private final ReplicationTracker replicationTracker;
private final ReplicationPeers replicationPeers;
// UUID for this cluster
private final UUID clusterId;
// All about stopping // All about stopping
private final Stoppable stopper; private final Stoppable stopper;
// All logs we are currently tracking // All logs we are currently tracking
@ -80,8 +85,6 @@ public class ReplicationSourceManager {
private final FileSystem fs; private final FileSystem fs;
// The path to the latest log we saw, for new coming sources // The path to the latest log we saw, for new coming sources
private Path latestPath; private Path latestPath;
// List of all the other region servers in this cluster
private final List<String> otherRegionServers = new ArrayList<String>();
// Path to the hlogs directories // Path to the hlogs directories
private final Path logDir; private final Path logDir;
// Path to the hlog archive // Path to the hlog archive
@ -104,12 +107,14 @@ public class ReplicationSourceManager {
* @param logDir the directory that contains all hlog directories of live RSs * @param logDir the directory that contains all hlog directories of live RSs
* @param oldLogDir the directory where old logs are archived * @param oldLogDir the directory where old logs are archived
*/ */
public ReplicationSourceManager(final ReplicationZookeeper zkHelper, public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final FileSystem fs, final Path logDir, final Path oldLogDir) { final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) {
this.sources = new ArrayList<ReplicationSourceInterface>(); this.sources = new ArrayList<ReplicationSourceInterface>();
this.zkHelper = zkHelper;
this.replicationQueues = replicationQueues; this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.stopper = stopper; this.stopper = stopper;
this.hlogsById = new HashMap<String, SortedSet<String>>(); this.hlogsById = new HashMap<String, SortedSet<String>>();
this.oldsources = new ArrayList<ReplicationSourceInterface>(); this.oldsources = new ArrayList<ReplicationSourceInterface>();
@ -118,11 +123,9 @@ public class ReplicationSourceManager {
this.logDir = logDir; this.logDir = logDir;
this.oldLogDir = oldLogDir; this.oldLogDir = oldLogDir;
this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
this.zkHelper.registerRegionServerListener( this.clusterId = clusterId;
new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); this.replicationTracker.registerListener(this);
this.zkHelper.registerRegionServerListener( this.replicationPeers.getAllPeerIds();
new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
this.zkHelper.listPeersIdsAndWatch();
// It's preferable to failover 1 RS at a time, but with good zk servers // It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time. // more could be processed at the same time.
int nbWorkers = conf.getInt("replication.executor.workers", 1); int nbWorkers = conf.getInt("replication.executor.workers", 1);
@ -150,12 +153,12 @@ public class ReplicationSourceManager {
*/ */
public void logPositionAndCleanOldLogs(Path log, String id, long position, public void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) { boolean queueRecovered, boolean holdLogInZK) {
String key = log.getName(); String fileName = log.getName();
this.zkHelper.writeReplicationStatus(key, id, position); this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) { if (holdLogInZK) {
return; return;
} }
cleanOldLogs(key, id, queueRecovered); cleanOldLogs(fileName, id, queueRecovered);
} }
/** /**
@ -175,7 +178,7 @@ public class ReplicationSourceManager {
} }
SortedSet<String> hlogSet = hlogs.headSet(key); SortedSet<String> hlogSet = hlogs.headSet(key);
for (String hlog : hlogSet) { for (String hlog : hlogSet) {
this.zkHelper.removeLogFromList(hlog, id); this.replicationQueues.removeLog(id, hlog);
} }
hlogSet.clear(); hlogSet.clear();
} }
@ -186,24 +189,21 @@ public class ReplicationSourceManager {
* old region server hlog queues * old region server hlog queues
*/ */
public void init() throws IOException { public void init() throws IOException {
for (String id : this.zkHelper.getPeerClusters()) { for (String id : this.replicationPeers.getConnectedPeers()) {
addSource(id); addSource(id);
} }
List<String> currentReplicators = this.replicationQueues.getListOfReplicators(); List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
if (currentReplicators == null || currentReplicators.size() == 0) { if (currentReplicators == null || currentReplicators.size() == 0) {
return; return;
} }
synchronized (otherRegionServers) { List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
refreshOtherRegionServersList(); LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
LOG.info("Current list of replicators: " + currentReplicators + otherRegionServers);
+ " other RSs: " + otherRegionServers);
}
// Look if there's anything to process after a restart // Look if there's anything to process after a restart
for (String rs : currentReplicators) { for (String rs : currentReplicators) {
synchronized (otherRegionServers) { if (!otherRegionServers.contains(rs)) {
if (!this.otherRegionServers.contains(rs)) { transferQueues(rs);
transferQueues(rs);
}
} }
} }
} }
@ -215,7 +215,9 @@ public class ReplicationSourceManager {
* @throws IOException * @throws IOException
*/ */
public ReplicationSourceInterface addSource(String id) throws IOException { public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, stopper, id); ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
this.replicationPeers, stopper, id, this.clusterId);
synchronized (this.hlogsById) { synchronized (this.hlogsById) {
this.sources.add(src); this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>()); this.hlogsById.put(id, new TreeSet<String>());
@ -224,7 +226,7 @@ public class ReplicationSourceManager {
String name = this.latestPath.getName(); String name = this.latestPath.getName();
this.hlogsById.get(id).add(name); this.hlogsById.get(id).add(name);
try { try {
this.zkHelper.addLogToList(name, src.getPeerClusterZnode()); this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
} catch (KeeperException ke) { } catch (KeeperException ke) {
String message = "Cannot add log to zk for" + String message = "Cannot add log to zk for" +
" replication when creating a new source"; " replication when creating a new source";
@ -238,13 +240,24 @@ public class ReplicationSourceManager {
return src; return src;
} }
/**
* Delete a complete queue of hlogs associated with a peer cluster
* @param peerId Id of the peer cluster queue of hlogs to delete
*/
public void deleteSource(String peerId, boolean closeConnection) {
this.replicationQueues.removeQueue(peerId);
if (closeConnection) {
this.replicationPeers.disconnectFromPeer(peerId);
}
}
/** /**
* Terminate the replication on this region server * Terminate the replication on this region server
*/ */
public void join() { public void join() {
this.executor.shutdown(); this.executor.shutdown();
if (this.sources.size() == 0) { if (this.sources.size() == 0) {
this.zkHelper.deleteOwnRSZNode(); this.replicationQueues.removeAllQueues();
} }
for (ReplicationSourceInterface source : this.sources) { for (ReplicationSourceInterface source : this.sources) {
source.terminate("Region server is closing"); source.terminate("Region server is closing");
@ -273,7 +286,7 @@ public class ReplicationSourceManager {
String name = newLog.getName(); String name = newLog.getName();
for (ReplicationSourceInterface source : this.sources) { for (ReplicationSourceInterface source : this.sources) {
try { try {
this.zkHelper.addLogToList(name, source.getPeerClusterZnode()); this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException("Cannot add log to zk for replication", ke); throw new IOException("Cannot add log to zk for replication", ke);
} }
@ -298,14 +311,6 @@ public class ReplicationSourceManager {
} }
} }
/**
* Get the ZK help of this manager
* @return the helper
*/
public ReplicationZookeeper getRepZkWrapper() {
return zkHelper;
}
/** /**
* Factory method to create a replication source * Factory method to create a replication source
* @param conf the configuration to use * @param conf the configuration to use
@ -316,12 +321,10 @@ public class ReplicationSourceManager {
* @return the created source * @return the created source
* @throws IOException * @throws IOException
*/ */
public ReplicationSourceInterface getReplicationSource( public ReplicationSourceInterface getReplicationSource(final Configuration conf,
final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager,
final FileSystem fs, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
final ReplicationSourceManager manager, final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
final Stoppable stopper,
final String peerId) throws IOException {
ReplicationSourceInterface src; ReplicationSourceInterface src;
try { try {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@ -334,7 +337,7 @@ public class ReplicationSourceManager {
src = new ReplicationSource(); src = new ReplicationSource();
} }
src.init(conf, fs, manager, stopper, peerId); src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
return src; return src;
} }
@ -347,7 +350,9 @@ public class ReplicationSourceManager {
* @param rsZnode * @param rsZnode
*/ */
private void transferQueues(String rsZnode) { private void transferQueues(String rsZnode) {
NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode); NodeFailoverWorker transfer =
new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
this.clusterId);
try { try {
this.executor.execute(transfer); this.executor.execute(transfer);
} catch (RejectedExecutionException ex) { } catch (RejectedExecutionException ex) {
@ -362,7 +367,7 @@ public class ReplicationSourceManager {
public void closeRecoveredQueue(ReplicationSourceInterface src) { public void closeRecoveredQueue(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
this.oldsources.remove(src); this.oldsources.remove(src);
this.zkHelper.deleteSource(src.getPeerClusterZnode(), false); deleteSource(src.getPeerClusterZnode(), false);
} }
/** /**
@ -403,151 +408,34 @@ public class ReplicationSourceManager {
} }
srcToRemove.terminate(terminateMessage); srcToRemove.terminate(terminateMessage);
this.sources.remove(srcToRemove); this.sources.remove(srcToRemove);
this.zkHelper.deleteSource(id, true); deleteSource(id, true);
} }
/** @Override
* Reads the list of region servers from ZK and atomically clears our public void regionServerRemoved(String regionserver) {
* local view of it and replaces it with the updated list. transferQueues(regionserver);
*
* @return true if the local list of the other region servers was updated
* with the ZK data (even if it was empty),
* false if the data was missing in ZK
*/
private boolean refreshOtherRegionServersList() {
List<String> newRsList = zkHelper.getRegisteredRegionServers();
if (newRsList == null) {
return false;
} else {
synchronized (otherRegionServers) {
otherRegionServers.clear();
otherRegionServers.addAll(newRsList);
}
}
return true;
} }
/** @Override
* Watcher used to be notified of the other region server's death public void peerRemoved(String peerId) {
* in the local cluster. It initiates the process to transfer the queues removePeer(peerId);
* if it is able to grab the lock.
*/
public class OtherRegionServerWatcher extends ZooKeeperListener {
/**
* Construct a ZooKeeper event listener.
*/
public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
super(watcher);
}
/**
* Called when a new node has been created.
* @param path full path of the new node
*/
public void nodeCreated(String path) {
refreshListIfRightPath(path);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
public void nodeDeleted(String path) {
if (stopper.isStopped()) {
return;
}
boolean cont = refreshListIfRightPath(path);
if (!cont) {
return;
}
LOG.info(path + " znode expired, trying to lock it");
transferQueues(ReplicationZookeeper.getZNodeName(path));
}
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
public void nodeChildrenChanged(String path) {
if (stopper.isStopped()) {
return;
}
refreshListIfRightPath(path);
}
private boolean refreshListIfRightPath(String path) {
if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
return false;
}
return refreshOtherRegionServersList();
}
} }
/** @Override
* Watcher used to follow the creation and deletion of peer clusters. public void peerListChanged(List<String> peerIds) {
*/ for (String id : peerIds) {
public class PeersWatcher extends ZooKeeperListener { try {
boolean added = this.replicationPeers.connectToPeer(id);
/** if (added) {
* Construct a ZooKeeper event listener. addSource(id);
*/
public PeersWatcher(ZooKeeperWatcher watcher) {
super(watcher);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
public void nodeDeleted(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
if (zkHelper.isPeerPath(path)) {
String id = ReplicationZookeeper.getZNodeName(path);
removePeer(id);
}
}
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
public void nodeChildrenChanged(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
for (String id : peers) {
try {
boolean added = zkHelper.connectToPeer(id);
if (added) {
addSource(id);
}
} catch (IOException e) {
// TODO manage better than that ?
LOG.error("Error while adding a new peer", e);
} catch (KeeperException e) {
LOG.error("Error while adding a new peer", e);
} }
} catch (IOException e) {
// TODO manage better than that ?
LOG.error("Error while adding a new peer", e);
} catch (KeeperException e) {
LOG.error("Error while adding a new peer", e);
} }
} }
/**
* Verify if this event is meant for us, and if so then get the latest
* peers' list from ZK. Also reset the watches.
* @param path path to check against
* @return A list of peers' identifiers if the event concerns this watcher,
* else null.
*/
private List<String> refreshPeersList(String path) {
if (!path.startsWith(zkHelper.getPeersZNode())) {
return null;
}
return zkHelper.listPeersIdsAndWatch();
}
} }
/** /**
@ -557,14 +445,21 @@ public class ReplicationSourceManager {
class NodeFailoverWorker extends Thread { class NodeFailoverWorker extends Thread {
private String rsZnode; private String rsZnode;
private final ReplicationQueues rq;
private final ReplicationPeers rp;
private final UUID clusterId;
/** /**
* *
* @param rsZnode * @param rsZnode
*/ */
public NodeFailoverWorker(String rsZnode) { public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final UUID clusterId) {
super("Failover-for-"+rsZnode); super("Failover-for-"+rsZnode);
this.rsZnode = rsZnode; this.rsZnode = rsZnode;
this.rq = replicationQueues;
this.rp = replicationPeers;
this.clusterId = clusterId;
} }
@Override @Override
@ -584,7 +479,7 @@ public class ReplicationSourceManager {
} }
SortedMap<String, SortedSet<String>> newQueues = null; SortedMap<String, SortedSet<String>> newQueues = null;
newQueues = zkHelper.claimQueues(rsZnode); newQueues = this.rq.claimQueues(rsZnode);
// Copying over the failed queue is completed. // Copying over the failed queue is completed.
if (newQueues.isEmpty()) { if (newQueues.isEmpty()) {
@ -597,8 +492,9 @@ public class ReplicationSourceManager {
String peerId = entry.getKey(); String peerId = entry.getKey();
try { try {
ReplicationSourceInterface src = ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, stopper, peerId); getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) { stopper, peerId, this.clusterId);
if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer"); src.terminate("Recovered queue doesn't belong to any current peer");
break; break;
} }

View File

@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -30,7 +29,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -68,8 +68,9 @@ public class TestLogsCleaner {
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
Replication.decorateMasterConfiguration(conf); Replication.decorateMasterConfiguration(conf);
Server server = new DummyServer(); Server server = new DummyServer();
ReplicationZookeeper zkHelper = new ReplicationZookeeper(server); ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
repQueues.init(server.getServerName().toString());
Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME); HConstants.HREGION_OLDLOGDIR_NAME);
String fakeMachineName = String fakeMachineName =
@ -98,7 +99,7 @@ public class TestLogsCleaner {
// (TimeToLiveLogCleaner) but would be rejected by the second // (TimeToLiveLogCleaner) but would be rejected by the second
// (ReplicationLogCleaner) // (ReplicationLogCleaner)
if (i % (30/3) == 1) { if (i % (30/3) == 1) {
zkHelper.addLogToList(fileName.getName(), fakeMachineName); repQueues.addLog(fakeMachineName, fileName.getName());
System.out.println("Replication log file: " + fileName); System.out.println("Replication log file: " + fileName);
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.io.IOException; import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -39,7 +40,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override @Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
Stoppable stopper, String peerClusterId) throws IOException { ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
UUID clusterId) throws IOException {
this.manager = manager; this.manager = manager;
this.peerClusterId = peerClusterId; this.peerClusterId = peerClusterId;
} }

View File

@ -69,6 +69,7 @@ public abstract class TestReplicationStateBasic {
@Test @Test
public void testReplicationQueuesClient() throws KeeperException { public void testReplicationQueuesClient() throws KeeperException {
rqc.init();
// Test methods with empty state // Test methods with empty state
assertEquals(0, rqc.getListOfReplicators().size()); assertEquals(0, rqc.getListOfReplicators().size());
assertNull(rqc.getLogsInQueue(server1, "qId1")); assertNull(rqc.getLogsInQueue(server1, "qId1"));

View File

@ -36,7 +36,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -50,6 +53,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
private static HBaseTestingUtility utility; private static HBaseTestingUtility utility;
private static ZooKeeperWatcher zkw; private static ZooKeeperWatcher zkw;
private static String replicationZNode; private static String replicationZNode;
private ReplicationQueuesZKImpl rqZK;
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
@ -65,7 +69,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
private static String initPeerClusterState(String baseZKNode) private static String initPeerClusterState(String baseZKNode)
throws IOException, KeeperException { throws IOException, KeeperException {
// Set up state nodes of peer clusters // Add a dummy region server and set up the cluster id
Configuration testConf = new Configuration(conf); Configuration testConf = new Configuration(conf);
testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null); ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null);
@ -82,16 +86,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
DummyServer ds1 = new DummyServer(server1); DummyServer ds1 = new DummyServer(server1);
DummyServer ds2 = new DummyServer(server2); DummyServer ds2 = new DummyServer(server2);
DummyServer ds3 = new DummyServer(server3); DummyServer ds3 = new DummyServer(server3);
try { rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1);
rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1); rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2);
rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2); rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3);
rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3); rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
rp = new ReplicationPeersZKImpl(zkw, conf, zkw); OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf); rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
} catch (KeeperException e) {
fail("Exception thrown: " + e);
}
} }
@After @After
@ -104,6 +105,23 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
utility.shutdownMiniZKCluster(); utility.shutdownMiniZKCluster();
} }
@Test
public void testIsPeerPath_PathToParentOfPeerNode() {
assertFalse(rqZK.isPeerPath(rqZK.peersZNode));
}
@Test
public void testIsPeerPath_PathToChildOfPeerNode() {
String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(rqZK.peersZNode, "1"), "child");
assertFalse(rqZK.isPeerPath(peerChild));
}
@Test
public void testIsPeerPath_ActualPeerPath() {
String peerPath = ZKUtil.joinZNode(rqZK.peersZNode, "1");
assertTrue(rqZK.isPeerPath(peerPath));
}
static class DummyServer implements Server { static class DummyServer implements Server {
private String serverName; private String serverName;
private boolean isAborted = false; private boolean isAborted = false;

View File

@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
/**
* This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
* MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
* of the rsZNode. All other znode creation/initialization is handled by the replication state
* interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the
* MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode).
*/
@Category(MediumTests.class)
public class TestReplicationTrackerZKImpl {
private static final Log LOG = LogFactory.getLog(TestReplicationTrackerZKImpl.class);
private static Configuration conf;
private static HBaseTestingUtility utility;
// Each one of the below variables are reinitialized before every test case
private ZooKeeperWatcher zkw;
private ReplicationPeers rp;
private ReplicationTracker rt;
private AtomicInteger rsRemovedCount;
private String rsRemovedData;
private AtomicInteger plChangedCount;
private List<String> plChangedData;
private AtomicInteger peerRemovedCount;
private String peerRemovedData;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility();
utility.startMiniZKCluster();
conf = utility.getConfiguration();
ZooKeeperWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
ZKUtil.createWithParents(zk, zk.rsZNode);
}
@Before
public void setUp() throws Exception {
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String fakeRs1 = ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234");
try {
ZKClusterId.setClusterId(zkw, new ClusterId());
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
rp.init();
rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
} catch (Exception e) {
fail("Exception during test setup: " + e);
}
rsRemovedCount = new AtomicInteger(0);
rsRemovedData = "";
plChangedCount = new AtomicInteger(0);
plChangedData = new ArrayList<String>();
peerRemovedCount = new AtomicInteger(0);
peerRemovedData = "";
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
utility.shutdownMiniZKCluster();
}
@Test
public void testGetListOfRegionServers() throws Exception {
// 0 region servers
assertEquals(0, rt.getListOfRegionServers().size());
// 1 region server
ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
assertEquals(1, rt.getListOfRegionServers().size());
// 2 region servers
ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
assertEquals(2, rt.getListOfRegionServers().size());
// 1 region server
ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
assertEquals(1, rt.getListOfRegionServers().size());
// 0 region server
ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
assertEquals(0, rt.getListOfRegionServers().size());
}
@Test(timeout = 30000)
public void testRegionServerRemovedEvent() throws Exception {
ZKUtil.createAndWatch(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"),
HConstants.EMPTY_BYTE_ARRAY);
rt.registerListener(new DummyReplicationListener());
// delete one
ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
// wait for event
while (rsRemovedCount.get() < 1) {
Thread.sleep(5);
}
assertEquals("hostname2.example.org:1234", rsRemovedData);
}
@Test(timeout = 30000)
public void testPeerRemovedEvent() throws Exception {
rp.addPeer("5", utility.getClusterKey());
rt.registerListener(new DummyReplicationListener());
rp.removePeer("5");
// wait for event
while (peerRemovedCount.get() < 1) {
Thread.sleep(5);
}
assertEquals("5", peerRemovedData);
}
@Test(timeout = 30000)
public void testPeerListChangedEvent() throws Exception {
// add a peer
rp.addPeer("5", utility.getClusterKey());
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
rt.registerListener(new DummyReplicationListener());
rp.disablePeer("5");
ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
// wait for event
int tmp = plChangedCount.get();
while (plChangedCount.get() <= tmp) {
Thread.sleep(5);
}
assertEquals(1, plChangedData.size());
assertTrue(plChangedData.contains("5"));
// clean up
ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
}
private class DummyReplicationListener implements ReplicationListener {
@Override
public void regionServerRemoved(String regionServer) {
rsRemovedData = regionServer;
rsRemovedCount.getAndIncrement();
LOG.debug("Received regionServerRemoved event: " + regionServer);
}
@Override
public void peerRemoved(String peerId) {
peerRemovedData = peerId;
peerRemovedCount.getAndIncrement();
LOG.debug("Received peerRemoved event: " + peerId);
}
@Override
public void peerListChanged(List<String> peerIds) {
plChangedData.clear();
plChangedData.addAll(peerIds);
plChangedCount.getAndIncrement();
LOG.debug("Received peerListChanged event");
}
}
private class DummyServer implements Server {
private String serverName;
private boolean isAborted = false;
private boolean isStopped = false;
public DummyServer(String serverName) {
this.serverName = serverName;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public CatalogTracker getCatalogTracker() {
return null;
}
@Override
public ServerName getServerName() {
return new ServerName(this.serverName);
}
@Override
public void abort(String why, Throwable e) {
LOG.info("Aborting " + serverName);
this.isAborted = true;
}
@Override
public boolean isAborted() {
return this.isAborted;
}
@Override
public void stop(String why) {
this.isStopped = true;
}
@Override
public boolean isStopped() {
return this.isStopped;
}
}
}

View File

@ -1,145 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestReplicationZookeeper {
private static Configuration conf;
private static HBaseTestingUtility utility;
private static ZooKeeperWatcher zkw;
private static ReplicationZookeeper repZk;
private static String slaveClusterKey;
private static String peersZNode;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility();
utility.startMiniZKCluster();
conf = utility.getConfiguration();
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
DummyServer server = new DummyServer();
repZk = new ReplicationZookeeper(server);
slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
conf.get("hbase.zookeeper.property.clientPort") + ":/1";
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
utility.shutdownMiniZKCluster();
}
@Test
public void testGetAddressesMissingSlave()
throws IOException, KeeperException {
repZk.addPeer("1", slaveClusterKey);
// HBASE-5586 used to get an NPE
assertEquals(0, repZk.getSlavesAddresses("1").size());
}
@Test
public void testIsPeerPath_PathToParentOfPeerNode() {
String peerParentNode = peersZNode;
assertFalse(repZk.isPeerPath(peerParentNode));
}
@Test
public void testIsPeerPath_PathToChildOfPeerNode() {
String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(peersZNode, "1"), "child");
assertFalse(repZk.isPeerPath(peerChild));
}
@Test
public void testIsPeerPath_ActualPeerPath() {
String peerPath = ZKUtil.joinZNode(peersZNode, "1");
assertTrue(repZk.isPeerPath(peerPath));
}
static class DummyServer implements Server {
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public CatalogTracker getCatalogTracker() {
return null;
}
@Override
public ServerName getServerName() {
return new ServerName("hostname.example.org", 1234, -1L);
}
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
}
@Override
public boolean isStopped() {
return false;
}
}
}

View File

@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
@ -50,11 +51,13 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After; import org.junit.After;
@ -128,6 +131,8 @@ public class TestReplicationSourceManager {
ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES); ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKClusterId.setClusterId(zkw, new ClusterId());
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager(); manager = replication.getReplicationManager();
fs = FileSystem.get(conf); fs = FileSystem.get(conf);
@ -226,12 +231,15 @@ public class TestReplicationSourceManager {
LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("hostname0.example.org"); final Server server = new DummyServer("hostname0.example.org");
ReplicationZookeeper rz = new ReplicationZookeeper(server); ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
rq.init(server.getServerName().toString());
// populate some znodes in the peer znode // populate some znodes in the peer znode
files.add("log1"); files.add("log1");
files.add("log2"); files.add("log2");
for (String file : files) { for (String file : files) {
rz.addLogToList(file, "1"); rq.addLog("1", file);
} }
// create 3 DummyServers // create 3 DummyServers
Server s1 = new DummyServer("dummyserver1.example.org"); Server s1 = new DummyServer("dummyserver1.example.org");
@ -266,12 +274,14 @@ public class TestReplicationSourceManager {
LOG.debug("testNodeFailoverDeadServerParsing"); LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
ReplicationZookeeper rz = new ReplicationZookeeper(server); ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
repQueues.init(server.getServerName().toString());
// populate some znodes in the peer znode // populate some znodes in the peer znode
files.add("log1"); files.add("log1");
files.add("log2"); files.add("log2");
for (String file : files) { for (String file : files) {
rz.addLogToList(file, "1"); repQueues.addLog("1", file);
} }
// create 3 DummyServers // create 3 DummyServers
@ -280,13 +290,19 @@ public class TestReplicationSourceManager {
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
// simulate three servers fail sequentially // simulate three servers fail sequentially
ReplicationZookeeper rz1 = new ReplicationZookeeper(s1); ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
rq1.init(s1.getServerName().toString());
SortedMap<String, SortedSet<String>> testMap = SortedMap<String, SortedSet<String>> testMap =
rz1.claimQueues(server.getServerName().getServerName()); rq1.claimQueues(server.getServerName().getServerName());
ReplicationZookeeper rz2 = new ReplicationZookeeper(s2); ReplicationQueues rq2 =
testMap = rz2.claimQueues(s1.getServerName().getServerName()); ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
ReplicationZookeeper rz3 = new ReplicationZookeeper(s3); rq2.init(s2.getServerName().toString());
testMap = rz3.claimQueues(s2.getServerName().getServerName()); testMap = rq2.claimQueues(s1.getServerName().getServerName());
ReplicationQueues rq3 =
ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
rq3.init(s3.getServerName().toString());
testMap = rq3.claimQueues(s2.getServerName().getServerName());
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
List<String> result = replicationQueueInfo.getDeadRegionServers(); List<String> result = replicationQueueInfo.getDeadRegionServers();
@ -304,18 +320,21 @@ public class TestReplicationSourceManager {
private SortedMap<String, SortedSet<String>> logZnodesMap; private SortedMap<String, SortedSet<String>> logZnodesMap;
Server server; Server server;
private String deadRsZnode; private String deadRsZnode;
ReplicationZookeeper rz; ReplicationQueues rq;
public DummyNodeFailoverWorker(String znode, Server s) throws Exception { public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
this.deadRsZnode = znode; this.deadRsZnode = znode;
this.server = s; this.server = s;
rz = new ReplicationZookeeper(server); this.rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
this.rq.init(this.server.getServerName().toString());
} }
@Override @Override
public void run() { public void run() {
try { try {
logZnodesMap = rz.claimQueues(deadRsZnode); logZnodesMap = rq.claimQueues(deadRsZnode);
server.abort("Done with testing", null); server.abort("Done with testing", null);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Got exception while running NodeFailoverWorker", e); LOG.error("Got exception while running NodeFailoverWorker", e);