From ff5fdb671f0e0d9fc76b6ead1800188569fb2348 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 26 Jul 2013 22:42:35 +0000 Subject: [PATCH] HBASE-8439 [replication] Remove ReplicationZookeeper class git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1507497 13f79535-47bb-0310-9956-ffa450edef68 --- .../client/replication/ReplicationAdmin.java | 34 +- .../hbase/replication/ReplicationFactory.java | 51 +++ .../replication/ReplicationListener.java | 51 +++ .../replication/ReplicationQueuesClient.java | 7 + .../ReplicationQueuesClientZKImpl.java | 6 +- .../replication/ReplicationStateZKBase.java | 11 +- .../hbase/replication/ReplicationTracker.java | 49 +++ .../replication/ReplicationTrackerZKImpl.java | 249 ++++++++++++ .../replication/ReplicationZookeeper.java | 366 ------------------ .../apache/hadoop/hbase/mapreduce/Import.java | 4 - .../replication/VerifyReplication.java | 5 +- .../master/ReplicationLogCleaner.java | 4 +- .../replication/regionserver/Replication.java | 33 +- .../regionserver/ReplicationSource.java | 38 +- .../ReplicationSourceInterface.java | 14 +- .../ReplicationSourceManager.java | 282 +++++--------- .../hbase/master/cleaner/TestLogsCleaner.java | 11 +- .../replication/ReplicationSourceDummy.java | 5 +- .../TestReplicationStateBasic.java | 1 + .../TestReplicationStateZKImpl.java | 40 +- .../TestReplicationTrackerZKImpl.java | 251 ++++++++++++ .../replication/TestReplicationZookeeper.java | 145 ------- .../TestReplicationSourceManager.java | 47 ++- 23 files changed, 906 insertions(+), 798 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 832c96cd2f4..ebdd3355dde 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -59,8 +61,9 @@ import java.util.Map; public class ReplicationAdmin implements Closeable { private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class); - private final ReplicationZookeeper replicationZk; private final HConnection connection; + private final ReplicationQueuesClient replicationQueuesClient; + private final ReplicationPeers replicationPeers; /** * Constructor that creates a connection to the local ZooKeeper ensemble. @@ -76,7 +79,12 @@ public class ReplicationAdmin implements Closeable { this.connection = HConnectionManager.getConnection(conf); ZooKeeperWatcher zkw = createZooKeeperWatcher(); try { - this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw); + this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection); + this.replicationPeers.init(); + this.replicationQueuesClient = + ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); + this.replicationQueuesClient.init(); + } catch (KeeperException e) { throw new IOException("Unable setup the ZooKeeper connection", e); } @@ -109,7 +117,7 @@ public class ReplicationAdmin implements Closeable { * multi-slave isn't supported yet. */ public void addPeer(String id, String clusterKey) throws IOException { - this.replicationZk.addPeer(id, clusterKey); + this.replicationPeers.addPeer(id, clusterKey); } /** @@ -117,7 +125,7 @@ public class ReplicationAdmin implements Closeable { * @param id a short that identifies the cluster */ public void removePeer(String id) throws IOException { - this.replicationZk.removePeer(id); + this.replicationPeers.removePeer(id); } /** @@ -125,7 +133,7 @@ public class ReplicationAdmin implements Closeable { * @param id a short that identifies the cluster */ public void enablePeer(String id) throws IOException { - this.replicationZk.enablePeer(id); + this.replicationPeers.enablePeer(id); } /** @@ -133,7 +141,7 @@ public class ReplicationAdmin implements Closeable { * @param id a short that identifies the cluster */ public void disablePeer(String id) throws IOException { - this.replicationZk.disablePeer(id); + this.replicationPeers.disablePeer(id); } /** @@ -141,7 +149,7 @@ public class ReplicationAdmin implements Closeable { * @return number of slave clusters */ public int getPeersCount() { - return this.replicationZk.listPeersIdsAndWatch().size(); + return this.replicationPeers.getAllPeerIds().size(); } /** @@ -149,15 +157,7 @@ public class ReplicationAdmin implements Closeable { * @return A map of peer ids to peer cluster keys */ public Map listPeers() { - return this.replicationZk.listPeers(); - } - - /** - * Get the ZK-support tool created and used by this object for replication. - * @return the ZK-support tool - */ - ReplicationZookeeper getReplicationZk() { - return replicationZk; + return this.replicationPeers.getAllPeerClusterKeys(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java new file mode 100644 index 00000000000..2af664eb55d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * A factory class for instantiating replication objects that deal with replication state. + */ +public class ReplicationFactory { + + public static ReplicationQueues getReplicationQueues(final ZooKeeperWatcher zk, + Configuration conf, Abortable abortable) { + return new ReplicationQueuesZKImpl(zk, conf, abortable); + } + + public static ReplicationQueuesClient getReplicationQueuesClient(final ZooKeeperWatcher zk, + Configuration conf, Abortable abortable) { + return new ReplicationQueuesClientZKImpl(zk, conf, abortable); + } + + public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, + Abortable abortable) { + return new ReplicationPeersZKImpl(zk, conf, abortable); + } + + public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper, + final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, + Stoppable stopper) { + return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java new file mode 100644 index 00000000000..15c2ef95c44 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * The replication listener interface can be implemented if a class needs to subscribe to events + * generated by the ReplicationTracker. These events include things like addition/deletion of peer + * clusters or failure of a local region server. To receive events, the class also needs to register + * itself with a Replication Tracker. + */ +@InterfaceAudience.Private +public interface ReplicationListener { + + /** + * A region server has been removed from the local cluster + * @param regionServer the removed region server + */ + public void regionServerRemoved(String regionServer); + + /** + * A peer cluster has been removed (i.e. unregistered) from replication. + * @param peerId The peer id of the cluster that has been removed + */ + public void peerRemoved(String peerId); + + /** + * The list of registered peer clusters has changed. + * @param peerIds A list of all currently registered peer clusters + */ + public void peerListChanged(List peerIds); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index f8edd2a4c6a..8fd3277387d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -20,12 +20,19 @@ package org.apache.hadoop.hbase.replication; import java.util.List; +import org.apache.zookeeper.KeeperException; + /** * This provides an interface for clients of replication to view replication queues. These queues * keep track of the HLogs that still need to be replicated to remote clusters. */ public interface ReplicationQueuesClient { + /** + * Initialize the replication queue client interface. + */ + public void init() throws KeeperException; + /** * Get a list of all region servers that have outstanding replication queues. These servers could * be alive, dead or from a previous run of the cluster. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index b5189518dbf..6bac186c83d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -30,8 +30,12 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem ReplicationQueuesClient { public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf, - Abortable abortable) throws KeeperException { + Abortable abortable) { super(zk, conf, abortable); + } + + @Override + public void init() throws KeeperException { ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index f46dbb36bcc..9199efee956 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -95,7 +95,16 @@ public abstract class ReplicationStateZKBase { return ProtobufUtil.prependPBMagic(bytes); } - public boolean peerExists(String id) throws KeeperException { + protected boolean peerExists(String id) throws KeeperException { return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0; } + + /** + * Determine if a ZK path points to a peer node. + * @param path path to be checked + * @return true if the path points to a peer node, otherwise false + */ + protected boolean isPeerPath(String path) { + return path.split("/").length == peersZNode.split("/").length + 1; + } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java new file mode 100644 index 00000000000..bcd8937e11d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.List; + +/** + * This is the interface for a Replication Tracker. A replication tracker provides the facility to + * subscribe and track events that reflect a change in replication state. These events are used by + * the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues + * and queue failover. These events are defined in the ReplicationListener interface. If a class + * would like to listen to replication events it must implement the ReplicationListener interface + * and register itself with a Replication Tracker. + */ +@InterfaceAudience.Private +public interface ReplicationTracker { + + /** + * Register a replication listener to receive replication events. + * @param listener + */ + public void registerListener(ReplicationListener listener); + + public void removeListener(ReplicationListener listener); + + /** + * Returns a list of other live region servers in the cluster. + * @return + */ + public List getListOfRegionServers(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java new file mode 100644 index 00000000000..5624eccce79 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * This class is a Zookeeper implementation of the ReplicationTracker interface. This class is + * responsible for handling replication events that are defined in the ReplicationListener + * interface. + */ +public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker { + + private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class); + // All about stopping + private final Stoppable stopper; + // listeners to be notified + private final List listeners = + new CopyOnWriteArrayList(); + // List of all the other region servers in this cluster + private final ArrayList otherRegionServers = new ArrayList(); + private final ReplicationPeers replicationPeers; + + public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper, + final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, + Stoppable stopper) { + super(zookeeper, conf, abortable); + this.replicationPeers = replicationPeers; + this.stopper = stopper; + this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); + this.zookeeper.registerListener(new PeersWatcher(this.zookeeper)); + } + + @Override + public void registerListener(ReplicationListener listener) { + listeners.add(listener); + } + + @Override + public void removeListener(ReplicationListener listener) { + listeners.remove(listener); + } + + /** + * Return a snapshot of the current region servers. + */ + @Override + public List getListOfRegionServers() { + refreshOtherRegionServersList(); + + List list = null; + synchronized (otherRegionServers) { + list = new ArrayList(otherRegionServers); + } + return list; + } + + /** + * Watcher used to be notified of the other region server's death in the local cluster. It + * initiates the process to transfer the queues if it is able to grab the lock. + */ + public class OtherRegionServerWatcher extends ZooKeeperListener { + + /** + * Construct a ZooKeeper event listener. + */ + public OtherRegionServerWatcher(ZooKeeperWatcher watcher) { + super(watcher); + } + + /** + * Called when a new node has been created. + * @param path full path of the new node + */ + public void nodeCreated(String path) { + refreshListIfRightPath(path); + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + if (stopper.isStopped()) { + return; + } + boolean cont = refreshListIfRightPath(path); + if (!cont) { + return; + } + LOG.info(path + " znode expired, triggering replicatorRemoved event"); + for (ReplicationListener rl : listeners) { + rl.regionServerRemoved(getZNodeName(path)); + } + } + + /** + * Called when an existing node has a child node added or removed. + * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + if (stopper.isStopped()) { + return; + } + refreshListIfRightPath(path); + } + + private boolean refreshListIfRightPath(String path) { + if (!path.startsWith(this.watcher.rsZNode)) { + return false; + } + return refreshOtherRegionServersList(); + } + } + + /** + * Watcher used to follow the creation and deletion of peer clusters. + */ + public class PeersWatcher extends ZooKeeperListener { + + /** + * Construct a ZooKeeper event listener. + */ + public PeersWatcher(ZooKeeperWatcher watcher) { + super(watcher); + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + List peers = refreshPeersList(path); + if (peers == null) { + return; + } + if (isPeerPath(path)) { + String id = getZNodeName(path); + LOG.info(path + " znode expired, triggering peerRemoved event"); + for (ReplicationListener rl : listeners) { + rl.peerRemoved(id); + } + } + } + + /** + * Called when an existing node has a child node added or removed. + * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + List peers = refreshPeersList(path); + if (peers == null) { + return; + } + LOG.info(path + " znode expired, triggering peerListChanged event"); + for (ReplicationListener rl : listeners) { + rl.peerListChanged(peers); + } + } + } + + /** + * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also + * reset the watches. + * @param path path to check against + * @return A list of peers' identifiers if the event concerns this watcher, else null. + */ + private List refreshPeersList(String path) { + if (!path.startsWith(getPeersZNode())) { + return null; + } + return this.replicationPeers.getAllPeerIds(); + } + + private String getPeersZNode() { + return this.peersZNode; + } + + /** + * Extracts the znode name of a peer cluster from a ZK path + * @param fullPath Path to extract the id from + * @return the id or an empty string if path is invalid + */ + private String getZNodeName(String fullPath) { + String[] parts = fullPath.split("/"); + return parts.length > 0 ? parts[parts.length - 1] : ""; + } + + /** + * Reads the list of region servers from ZK and atomically clears our local view of it and + * replaces it with the updated list. + * @return true if the local list of the other region servers was updated with the ZK data (even + * if it was empty), false if the data was missing in ZK + */ + private boolean refreshOtherRegionServersList() { + List newRsList = getRegisteredRegionServers(); + if (newRsList == null) { + return false; + } else { + synchronized (otherRegionServers) { + otherRegionServers.clear(); + otherRegionServers.addAll(newRsList); + } + } + return true; + } + + /** + * Get a list of all the other region servers in this cluster and set a watch + * @return a list of server nanes + */ + private List getRegisteredRegionServers() { + List result = null; + try { + result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.rsZNode); + } catch (KeeperException e) { + this.abortable.abort("Get list of registered region servers", e); + } + return result; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java deleted file mode 100644 index 9b0b72ca1ea..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ /dev/null @@ -1,366 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * This class serves as a helper for all things related to zookeeper in - * replication. - *

- * The layout looks something like this under zookeeper.znode.parent for the - * master cluster: - *

- * - *

- * replication/
- *  state      {contains true or false}
- *  clusterId  {contains a byte}
- *  peers/
- *    1/   {contains a full cluster address}
- *      peer-state  {contains ENABLED or DISABLED}
- *    2/
- *    ...
- *  rs/ {lists all RS that replicate}
- *    startcode1/ {lists all peer clusters}
- *      1/ {lists hlogs to process}
- *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
- *        10.10.1.76%3A53488.123456790
- *        ...
- *      2/
- *      ...
- *    startcode2/
- *    ...
- * 
- */ -@InterfaceAudience.Private -public class ReplicationZookeeper extends ReplicationStateZKBase { - private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class); - - // Our handle on zookeeper - private final ZooKeeperWatcher zookeeper; - private String peersZNode; - private final Configuration conf; - // Abortable - private Abortable abortable; - private final ReplicationPeers replicationPeers; - private final ReplicationQueues replicationQueues; - - /** - * Constructor used by clients of replication (like master and HBase clients) - * @param conf conf to use - * @param zk zk connection to use - * @throws IOException - */ - public ReplicationZookeeper(final Abortable abortable, final Configuration conf, - final ZooKeeperWatcher zk) throws KeeperException, IOException { - super(zk, conf, abortable); - this.conf = conf; - this.zookeeper = zk; - setZNodes(abortable); - // TODO This interface is no longer used by anyone using this constructor. When this class goes - // away, we will no longer have this null initialization business - this.replicationQueues = null; - this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, abortable); - this.replicationPeers.init(); - } - - /** - * Constructor used by region servers, connects to the peer cluster right away. - * - * @param server - * @throws IOException - * @throws KeeperException - */ - public ReplicationZookeeper(final Server server) - throws IOException, KeeperException { - super(server.getZooKeeper(), server.getConfiguration(), server); - this.abortable = server; - this.zookeeper = server.getZooKeeper(); - this.conf = server.getConfiguration(); - setZNodes(server); - this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server); - this.replicationQueues.init(server.getServerName().toString()); - this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server); - this.replicationPeers.init(); - } - - private void setZNodes(Abortable abortable) throws KeeperException { - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); - String replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); - this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); - } - - /** - * List this cluster's peers' IDs - * @return list of all peers' identifiers - */ - public List listPeersIdsAndWatch() { - return this.replicationPeers.getAllPeerIds(); - } - - /** - * Map of this cluster's peers for display. - * @return A map of peer ids to peer cluster keys - */ - public Map listPeers() { - return this.replicationPeers.getAllPeerClusterKeys(); - } - - /** - * Returns all region servers from given peer - * - * @param peerClusterId (byte) the cluster to interrogate - * @return addresses of all region servers - */ - public List getSlavesAddresses(String peerClusterId) { - return this.replicationPeers.getRegionServersOfConnectedPeer(peerClusterId); - } - - /** - * This method connects this cluster to another one and registers it - * in this region server's replication znode - * @param peerId id of the peer cluster - * @throws KeeperException - */ - public boolean connectToPeer(String peerId) throws IOException, KeeperException { - return this.replicationPeers.connectToPeer(peerId); - } - - /** - * Remove the peer from zookeeper. which will trigger the watchers on every - * region server and close their sources - * @param id - * @throws IllegalArgumentException Thrown when the peer doesn't exist - */ - public void removePeer(String id) throws IOException { - this.replicationPeers.removePeer(id); - } - - /** - * Add a new peer to this cluster - * @param id peer's identifier - * @param clusterKey ZK ensemble's addresses, client port and root znode - * @throws IllegalArgumentException Thrown when the peer doesn't exist - * @throws IllegalStateException Thrown when a peer already exists, since - * multi-slave isn't supported yet. - */ - public void addPeer(String id, String clusterKey) throws IOException { - this.replicationPeers.addPeer(id, clusterKey); - } - - /** - * Enable replication to the peer - * - * @param id peer's identifier - * @throws IllegalArgumentException - * Thrown when the peer doesn't exist - */ - public void enablePeer(String id) throws IOException { - this.replicationPeers.enablePeer(id); - } - - /** - * Disable replication to the peer - * - * @param id peer's identifier - * @throws IllegalArgumentException - * Thrown when the peer doesn't exist - */ - public void disablePeer(String id) throws IOException { - this.replicationPeers.disablePeer(id); - } - - /** - * Check whether the peer is enabled or not. This method checks the atomic - * boolean of ReplicationPeer locally. - * - * @param id peer identifier - * @return true if the peer is enabled, otherwise false - * @throws IllegalArgumentException - * Thrown when the peer doesn't exist - */ - public boolean getPeerEnabled(String id) { - return this.replicationPeers.getStatusOfConnectedPeer(id); - } - - /** - * Add a new log to the list of hlogs in zookeeper - * @param filename name of the hlog's znode - * @param peerId name of the cluster's znode - */ - public void addLogToList(String filename, String peerId) throws KeeperException { - this.replicationQueues.addLog(peerId, filename); - } - - /** - * Remove a log from the list of hlogs in zookeeper - * @param filename name of the hlog's znode - * @param clusterId name of the cluster's znode - */ - public void removeLogFromList(String filename, String clusterId) { - this.replicationQueues.removeLog(clusterId, filename); - } - - /** - * Set the current position of the specified cluster in the current hlog - * @param filename filename name of the hlog's znode - * @param clusterId clusterId name of the cluster's znode - * @param position the position in the file - */ - public void writeReplicationStatus(String filename, String clusterId, long position) { - this.replicationQueues.setLogPosition(clusterId, filename, position); - } - - /** - * Get a list of all the other region servers in this cluster - * and set a watch - * @return a list of server nanes - */ - public List getRegisteredRegionServers() { - List result = null; - try { - result = ZKUtil.listChildrenAndWatchThem( - this.zookeeper, this.zookeeper.rsZNode); - } catch (KeeperException e) { - this.abortable.abort("Get list of registered region servers", e); - } - return result; - } - - - /** - * Take ownership for the set of queues belonging to a dead region server. - * @param regionserver the id of the dead region server - * @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in - * each queue. - */ - public SortedMap> claimQueues(String regionserver) { - return this.replicationQueues.claimQueues(regionserver); - } - - /** - * Delete a complete queue of hlogs - * @param peerZnode znode of the peer cluster queue of hlogs to delete - */ - public void deleteSource(String peerZnode, boolean closeConnection) { - this.replicationQueues.removeQueue(peerZnode); - if (closeConnection) { - this.replicationPeers.disconnectFromPeer(peerZnode); - } - } - - /** - * Delete this cluster's queues - */ - public void deleteOwnRSZNode() { - this.replicationQueues.removeAllQueues(); - } - - /** - * Get the position of the specified hlog in the specified peer znode - * @param peerId znode of the peer cluster - * @param hlog name of the hlog - * @return the position in that hlog - * @throws KeeperException - */ - public long getHLogRepPosition(String peerId, String hlog) throws KeeperException { - return this.replicationQueues.getLogPosition(peerId, hlog); - } - - /** - * Returns the UUID of the provided peer id. Should a connection loss or session - * expiration happen, the ZK handler will be reopened once and if it still doesn't - * work then it will bail and return null. - * @param peerId the peer's ID that will be converted into a UUID - * @return a UUID or null if there's a ZK connection issue - */ - public UUID getPeerUUID(String peerId) { - return this.replicationPeers.getPeerUUID(peerId); - } - - public void registerRegionServerListener(ZooKeeperListener listener) { - this.zookeeper.registerListener(listener); - } - - /** - * Get a map of all peer clusters - * @return map of peer cluster keyed by id - */ - public Set getPeerClusters() { - return this.replicationPeers.getConnectedPeers(); - } - - /** - * Determine if a ZK path points to a peer node. - * @param path path to be checked - * @return true if the path points to a peer node, otherwise false - */ - public boolean isPeerPath(String path) { - return path.split("/").length == peersZNode.split("/").length + 1; - } - - /** - * Extracts the znode name of a peer cluster from a ZK path - * @param fullPath Path to extract the id from - * @return the id or an empty string if path is invalid - */ - public static String getZNodeName(String fullPath) { - String[] parts = fullPath.split("/"); - return parts.length > 0 ? parts[parts.length-1] : ""; - } - - /** - * Get this cluster's zk connection - * @return zk connection - */ - public ZooKeeperWatcher getZookeeperWatcher() { - return this.zookeeper; - } - - - /** - * Get the full path to the peers' znode - * @return path to peers in zk - */ - public String getPeersZNode() { - return peersZNode; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 20dc80f58a0..3a5c3be37e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -36,8 +36,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -45,8 +43,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 43945832991..2bf5fdace15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -36,10 +36,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.mapreduce.Job; @@ -118,7 +118,8 @@ public class VerifyReplication { @Override public void abort(String why, Throwable e) {} @Override public boolean isAborted() {return false;} }); - ReplicationPeers rp = new ReplicationPeersZKImpl(localZKW, conf, localZKW); + ReplicationPeers rp = + ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); rp.init(); Configuration peerConf = rp.getPeerConf(peerId); if (peerConf == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 9d404a00077..86669faa2b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; +import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -122,7 +123,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo super.setConf(conf); try { this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); - this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this); + this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this); + this.replicationQueues.init(); } catch (KeeperException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 20ae5635247..549ab17285c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -22,10 +22,10 @@ import java.io.IOException; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; @@ -45,11 +45,13 @@ import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.zookeeper.KeeperException; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; @@ -66,8 +68,9 @@ public class Replication implements WALActionsListener, LogFactory.getLog(Replication.class); private boolean replication; private ReplicationSourceManager replicationManager; - private ReplicationZookeeper zkHelper; private ReplicationQueues replicationQueues; + private ReplicationPeers replicationPeers; + private ReplicationTracker replicationTracker; private Configuration conf; private ReplicationSink replicationSink; // Hosting server @@ -107,23 +110,35 @@ public class Replication implements WALActionsListener, .build()); if (replication) { try { - this.zkHelper = new ReplicationZookeeper(server); this.replicationQueues = - new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server); + ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server); this.replicationQueues.init(this.server.getServerName().toString()); + this.replicationPeers = + ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); + this.replicationPeers.init(); + this.replicationTracker = + ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, + this.conf, this.server, this.server); } catch (KeeperException ke) { throw new IOException("Failed replication handler create", ke); } + UUID clusterId = null; + try { + clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper()); + } catch (KeeperException ke) { + throw new IOException("Could not read cluster id", ke); + } this.replicationManager = - new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, logDir, - oldLogDir); + new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, + conf, this.server, fs, logDir, oldLogDir, clusterId); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); } else { this.replicationManager = null; - this.zkHelper = null; this.replicationQueues = null; + this.replicationPeers = null; + this.replicationTracker = null; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 94560821af1..76667909e5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -56,11 +56,11 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; @@ -86,8 +86,8 @@ public class ReplicationSource extends Thread // container of entries to replicate private HLog.Entry[] entriesArray; private HConnection conn; - // Helper class for zookeeper - private ReplicationZookeeper zkHelper; + private ReplicationQueues replicationQueues; + private ReplicationPeers replicationPeers; private Configuration conf; // ratio of region servers to chose from a slave cluster private float ratio; @@ -151,12 +151,10 @@ public class ReplicationSource extends Thread * @param peerClusterZnode the name of our znode * @throws IOException */ - public void init(final Configuration conf, - final FileSystem fs, - final ReplicationSourceManager manager, - final Stoppable stopper, - final String peerClusterZnode) - throws IOException { + public void init(final Configuration conf, final FileSystem fs, + final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, + final ReplicationPeers replicationPeers, final Stoppable stopper, + final String peerClusterZnode, final UUID clusterId) throws IOException { this.stopper = stopper; this.conf = conf; this.replicationQueueSizeCapacity = @@ -178,7 +176,8 @@ public class ReplicationSource extends Thread // replication and make replication specific settings such as compression or codec to use // passing Cells. this.conn = HConnectionManager.getConnection(conf); - this.zkHelper = manager.getRepZkWrapper(); + this.replicationQueues = replicationQueues; + this.replicationPeers = replicationPeers; this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); this.currentPeers = new ArrayList(); this.random = new Random(); @@ -188,11 +187,8 @@ public class ReplicationSource extends Thread this.fs = fs; this.metrics = new MetricsSource(peerClusterZnode); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); - try { - this.clusterId = ZKClusterId.getUUIDForCluster(zkHelper.getZookeeperWatcher()); - } catch (KeeperException ke) { - throw new IOException("Could not read cluster id", ke); - } + this.clusterId = clusterId; + this.peerClusterZnode = peerClusterZnode; this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us @@ -204,7 +200,7 @@ public class ReplicationSource extends Thread */ private void chooseSinks() { this.currentPeers.clear(); - List addresses = this.zkHelper.getSlavesAddresses(this.peerId); + List addresses = this.replicationPeers.getRegionServersOfConnectedPeer(this.peerId); Set setOfAddr = new HashSet(); int nbPeers = (int) (Math.ceil(addresses.size() * ratio)); LOG.debug("Getting " + nbPeers + @@ -238,7 +234,7 @@ public class ReplicationSource extends Thread int sleepMultiplier = 1; // delay this until we are in an asynchronous thread while (this.peerClusterId == null) { - this.peerClusterId = zkHelper.getPeerUUID(this.peerId); + this.peerClusterId = replicationPeers.getPeerUUID(this.peerId); if (this.peerClusterId == null) { if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; @@ -254,8 +250,8 @@ public class ReplicationSource extends Thread // normally has a position (unless the RS failed between 2 logs) if (this.replicationQueueInfo.isQueueRecovered()) { try { - this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition( - this.peerClusterZnode, this.queue.peek().getName())); + this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode, + this.queue.peek().getName())); if (LOG.isTraceEnabled()) { LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + this.repLogReader.getPosition()); @@ -736,7 +732,7 @@ public class ReplicationSource extends Thread * @return true if the peer is enabled, otherwise false */ protected boolean isPeerEnabled() { - return this.zkHelper.getPeerEnabled(this.peerId); + return this.replicationPeers.getStatusOfConnectedPeer(this.peerId); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 8a5c08f10fa..5ff9707fe4a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -19,12 +19,15 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import java.util.UUID; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueues; /** * Interface that defines a replication source @@ -41,13 +44,10 @@ public interface ReplicationSourceInterface { * @param peerClusterId the id of the peer cluster * @throws IOException */ - void init( - final Configuration conf, - final FileSystem fs, - final ReplicationSourceManager manager, - final Stoppable stopper, - final String peerClusterId - ) throws IOException; + public void init(final Configuration conf, final FileSystem fs, + final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, + final ReplicationPeers replicationPeers, final Stoppable stopper, + final String peerClusterZnode, final UUID clusterId) throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index f33e3a8d59f..e3678119bbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -29,11 +29,11 @@ import java.util.Random; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,10 +42,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.ReplicationListener; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.zookeeper.KeeperException; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -60,18 +60,23 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * When a region server dies, this class uses a watcher to get notified and it * tries to grab a lock in order to transfer all the queues in a local * old source. + * + * This class implements the ReplicationListener interface so that it can track changes in + * replication state. */ @InterfaceAudience.Private -public class ReplicationSourceManager { +public class ReplicationSourceManager implements ReplicationListener { private static final Log LOG = LogFactory.getLog(ReplicationSourceManager.class); // List of all the sources that read this RS's logs private final List sources; // List of all the sources we got from died RSs private final List oldsources; - // Helper for zookeeper - private final ReplicationZookeeper zkHelper; private final ReplicationQueues replicationQueues; + private final ReplicationTracker replicationTracker; + private final ReplicationPeers replicationPeers; + // UUID for this cluster + private final UUID clusterId; // All about stopping private final Stoppable stopper; // All logs we are currently tracking @@ -80,8 +85,6 @@ public class ReplicationSourceManager { private final FileSystem fs; // The path to the latest log we saw, for new coming sources private Path latestPath; - // List of all the other region servers in this cluster - private final List otherRegionServers = new ArrayList(); // Path to the hlogs directories private final Path logDir; // Path to the hlog archive @@ -104,12 +107,14 @@ public class ReplicationSourceManager { * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived */ - public ReplicationSourceManager(final ReplicationZookeeper zkHelper, - final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper, - final FileSystem fs, final Path logDir, final Path oldLogDir) { + public ReplicationSourceManager(final ReplicationQueues replicationQueues, + final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, + final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir, + final Path oldLogDir, final UUID clusterId) { this.sources = new ArrayList(); - this.zkHelper = zkHelper; this.replicationQueues = replicationQueues; + this.replicationPeers = replicationPeers; + this.replicationTracker = replicationTracker; this.stopper = stopper; this.hlogsById = new HashMap>(); this.oldsources = new ArrayList(); @@ -118,11 +123,9 @@ public class ReplicationSourceManager { this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); - this.zkHelper.registerRegionServerListener( - new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); - this.zkHelper.registerRegionServerListener( - new PeersWatcher(this.zkHelper.getZookeeperWatcher())); - this.zkHelper.listPeersIdsAndWatch(); + this.clusterId = clusterId; + this.replicationTracker.registerListener(this); + this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); @@ -150,12 +153,12 @@ public class ReplicationSourceManager { */ public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, boolean holdLogInZK) { - String key = log.getName(); - this.zkHelper.writeReplicationStatus(key, id, position); + String fileName = log.getName(); + this.replicationQueues.setLogPosition(id, fileName, position); if (holdLogInZK) { return; } - cleanOldLogs(key, id, queueRecovered); + cleanOldLogs(fileName, id, queueRecovered); } /** @@ -175,7 +178,7 @@ public class ReplicationSourceManager { } SortedSet hlogSet = hlogs.headSet(key); for (String hlog : hlogSet) { - this.zkHelper.removeLogFromList(hlog, id); + this.replicationQueues.removeLog(id, hlog); } hlogSet.clear(); } @@ -186,24 +189,21 @@ public class ReplicationSourceManager { * old region server hlog queues */ public void init() throws IOException { - for (String id : this.zkHelper.getPeerClusters()) { + for (String id : this.replicationPeers.getConnectedPeers()) { addSource(id); } List currentReplicators = this.replicationQueues.getListOfReplicators(); if (currentReplicators == null || currentReplicators.size() == 0) { return; } - synchronized (otherRegionServers) { - refreshOtherRegionServersList(); - LOG.info("Current list of replicators: " + currentReplicators - + " other RSs: " + otherRegionServers); - } + List otherRegionServers = replicationTracker.getListOfRegionServers(); + LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + + otherRegionServers); + // Look if there's anything to process after a restart for (String rs : currentReplicators) { - synchronized (otherRegionServers) { - if (!this.otherRegionServers.contains(rs)) { - transferQueues(rs); - } + if (!otherRegionServers.contains(rs)) { + transferQueues(rs); } } } @@ -215,7 +215,9 @@ public class ReplicationSourceManager { * @throws IOException */ public ReplicationSourceInterface addSource(String id) throws IOException { - ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, stopper, id); + ReplicationSourceInterface src = + getReplicationSource(this.conf, this.fs, this, this.replicationQueues, + this.replicationPeers, stopper, id, this.clusterId); synchronized (this.hlogsById) { this.sources.add(src); this.hlogsById.put(id, new TreeSet()); @@ -224,7 +226,7 @@ public class ReplicationSourceManager { String name = this.latestPath.getName(); this.hlogsById.get(id).add(name); try { - this.zkHelper.addLogToList(name, src.getPeerClusterZnode()); + this.replicationQueues.addLog(src.getPeerClusterZnode(), name); } catch (KeeperException ke) { String message = "Cannot add log to zk for" + " replication when creating a new source"; @@ -238,13 +240,24 @@ public class ReplicationSourceManager { return src; } + /** + * Delete a complete queue of hlogs associated with a peer cluster + * @param peerId Id of the peer cluster queue of hlogs to delete + */ + public void deleteSource(String peerId, boolean closeConnection) { + this.replicationQueues.removeQueue(peerId); + if (closeConnection) { + this.replicationPeers.disconnectFromPeer(peerId); + } + } + /** * Terminate the replication on this region server */ public void join() { this.executor.shutdown(); if (this.sources.size() == 0) { - this.zkHelper.deleteOwnRSZNode(); + this.replicationQueues.removeAllQueues(); } for (ReplicationSourceInterface source : this.sources) { source.terminate("Region server is closing"); @@ -273,7 +286,7 @@ public class ReplicationSourceManager { String name = newLog.getName(); for (ReplicationSourceInterface source : this.sources) { try { - this.zkHelper.addLogToList(name, source.getPeerClusterZnode()); + this.replicationQueues.addLog(source.getPeerClusterZnode(), name); } catch (KeeperException ke) { throw new IOException("Cannot add log to zk for replication", ke); } @@ -298,14 +311,6 @@ public class ReplicationSourceManager { } } - /** - * Get the ZK help of this manager - * @return the helper - */ - public ReplicationZookeeper getRepZkWrapper() { - return zkHelper; - } - /** * Factory method to create a replication source * @param conf the configuration to use @@ -316,12 +321,10 @@ public class ReplicationSourceManager { * @return the created source * @throws IOException */ - public ReplicationSourceInterface getReplicationSource( - final Configuration conf, - final FileSystem fs, - final ReplicationSourceManager manager, - final Stoppable stopper, - final String peerId) throws IOException { + public ReplicationSourceInterface getReplicationSource(final Configuration conf, + final FileSystem fs, final ReplicationSourceManager manager, + final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, + final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException { ReplicationSourceInterface src; try { @SuppressWarnings("rawtypes") @@ -334,7 +337,7 @@ public class ReplicationSourceManager { src = new ReplicationSource(); } - src.init(conf, fs, manager, stopper, peerId); + src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId); return src; } @@ -347,7 +350,9 @@ public class ReplicationSourceManager { * @param rsZnode */ private void transferQueues(String rsZnode) { - NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode); + NodeFailoverWorker transfer = + new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers, + this.clusterId); try { this.executor.execute(transfer); } catch (RejectedExecutionException ex) { @@ -362,7 +367,7 @@ public class ReplicationSourceManager { public void closeRecoveredQueue(ReplicationSourceInterface src) { LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); this.oldsources.remove(src); - this.zkHelper.deleteSource(src.getPeerClusterZnode(), false); + deleteSource(src.getPeerClusterZnode(), false); } /** @@ -403,151 +408,34 @@ public class ReplicationSourceManager { } srcToRemove.terminate(terminateMessage); this.sources.remove(srcToRemove); - this.zkHelper.deleteSource(id, true); + deleteSource(id, true); } - /** - * Reads the list of region servers from ZK and atomically clears our - * local view of it and replaces it with the updated list. - * - * @return true if the local list of the other region servers was updated - * with the ZK data (even if it was empty), - * false if the data was missing in ZK - */ - private boolean refreshOtherRegionServersList() { - List newRsList = zkHelper.getRegisteredRegionServers(); - if (newRsList == null) { - return false; - } else { - synchronized (otherRegionServers) { - otherRegionServers.clear(); - otherRegionServers.addAll(newRsList); - } - } - return true; + @Override + public void regionServerRemoved(String regionserver) { + transferQueues(regionserver); } - /** - * Watcher used to be notified of the other region server's death - * in the local cluster. It initiates the process to transfer the queues - * if it is able to grab the lock. - */ - public class OtherRegionServerWatcher extends ZooKeeperListener { - - /** - * Construct a ZooKeeper event listener. - */ - public OtherRegionServerWatcher(ZooKeeperWatcher watcher) { - super(watcher); - } - - /** - * Called when a new node has been created. - * @param path full path of the new node - */ - public void nodeCreated(String path) { - refreshListIfRightPath(path); - } - - /** - * Called when a node has been deleted - * @param path full path of the deleted node - */ - public void nodeDeleted(String path) { - if (stopper.isStopped()) { - return; - } - boolean cont = refreshListIfRightPath(path); - if (!cont) { - return; - } - LOG.info(path + " znode expired, trying to lock it"); - transferQueues(ReplicationZookeeper.getZNodeName(path)); - } - - /** - * Called when an existing node has a child node added or removed. - * @param path full path of the node whose children have changed - */ - public void nodeChildrenChanged(String path) { - if (stopper.isStopped()) { - return; - } - refreshListIfRightPath(path); - } - - private boolean refreshListIfRightPath(String path) { - if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) { - return false; - } - return refreshOtherRegionServersList(); - } + @Override + public void peerRemoved(String peerId) { + removePeer(peerId); } - /** - * Watcher used to follow the creation and deletion of peer clusters. - */ - public class PeersWatcher extends ZooKeeperListener { - - /** - * Construct a ZooKeeper event listener. - */ - public PeersWatcher(ZooKeeperWatcher watcher) { - super(watcher); - } - - /** - * Called when a node has been deleted - * @param path full path of the deleted node - */ - public void nodeDeleted(String path) { - List peers = refreshPeersList(path); - if (peers == null) { - return; - } - if (zkHelper.isPeerPath(path)) { - String id = ReplicationZookeeper.getZNodeName(path); - removePeer(id); - } - } - - /** - * Called when an existing node has a child node added or removed. - * @param path full path of the node whose children have changed - */ - public void nodeChildrenChanged(String path) { - List peers = refreshPeersList(path); - if (peers == null) { - return; - } - for (String id : peers) { - try { - boolean added = zkHelper.connectToPeer(id); - if (added) { - addSource(id); - } - } catch (IOException e) { - // TODO manage better than that ? - LOG.error("Error while adding a new peer", e); - } catch (KeeperException e) { - LOG.error("Error while adding a new peer", e); + @Override + public void peerListChanged(List peerIds) { + for (String id : peerIds) { + try { + boolean added = this.replicationPeers.connectToPeer(id); + if (added) { + addSource(id); } + } catch (IOException e) { + // TODO manage better than that ? + LOG.error("Error while adding a new peer", e); + } catch (KeeperException e) { + LOG.error("Error while adding a new peer", e); } } - - /** - * Verify if this event is meant for us, and if so then get the latest - * peers' list from ZK. Also reset the watches. - * @param path path to check against - * @return A list of peers' identifiers if the event concerns this watcher, - * else null. - */ - private List refreshPeersList(String path) { - if (!path.startsWith(zkHelper.getPeersZNode())) { - return null; - } - return zkHelper.listPeersIdsAndWatch(); - } } /** @@ -557,14 +445,21 @@ public class ReplicationSourceManager { class NodeFailoverWorker extends Thread { private String rsZnode; + private final ReplicationQueues rq; + private final ReplicationPeers rp; + private final UUID clusterId; /** * * @param rsZnode */ - public NodeFailoverWorker(String rsZnode) { + public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues, + final ReplicationPeers replicationPeers, final UUID clusterId) { super("Failover-for-"+rsZnode); this.rsZnode = rsZnode; + this.rq = replicationQueues; + this.rp = replicationPeers; + this.clusterId = clusterId; } @Override @@ -584,7 +479,7 @@ public class ReplicationSourceManager { } SortedMap> newQueues = null; - newQueues = zkHelper.claimQueues(rsZnode); + newQueues = this.rq.claimQueues(rsZnode); // Copying over the failed queue is completed. if (newQueues.isEmpty()) { @@ -597,8 +492,9 @@ public class ReplicationSourceManager { String peerId = entry.getKey(); try { ReplicationSourceInterface src = - getReplicationSource(conf, fs, ReplicationSourceManager.this, stopper, peerId); - if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) { + getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, + stopper, peerId, this.clusterId); + if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 06843df555e..fcfc908f815 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.net.URLEncoder; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -30,7 +29,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; @@ -68,8 +68,9 @@ public class TestLogsCleaner { conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); - ReplicationZookeeper zkHelper = new ReplicationZookeeper(server); - + ReplicationQueues repQueues = + ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + repQueues.init(server.getServerName().toString()); Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); String fakeMachineName = @@ -98,7 +99,7 @@ public class TestLogsCleaner { // (TimeToLiveLogCleaner) but would be rejected by the second // (ReplicationLogCleaner) if (i % (30/3) == 1) { - zkHelper.addLogToList(fileName.getName(), fakeMachineName); + repQueues.addLog(fakeMachineName, fileName.getName()); System.out.println("Replication log file: " + fileName); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 743a1762a34..d0868d6ce79 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -39,7 +40,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - Stoppable stopper, String peerClusterId) throws IOException { + ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, + UUID clusterId) throws IOException { + this.manager = manager; this.peerClusterId = peerClusterId; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 97bcde41643..d79cc6051a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -69,6 +69,7 @@ public abstract class TestReplicationStateBasic { @Test public void testReplicationQueuesClient() throws KeeperException { + rqc.init(); // Test methods with empty state assertEquals(0, rqc.getListOfReplicators().size()); assertNull(rqc.getLogsInQueue(server1, "qId1")); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 077a504de9c..8b535695c8a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -36,7 +36,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; +import org.junit.Test; + import static org.junit.Assert.*; + import org.junit.Before; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @@ -50,6 +53,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { private static HBaseTestingUtility utility; private static ZooKeeperWatcher zkw; private static String replicationZNode; + private ReplicationQueuesZKImpl rqZK; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -65,7 +69,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { private static String initPeerClusterState(String baseZKNode) throws IOException, KeeperException { - // Set up state nodes of peer clusters + // Add a dummy region server and set up the cluster id Configuration testConf = new Configuration(conf); testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null); @@ -82,16 +86,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { DummyServer ds1 = new DummyServer(server1); DummyServer ds2 = new DummyServer(server2); DummyServer ds3 = new DummyServer(server3); - try { - rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1); - rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2); - rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3); - rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1); - rp = new ReplicationPeersZKImpl(zkw, conf, zkw); - OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf); - } catch (KeeperException e) { - fail("Exception thrown: " + e); - } + rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1); + rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2); + rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3); + rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1); + rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); + OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf); + rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1); } @After @@ -104,6 +105,23 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { utility.shutdownMiniZKCluster(); } + @Test + public void testIsPeerPath_PathToParentOfPeerNode() { + assertFalse(rqZK.isPeerPath(rqZK.peersZNode)); + } + + @Test + public void testIsPeerPath_PathToChildOfPeerNode() { + String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(rqZK.peersZNode, "1"), "child"); + assertFalse(rqZK.isPeerPath(peerChild)); + } + + @Test + public void testIsPeerPath_ActualPeerPath() { + String peerPath = ZKUtil.joinZNode(rqZK.peersZNode, "1"); + assertTrue(rqZK.isPeerPath(peerPath)); + } + static class DummyServer implements Server { private String serverName; private boolean isAborted = false; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java new file mode 100644 index 00000000000..e3d9a126778 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.*; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One + * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation + * of the rsZNode. All other znode creation/initialization is handled by the replication state + * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the + * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode). + */ +@Category(MediumTests.class) +public class TestReplicationTrackerZKImpl { + + private static final Log LOG = LogFactory.getLog(TestReplicationTrackerZKImpl.class); + + private static Configuration conf; + private static HBaseTestingUtility utility; + + // Each one of the below variables are reinitialized before every test case + private ZooKeeperWatcher zkw; + private ReplicationPeers rp; + private ReplicationTracker rt; + private AtomicInteger rsRemovedCount; + private String rsRemovedData; + private AtomicInteger plChangedCount; + private List plChangedData; + private AtomicInteger peerRemovedCount; + private String peerRemovedData; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + utility = new HBaseTestingUtility(); + utility.startMiniZKCluster(); + conf = utility.getConfiguration(); + ZooKeeperWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility); + ZKUtil.createWithParents(zk, zk.rsZNode); + } + + @Before + public void setUp() throws Exception { + zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); + String fakeRs1 = ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"); + try { + ZKClusterId.setClusterId(zkw, new ClusterId()); + rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); + rp.init(); + rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1)); + } catch (Exception e) { + fail("Exception during test setup: " + e); + } + rsRemovedCount = new AtomicInteger(0); + rsRemovedData = ""; + plChangedCount = new AtomicInteger(0); + plChangedData = new ArrayList(); + peerRemovedCount = new AtomicInteger(0); + peerRemovedData = ""; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + utility.shutdownMiniZKCluster(); + } + + @Test + public void testGetListOfRegionServers() throws Exception { + // 0 region servers + assertEquals(0, rt.getListOfRegionServers().size()); + + // 1 region server + ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234")); + assertEquals(1, rt.getListOfRegionServers().size()); + + // 2 region servers + ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234")); + assertEquals(2, rt.getListOfRegionServers().size()); + + // 1 region server + ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234")); + assertEquals(1, rt.getListOfRegionServers().size()); + + // 0 region server + ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234")); + assertEquals(0, rt.getListOfRegionServers().size()); + } + + @Test(timeout = 30000) + public void testRegionServerRemovedEvent() throws Exception { + ZKUtil.createAndWatch(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"), + HConstants.EMPTY_BYTE_ARRAY); + rt.registerListener(new DummyReplicationListener()); + // delete one + ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234")); + // wait for event + while (rsRemovedCount.get() < 1) { + Thread.sleep(5); + } + assertEquals("hostname2.example.org:1234", rsRemovedData); + } + + @Test(timeout = 30000) + public void testPeerRemovedEvent() throws Exception { + rp.addPeer("5", utility.getClusterKey()); + rt.registerListener(new DummyReplicationListener()); + rp.removePeer("5"); + // wait for event + while (peerRemovedCount.get() < 1) { + Thread.sleep(5); + } + assertEquals("5", peerRemovedData); + } + + @Test(timeout = 30000) + public void testPeerListChangedEvent() throws Exception { + // add a peer + rp.addPeer("5", utility.getClusterKey()); + zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true); + rt.registerListener(new DummyReplicationListener()); + rp.disablePeer("5"); + ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state"); + // wait for event + int tmp = plChangedCount.get(); + while (plChangedCount.get() <= tmp) { + Thread.sleep(5); + } + assertEquals(1, plChangedData.size()); + assertTrue(plChangedData.contains("5")); + + // clean up + ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5"); + } + + private class DummyReplicationListener implements ReplicationListener { + + @Override + public void regionServerRemoved(String regionServer) { + rsRemovedData = regionServer; + rsRemovedCount.getAndIncrement(); + LOG.debug("Received regionServerRemoved event: " + regionServer); + } + + @Override + public void peerRemoved(String peerId) { + peerRemovedData = peerId; + peerRemovedCount.getAndIncrement(); + LOG.debug("Received peerRemoved event: " + peerId); + } + + @Override + public void peerListChanged(List peerIds) { + plChangedData.clear(); + plChangedData.addAll(peerIds); + plChangedCount.getAndIncrement(); + LOG.debug("Received peerListChanged event"); + } + } + + private class DummyServer implements Server { + private String serverName; + private boolean isAborted = false; + private boolean isStopped = false; + + public DummyServer(String serverName) { + this.serverName = serverName; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zkw; + } + + @Override + public CatalogTracker getCatalogTracker() { + return null; + } + + @Override + public ServerName getServerName() { + return new ServerName(this.serverName); + } + + @Override + public void abort(String why, Throwable e) { + LOG.info("Aborting " + serverName); + this.isAborted = true; + } + + @Override + public boolean isAborted() { + return this.isAborted; + } + + @Override + public void stop(String why) { + this.isStopped = true; + } + + @Override + public boolean isStopped() { + return this.isStopped; + } + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java deleted file mode 100644 index de2515c969e..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.catalog.CatalogTracker; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(MediumTests.class) -public class TestReplicationZookeeper { - - private static Configuration conf; - - private static HBaseTestingUtility utility; - - private static ZooKeeperWatcher zkw; - - private static ReplicationZookeeper repZk; - - private static String slaveClusterKey; - - private static String peersZNode; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - utility = new HBaseTestingUtility(); - utility.startMiniZKCluster(); - conf = utility.getConfiguration(); - zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); - DummyServer server = new DummyServer(); - repZk = new ReplicationZookeeper(server); - slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + - conf.get("hbase.zookeeper.property.clientPort") + ":/1"; - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); - String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); - peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - utility.shutdownMiniZKCluster(); - } - - @Test - public void testGetAddressesMissingSlave() - throws IOException, KeeperException { - repZk.addPeer("1", slaveClusterKey); - // HBASE-5586 used to get an NPE - assertEquals(0, repZk.getSlavesAddresses("1").size()); - } - - @Test - public void testIsPeerPath_PathToParentOfPeerNode() { - String peerParentNode = peersZNode; - assertFalse(repZk.isPeerPath(peerParentNode)); - } - - @Test - public void testIsPeerPath_PathToChildOfPeerNode() { - String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(peersZNode, "1"), "child"); - assertFalse(repZk.isPeerPath(peerChild)); - } - - @Test - public void testIsPeerPath_ActualPeerPath() { - String peerPath = ZKUtil.joinZNode(peersZNode, "1"); - assertTrue(repZk.isPeerPath(peerPath)); - } - - static class DummyServer implements Server { - - @Override - public Configuration getConfiguration() { - return conf; - } - - @Override - public ZooKeeperWatcher getZooKeeper() { - return zkw; - } - - @Override - public CatalogTracker getCatalogTracker() { - return null; - } - - @Override - public ServerName getServerName() { - return new ServerName("hostname.example.org", 1234, -1L); - } - - @Override - public void abort(String why, Throwable e) { - } - - @Override - public boolean isAborted() { - return false; - } - - @Override - public void stop(String why) { - } - - @Override - public boolean isStopped() { - return false; - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 1189162b96b..f5c3c990129 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -50,11 +51,13 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; @@ -128,6 +131,8 @@ public class TestReplicationSourceManager { ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES); + ZKClusterId.setClusterId(zkw, new ClusterId()); + replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); manager = replication.getReplicationManager(); fs = FileSystem.get(conf); @@ -226,12 +231,15 @@ public class TestReplicationSourceManager { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); - ReplicationZookeeper rz = new ReplicationZookeeper(server); + ReplicationQueues rq = + ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), + server); + rq.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { - rz.addLogToList(file, "1"); + rq.addLog("1", file); } // create 3 DummyServers Server s1 = new DummyServer("dummyserver1.example.org"); @@ -266,12 +274,14 @@ public class TestReplicationSourceManager { LOG.debug("testNodeFailoverDeadServerParsing"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); - ReplicationZookeeper rz = new ReplicationZookeeper(server); + ReplicationQueues repQueues = + ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + repQueues.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { - rz.addLogToList(file, "1"); + repQueues.addLog("1", file); } // create 3 DummyServers @@ -280,13 +290,19 @@ public class TestReplicationSourceManager { Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially - ReplicationZookeeper rz1 = new ReplicationZookeeper(s1); + ReplicationQueues rq1 = + ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + rq1.init(s1.getServerName().toString()); SortedMap> testMap = - rz1.claimQueues(server.getServerName().getServerName()); - ReplicationZookeeper rz2 = new ReplicationZookeeper(s2); - testMap = rz2.claimQueues(s1.getServerName().getServerName()); - ReplicationZookeeper rz3 = new ReplicationZookeeper(s3); - testMap = rz3.claimQueues(s2.getServerName().getServerName()); + rq1.claimQueues(server.getServerName().getServerName()); + ReplicationQueues rq2 = + ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2); + rq2.init(s2.getServerName().toString()); + testMap = rq2.claimQueues(s1.getServerName().getServerName()); + ReplicationQueues rq3 = + ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3); + rq3.init(s3.getServerName().toString()); + testMap = rq3.claimQueues(s2.getServerName().getServerName()); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); List result = replicationQueueInfo.getDeadRegionServers(); @@ -304,18 +320,21 @@ public class TestReplicationSourceManager { private SortedMap> logZnodesMap; Server server; private String deadRsZnode; - ReplicationZookeeper rz; + ReplicationQueues rq; public DummyNodeFailoverWorker(String znode, Server s) throws Exception { this.deadRsZnode = znode; this.server = s; - rz = new ReplicationZookeeper(server); + this.rq = + ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), + server); + this.rq.init(this.server.getServerName().toString()); } @Override public void run() { try { - logZnodesMap = rz.claimQueues(deadRsZnode); + logZnodesMap = rq.claimQueues(deadRsZnode); server.abort("Done with testing", null); } catch (Exception e) { LOG.error("Got exception while running NodeFailoverWorker", e);