From e7bebd470650488cafd94a37462e6c1fd7833edf Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 4 Apr 2013 03:51:32 +0000 Subject: [PATCH] HBASE-7568 [replication] Create an interface for replication queues git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1464279 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/replication/ReplicationQueues.java | 109 +++++ .../replication/ReplicationQueuesClient.java | 50 +++ .../ReplicationQueuesClientZKImpl.java | 64 +++ .../replication/ReplicationQueuesZKImpl.java | 403 +++++++++++++++++ .../replication/ReplicationStateImpl.java | 32 +- .../replication/ReplicationStateZKBase.java | 81 ++++ .../replication/ReplicationZookeeper.java | 405 ++---------------- .../master/ReplicationLogCleaner.java | 38 +- .../replication/regionserver/Replication.java | 12 +- .../ReplicationSourceManager.java | 44 +- .../replication/TestReplicationAdmin.java | 21 - .../TestReplicationStateBasic.java | 155 +++++++ .../TestReplicationStateZKImpl.java | 126 ++++++ .../TestReplicationSourceManager.java | 10 +- 14 files changed, 1093 insertions(+), 457 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java new file mode 100644 index 00000000000..068d91631e2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.SortedMap; +import java.util.SortedSet; + +import org.apache.zookeeper.KeeperException; + +/** + * This provides an interface for maintaining a region server's replication queues. These queues + * keep track of the HLogs that still need to be replicated to remote clusters. + */ +public interface ReplicationQueues { + + /** + * Initialize the region server replication queue interface. + * @param serverName The server name of the region server that owns the replication queues this + * interface manages. + */ + public void init(String serverName); + + /** + * Remove a replication queue. + * @param queueId a String that identifies the queue. + */ + public void removeQueue(String queueId); + + /** + * Add a new HLog file to the given queue. If the queue does not exist it is created. 
+ * @param queueId a String that identifies the queue. + * @param filename name of the HLog + * @throws KeeperException + */ + public void addLog(String queueId, String filename) throws KeeperException; + + /** + * Remove an HLog file from the given queue. + * @param queueId a String that identifies the queue. + * @param filename name of the HLog + */ + public void removeLog(String queueId, String filename); + + /** + * Set the current position for a specific HLog in a given queue. + * @param queueId a String that identifies the queue + * @param filename name of the HLog + * @param position the current position in the file + */ + public void setLogPosition(String queueId, String filename, long position); + + /** + * Get the current position for a specific HLog in a given queue. + * @param queueId a String that identifies the queue + * @param filename name of the HLog + * @return the current position in the file + */ + public long getLogPosition(String queueId, String filename) throws KeeperException; + + /** + * Remove all replication queues for this region server. + */ + public void removeAllQueues(); + + /** + * Get a list of all HLogs in the given queue. + * @param queueId a String that identifies the queue + * @return a list of HLogs, null if this region server is dead and has no outstanding queues + */ + public List getLogsInQueue(String queueId); + + /** + * Get a list of all queues for this region server. + * @return a list of queueIds, null if this region server is dead and has no outstanding queues + */ + public List getAllQueues(); + + /** + * Take ownership for the set of queues belonging to a dead region server. + * @param regionserver the id of the dead region server + * @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in + * each queue. Returns an empty map if no queues were failed-over. 
+ */ + public SortedMap<String, SortedSet<String>> claimQueues(String regionserver); + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster. + * @return a list of server names + */ + public List<String> getListOfReplicators(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java new file mode 100644 index 00000000000..cd8d878b330 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +/** + * This provides an interface for clients of replication to view replication queues. These queues + * keep track of the HLogs that still need to be replicated to remote clusters. + */ +public interface ReplicationQueuesClient { + + /** + * Get a list of all region servers that have outstanding replication queues. 
These servers could + * be alive, dead or from a previous run of the cluster. + * @return a list of server names + */ + public List getListOfReplicators(); + + /** + * Get a list of all HLogs in the given queue on the given region server. + * @param serverName the server name of the region server that owns the queue + * @param queueId a String that identifies the queue + * @return a list of HLogs, null if this region server is dead and has no outstanding queues + */ + public List getLogsInQueue(String serverName, String queueId); + + /** + * Get a list of all queues for the specified region server. + * @param serverName the server name of the region server that owns the set of queues + * @return a list of queueIds, null if this region server is not a replicator. + */ + public List getAllQueues(String serverName); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java new file mode 100644 index 00000000000..b5189518dbf --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements + ReplicationQueuesClient { + + public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf, + Abortable abortable) throws KeeperException { + super(zk, conf, abortable); + ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); + } + + @Override + public List getLogsInQueue(String serverName, String queueId) { + String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); + znode = ZKUtil.joinZNode(znode, queueId); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId + + " and serverName=" + serverName, e); + } + return result; + } + + @Override + public List getAllQueues(String serverName) { + String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e); + } + return result; + } + +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java new file mode 100644 index 00000000000..e4e16fdd4af --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ 
-0,0 +1,403 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.InvalidProtocolBufferException; + +public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues { + + /** Znode containing all replication queues for this region server. 
*/ + private String myQueuesZnode; + /** Name of znode we use to lock during failover */ + private final static String RS_LOCK_ZNODE = "lock"; + + private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); + + public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) + throws KeeperException { + super(zk, conf, abortable); + } + + @Override + public void init(String serverName) { + this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName); + } + + @Override + public void removeQueue(String queueId) { + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId)); + } catch (KeeperException e) { + this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e); + } + } + + @Override + public void addLog(String queueId, String filename) throws KeeperException { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + ZKUtil.createWithParents(this.zookeeper, znode); + } + + @Override + public void removeLog(String queueId, String filename) { + try { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + ZKUtil.deleteNode(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to remove hlog from queue (queueId=" + queueId + ", filename=" + + filename + ")", e); + } + } + + @Override + public void setLogPosition(String queueId, String filename, long position) { + try { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + // Why serialize String of Long and not Long as bytes? 
+ ZKUtil.setData(this.zookeeper, znode, toByteArray(position)); + } catch (KeeperException e) { + this.abortable.abort("Failed to write replication hlog position (filename=" + filename + + ", position=" + position + ")", e); + } + } + + @Override + public long getLogPosition(String queueId, String filename) throws KeeperException { + String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + String znode = ZKUtil.joinZNode(clusterZnode, filename); + byte[] bytes = ZKUtil.getData(this.zookeeper, znode); + try { + return parseHLogPositionFrom(bytes); + } catch (DeserializationException de) { + LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename + + "znode content, continuing."); + } + // if we can not parse the position, start at the beginning of the hlog file + // again + return 0; + } + + @Override + public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) { + SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>(); + // check whether there is multi support. If yes, use it. 
+ if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { + LOG.info("Atomically moving " + regionserverZnode + "'s hlogs to my queue"); + newQueues = copyQueuesFromRSUsingMulti(regionserverZnode); + } else { + LOG.info("Moving " + regionserverZnode + "'s hlogs to my queue"); + if (!lockOtherRS(regionserverZnode)) { + return newQueues; + } + newQueues = copyQueuesFromRS(regionserverZnode); + deleteAnotherRSQueues(regionserverZnode); + } + return newQueues; + } + + @Override + public void removeAllQueues() { + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode); + } catch (KeeperException e) { + // if the znode is already expired, don't bother going further + if (e instanceof KeeperException.SessionExpiredException) { + return; + } + this.abortable.abort("Failed to delete replication queues for region server: " + + this.myQueuesZnode, e); + } + } + + @Override + public List getLogsInQueue(String queueId) { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId, e); + } + return result; + } + + @Override + public List getAllQueues() { + List listOfQueues = null; + try { + listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get a list of queues for region server: " + + this.myQueuesZnode, e); + } + return listOfQueues; + } + + /** + * Try to set a lock in another region server's znode. 
+ * @param znode the server name of the other server + * @return true if the lock was acquired, false in every other case + */ + private boolean lockOtherRS(String znode) { + try { + String parent = ZKUtil.joinZNode(this.queuesZNode, znode); + if (parent.equals(this.myQueuesZnode)) { + LOG.warn("Won't lock because this is us, we're dead!"); + return false; + } + String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); + ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode)); + } catch (KeeperException e) { + // This exception will pop up if the znode under which we're trying to + // create the lock is already deleted by another region server, meaning + // that the transfer already occurred. + // NoNode => transfer is done and znodes are already deleted + // NodeExists => lock znode already created by another RS + if (e instanceof KeeperException.NoNodeException + || e instanceof KeeperException.NodeExistsException) { + LOG.info("Won't transfer the queue," + " another RS took care of it because of: " + + e.getMessage()); + } else { + LOG.info("Failed lock other rs", e); + } + return false; + } + return true; + } + + /** + * Delete all the replication queues for a given region server. + * @param regionserverZnode The znode of the region server to delete. + */ + private void deleteAnotherRSQueues(String regionserverZnode) { + String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode); + try { + List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath); + for (String cluster : clusters) { + // No need to delete, it will be deleted later. 
+ if (cluster.equals(RS_LOCK_ZNODE)) { + continue; + } + String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster); + ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath); + } + // Finish cleaning up + ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath); + } catch (KeeperException e) { + if (e instanceof KeeperException.NoNodeException + || e instanceof KeeperException.NotEmptyException) { + // Testing a special case where another region server was able to + // create a lock just after we deleted it, but then was also able to + // delete the RS znode before us or its lock znode is still there. + if (e.getPath().equals(fullpath)) { + return; + } + } + this.abortable.abort("Failed to delete replication queues for region server: " + + regionserverZnode, e); + } + } + + /** + * It "atomically" copies all the hlog queues from another region server and returns them all + * sorted per peer cluster (appended with the dead server's znode). + * @param znode pertaining to the region server to copy the queues from + * @return HLog queues sorted per peer cluster + */ + private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) { + SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>(); + // hbase/replication/rs/deadrs + String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode); + List<String> peerIdsToProcess = null; + List<ZKUtilOp> listOfOps = new ArrayList<ZKUtilOp>(); + try { + peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath); + if (peerIdsToProcess == null) return queues; // node already processed + for (String peerId : peerIdsToProcess) { + String newPeerId = peerId + "-" + znode; + String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); + // check the logs queue for the old peer cluster + String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); + List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); + if (hlogs == null || hlogs.size() == 0) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + continue; 
// empty log queue. + } + // create the new cluster znode + SortedSet<String> logQueue = new TreeSet<String>(); + queues.put(newPeerId, logQueue); + ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); + listOfOps.add(op); + // get the offset of the logs and set it to new znodes + for (String hlog : hlogs) { + String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog); + byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode); + LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset)); + String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog); + listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); + // add ops for deleting + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode)); + logQueue.add(hlog); + } + // add delete op for peer + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + } + // add delete op for dead rs + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); + LOG.debug(" The multi list size is: " + listOfOps.size()); + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); + LOG.info("Atomically moved the dead regionserver logs. "); + } catch (KeeperException e) { + // Multi call failed; it looks like some other regionserver took away the logs. + LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); + queues.clear(); + } + return queues; + } + + /** + * This method copies all the hlog queues from another region server and returns them all sorted + * per peer cluster (appended with the dead server's znode) + * @param znode server name to copy + * @return all hlogs for all peers of that cluster, empty or partial if an error occurred + */ + private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) { + // TODO this method isn't atomic enough, we could start copying and then + // TODO fail for some reason and we would end up with znodes we don't want. 
+ SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>(); + try { + String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode); + List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath); + // We have a lock znode in there, it will count as one. + if (clusters == null || clusters.size() <= 1) { + return queues; + } + // The lock isn't a peer cluster, remove it + clusters.remove(RS_LOCK_ZNODE); + for (String cluster : clusters) { + // We add the name of the recovered RS to the new znode, we can even + // do that for queues that were recovered 10 times giving a znode like + // number-startcode-number-otherstartcode-number-anotherstartcode-etc + String newCluster = cluster + "-" + znode; + String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster); + String clusterPath = ZKUtil.joinZNode(nodePath, cluster); + List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); + // That region server didn't have anything to replicate for this cluster + if (hlogs == null || hlogs.size() == 0) { + continue; + } + ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, + HConstants.EMPTY_BYTE_ARRAY); + SortedSet<String> logQueue = new TreeSet<String>(); + queues.put(newCluster, logQueue); + for (String hlog : hlogs) { + String z = ZKUtil.joinZNode(clusterPath, hlog); + byte[] positionBytes = ZKUtil.getData(this.zookeeper, z); + long position = 0; + try { + position = parseHLogPositionFrom(positionBytes); + } catch (DeserializationException e) { + LOG.warn("Failed parse of hlog position from the following znode: " + z); + } + LOG.debug("Creating " + hlog + " with data " + position); + String child = ZKUtil.joinZNode(newClusterZnode, hlog); + // Position doesn't actually change, we are just deserializing it for + // logging, so just use the already serialized version + ZKUtil.createAndWatch(this.zookeeper, child, positionBytes); + logQueue.add(hlog); + } + } + } catch (KeeperException e) { + this.abortable.abort("Copy queues from rs", e); + } + return queues; + } + 
+ /** + * @param lockOwner + * @return Serialized protobuf of lockOwner with pb magic prefix prepended suitable + * for use as content of a replication lock during region server fail over. + */ + static byte[] lockToByteArray(final String lockOwner) { + byte[] bytes = + ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray(); + return ProtobufUtil.prependPBMagic(bytes); + } + + /** + * @param position + * @return Serialized protobuf of position with pb magic prefix prepended suitable + * for use as content of an hlog position in a replication queue. + */ + static byte[] toByteArray(final long position) { + byte[] bytes = + ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build() + .toByteArray(); + return ProtobufUtil.prependPBMagic(bytes); + } + + /** + * @param bytes - Content of a HLog position znode. + * @return long - The current HLog position. + * @throws DeserializationException if the pb-prefixed content cannot be parsed + */ + private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException { + if (ProtobufUtil.isPBMagicPrefix(bytes)) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.ReplicationHLogPosition.Builder builder = + ZooKeeperProtos.ReplicationHLogPosition.newBuilder(); + ZooKeeperProtos.ReplicationHLogPosition position; + try { + position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); + } catch (InvalidProtocolBufferException e) { + throw new DeserializationException(e); + } + return position.getPosition(); + } else { + if (bytes.length > 0) { + return Bytes.toLong(bytes); + } + return 0; + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java index 60358c26c4a..ff4d07c602a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -37,34 +38,37 @@ import java.util.concurrent.atomic.AtomicBoolean; * ReplicationStateImpl is responsible for maintaining the replication state * znode. */ -public class ReplicationStateImpl implements ReplicationStateInterface { +public class ReplicationStateImpl extends ReplicationStateZKBase implements + ReplicationStateInterface { - private ReplicationStateTracker stateTracker; - private final String stateZnode; - private final ZooKeeperWatcher zookeeper; - private final Abortable abortable; + private final ReplicationStateTracker stateTracker; private final AtomicBoolean replicating; private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class); - public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode, + public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, final Abortable abortable, final AtomicBoolean replicating) { - this.zookeeper = zk; - this.stateZnode = stateZnode; - this.abortable = abortable; + super(zk, conf, abortable); this.replicating = replicating; // Set a tracker on replicationStateNode - this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode, - this.abortable); + this.stateTracker = + new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable); stateTracker.start(); readReplicationStateZnode(); } + public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, + final Abortable abortable) { + this(zk, conf, abortable, 
new AtomicBoolean()); + } + + @Override public boolean getState() throws KeeperException { return getReplication(); } + @Override public void setState(boolean newState) throws KeeperException { setReplicating(newState); } @@ -110,10 +114,10 @@ public class ReplicationStateImpl implements ReplicationStateInterface { * @param newState */ private void setReplicating(boolean newState) throws KeeperException { - ZKUtil.createWithParents(this.zookeeper, this.stateZnode); + ZKUtil.createWithParents(this.zookeeper, this.stateZNode); byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES : ReplicationZookeeper.DISABLED_ZNODE_BYTES; - ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes); + ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes); } /** @@ -143,7 +147,7 @@ public class ReplicationStateImpl implements ReplicationStateInterface { this.replicating.set(getReplication()); LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped")); } catch (KeeperException e) { - this.abortable.abort("Failed getting data on from " + this.stateZnode, e); + this.abortable.abort("Failed getting data on from " + this.stateZNode, e); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java new file mode 100644 index 00000000000..9e6b46321e7 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + + +/** + * This is a base class for maintaining replication state in zookeeper. + */ +public abstract class ReplicationStateZKBase { + + /** + * The name of the znode that contains the replication status of a remote slave (i.e. peer) + * cluster. + */ + protected final String peerStateNodeName; + /** The name of the znode that contains the replication status of the local cluster. */ + protected final String stateZNode; + /** The name of the base znode that contains all replication state. */ + protected final String replicationZNode; + /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. 
*/ + protected final String peersZNode; + /** The name of the znode that contains all replication queues */ + protected final String queuesZNode; + /** The cluster key of the local cluster */ + protected final String ourClusterKey; + protected final ZooKeeperWatcher zookeeper; + protected final Configuration conf; + protected final Abortable abortable; + + public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, + Abortable abortable) { + this.zookeeper = zookeeper; + this.conf = conf; + this.abortable = abortable; + + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); + String stateZNodeName = conf.get("zookeeper.znode.replication.state", "state"); + this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); + this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); + this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); + this.stateZNode = ZKUtil.joinZNode(replicationZNode, stateZNodeName); + this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); + this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); + } + + public List getListOfReplicators() { + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of replicators", e); + } + return result; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index f8ed492db63..ba74a6d360a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -51,7 +49,6 @@ import java.util.Map; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -88,8 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean; public class ReplicationZookeeper implements Closeable { private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class); - // Name of znode we use to lock when failover - private final static String RS_LOCK_ZNODE = "lock"; // Our handle on zookeeper private final ZooKeeperWatcher zookeeper; @@ -114,6 +109,7 @@ public class ReplicationZookeeper implements Closeable { // Abortable private Abortable abortable; private final ReplicationStateInterface replicationState; + private final ReplicationQueues replicationQueues; /** * ZNode content if enabled state. 
@@ -139,8 +135,10 @@ public class ReplicationZookeeper implements Closeable { this.conf = conf; this.zookeeper = zk; setZNodes(abortable); - this.replicationState = - new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean()); + this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable); + // TODO This interface is no longer used by anyone using this constructor. When this class goes + // away, we will no longer have this null initialization business + this.replicationQueues = null; } /** @@ -149,7 +147,7 @@ public class ReplicationZookeeper implements Closeable { * @param server * @param replicating atomic boolean to start/stop replication * @throws IOException - * @throws KeeperException + * @throws KeeperException */ public ReplicationZookeeper(final Server server, final AtomicBoolean replicating) throws IOException, KeeperException { @@ -158,13 +156,14 @@ public class ReplicationZookeeper implements Closeable { this.conf = server.getConfiguration(); setZNodes(server); - this.replicationState = - new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating); + this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating); this.peerClusters = new HashMap(); ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName)); this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString()); ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode); + this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server); + this.replicationQueues.init(server.getServerName().toString()); connectExistingPeers(); } @@ -432,32 +431,6 @@ public class ReplicationZookeeper implements Closeable { return ProtobufUtil.prependPBMagic(bytes); } - /** - * @param position - * @return Serialized protobuf of position with pb magic prefix - * prepended suitable for use as content of an hlog 
position in a - * replication queue. - */ - static byte[] toByteArray( - final long position) { - byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position) - .build().toByteArray(); - return ProtobufUtil.prependPBMagic(bytes); - } - - /** - * @param lockOwner - * @return Serialized protobuf of lockOwner with pb magic prefix - * prepended suitable for use as content of an replication lock during - * region server fail over. - */ - static byte[] lockToByteArray( - final String lockOwner) { - byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build() - .toByteArray(); - return ProtobufUtil.prependPBMagic(bytes); - } - /** * @param bytes Content of a peer znode. * @return ClusterKey parsed from the passed bytes. @@ -503,58 +476,6 @@ public class ReplicationZookeeper implements Closeable { } } - /** - * @param bytes - Content of a HLog position znode. - * @return long - The current HLog position. - * @throws DeserializationException - */ - static long parseHLogPositionFrom( - final byte[] bytes) throws DeserializationException { - if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition - .newBuilder(); - ZooKeeperProtos.ReplicationHLogPosition position; - try { - position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); - } catch (InvalidProtocolBufferException e) { - throw new DeserializationException(e); - } - return position.getPosition(); - } else { - if (bytes.length > 0) { - return Bytes.toLong(bytes); - } - return 0; - } - } - - /** - * @param bytes - Content of a lock znode. - * @return String - The owner of the lock. 
- * @throws DeserializationException - */ - static String parseLockOwnerFrom( - final byte[] bytes) throws DeserializationException { - if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock - .newBuilder(); - ZooKeeperProtos.ReplicationLock lock; - try { - lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); - } catch (InvalidProtocolBufferException e) { - throw new DeserializationException(e); - } - return lock.getLockOwner(); - } else { - if (bytes.length > 0) { - return Bytes.toString(bytes); - } - return ""; - } - } - private boolean peerExists(String id) throws KeeperException { return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0; @@ -624,10 +545,6 @@ public class ReplicationZookeeper implements Closeable { return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); } - private String getRepStateNode() { - return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName); - } - /** * Get the replication status of this cluster. If the state znode doesn't exist it will also * create it and set it true. 
@@ -652,11 +569,8 @@ public class ReplicationZookeeper implements Closeable { * @param filename name of the hlog's znode * @param peerId name of the cluster's znode */ - public void addLogToList(String filename, String peerId) - throws KeeperException { - String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId); - znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.createWithParents(this.zookeeper, znode); + public void addLogToList(String filename, String peerId) throws KeeperException { + this.replicationQueues.addLog(peerId, filename); } /** @@ -665,13 +579,7 @@ public class ReplicationZookeeper implements Closeable { * @param clusterId name of the cluster's znode */ public void removeLogFromList(String filename, String clusterId) { - try { - String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId); - znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.deleteNode(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed remove from list", e); - } + this.replicationQueues.removeLog(clusterId, filename); } /** @@ -679,18 +587,9 @@ public class ReplicationZookeeper implements Closeable { * @param filename filename name of the hlog's znode * @param clusterId clusterId name of the cluster's znode * @param position the position in the file - * @throws IOException */ - public void writeReplicationStatus(String filename, String clusterId, - long position) { - try { - String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); - znode = ZKUtil.joinZNode(znode, filename); - // Why serialize String of Long and note Long as bytes? 
- ZKUtil.setData(this.zookeeper, znode, toByteArray(position)); - } catch (KeeperException e) { - this.abortable.abort("Writing replication status", e); - } + public void writeReplicationStatus(String filename, String clusterId, long position) { + this.replicationQueues.setLogPosition(clusterId, filename, position); } /** @@ -709,202 +608,15 @@ public class ReplicationZookeeper implements Closeable { return result; } - /** - * Get the list of the replicators that have queues, they can be alive, dead - * or simply from a previous run - * @return a list of server names - */ - public List getListOfReplicators() { - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode); - } catch (KeeperException e) { - this.abortable.abort("Get list of replicators", e); - } - return result; - } /** - * Get the list of peer clusters for the specified server names - * @param rs server names of the rs - * @return a list of peer cluster + * Take ownership for the set of queues belonging to a dead region server. + * @param regionserver the id of the dead region server + * @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in + * each queue. 
*/ - public List getListPeersForRS(String rs) { - String znode = ZKUtil.joinZNode(rsZNode, rs); - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Get list of peers for rs", e); - } - return result; - } - - /** - * Get the list of hlogs for the specified region server and peer cluster - * @param rs server names of the rs - * @param id peer cluster - * @return a list of hlogs - */ - public List getListHLogsForPeerForRS(String rs, String id) { - String znode = ZKUtil.joinZNode(rsZNode, rs); - znode = ZKUtil.joinZNode(znode, id); - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Get list of hlogs for peer", e); - } - return result; - } - - /** - * Try to set a lock in another server's znode. - * @param znode the server names of the other server - * @return true if the lock was acquired, false in every other cases - */ - public boolean lockOtherRS(String znode) { - try { - String parent = ZKUtil.joinZNode(this.rsZNode, znode); - if (parent.equals(rsServerNameZnode)) { - LOG.warn("Won't lock because this is us, we're dead!"); - return false; - } - String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); - ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode)); - } catch (KeeperException e) { - // This exception will pop up if the znode under which we're trying to - // create the lock is already deleted by another region server, meaning - // that the transfer already occurred. 
- // NoNode => transfer is done and znodes are already deleted - // NodeExists => lock znode already created by another RS - if (e instanceof KeeperException.NoNodeException || - e instanceof KeeperException.NodeExistsException) { - LOG.info("Won't transfer the queue," + - " another RS took care of it because of: " + e.getMessage()); - } else { - LOG.info("Failed lock other rs", e); - } - return false; - } - return true; - } - - /** - * It "atomically" copies all the hlogs queues from another region server and returns them all - * sorted per peer cluster (appended with the dead server's znode). - * @param znode - * @return HLog queues sorted per peer cluster - */ - public SortedMap> copyQueuesFromRSUsingMulti(String znode) { - SortedMap> queues = new TreeMap>(); - String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs - List peerIdsToProcess = null; - List listOfOps = new ArrayList(); - try { - peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath); - if (peerIdsToProcess == null) return queues; // node already processed - for (String peerId : peerIdsToProcess) { - String newPeerId = peerId + "-" + znode; - String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId); - // check the logs queue for the old peer cluster - String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); - List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); - if (hlogs == null || hlogs.size() == 0) { - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - continue; // empty log queue. 
- } - // create the new cluster znode - SortedSet logQueue = new TreeSet(); - queues.put(newPeerId, logQueue); - ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); - listOfOps.add(op); - // get the offset of the logs and set it to new znodes - for (String hlog : hlogs) { - String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog); - byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode); - LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset)); - String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog); - listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); - // add ops for deleting - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode)); - logQueue.add(hlog); - } - // add delete op for peer - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - } - // add delete op for dead rs - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); - LOG.debug(" The multi list size is: " + listOfOps.size()); - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - LOG.info("Atomically moved the dead regionserver logs. "); - } catch (KeeperException e) { - // Multi call failed; it looks like some other regionserver took away the logs. - LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - queues.clear(); - } - return queues; - } - - /** - * This methods copies all the hlogs queues from another region server - * and returns them all sorted per peer cluster (appended with the dead - * server's znode) - * @param znode server names to copy - * @return all hlogs for all peers of that cluster, null if an error occurred - */ - public SortedMap> copyQueuesFromRS(String znode) { - // TODO this method isn't atomic enough, we could start copying and then - // TODO fail for some reason and we would end up with znodes we don't want. 
- SortedMap> queues = - new TreeMap>(); - try { - String nodePath = ZKUtil.joinZNode(rsZNode, znode); - List clusters = - ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath); - // We have a lock znode in there, it will count as one. - if (clusters == null || clusters.size() <= 1) { - return queues; - } - // The lock isn't a peer cluster, remove it - clusters.remove(RS_LOCK_ZNODE); - for (String cluster : clusters) { - // We add the name of the recovered RS to the new znode, we can even - // do that for queues that were recovered 10 times giving a znode like - // number-startcode-number-otherstartcode-number-anotherstartcode-etc - String newCluster = cluster+"-"+znode; - String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster); - String clusterPath = ZKUtil.joinZNode(nodePath, cluster); - List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); - // That region server didn't have anything to replicate for this cluster - if (hlogs == null || hlogs.size() == 0) { - continue; - } - ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, - HConstants.EMPTY_BYTE_ARRAY); - SortedSet logQueue = new TreeSet(); - queues.put(newCluster, logQueue); - for (String hlog : hlogs) { - String z = ZKUtil.joinZNode(clusterPath, hlog); - byte[] positionBytes = ZKUtil.getData(this.zookeeper, z); - long position = 0; - try { - position = parseHLogPositionFrom(positionBytes); - } catch (DeserializationException e) { - LOG.warn("Failed parse of hlog position from the following znode: " + z); - } - LOG.debug("Creating " + hlog + " with data " + position); - String child = ZKUtil.joinZNode(newClusterZnode, hlog); - // Position doesn't actually change, we are just deserializing it for - // logging, so just use the already serialized version - ZKUtil.createAndWatch(this.zookeeper, child, positionBytes); - logQueue.add(hlog); - } - } - } catch (KeeperException e) { - this.abortable.abort("Copy queues from rs", e); - } - return queues; + public 
SortedMap> claimQueues(String regionserver) { + return this.replicationQueues.claimQueues(regionserver); } /** @@ -912,48 +624,10 @@ public class ReplicationZookeeper implements Closeable { * @param peerZnode znode of the peer cluster queue of hlogs to delete */ public void deleteSource(String peerZnode, boolean closeConnection) { - try { - ZKUtil.deleteNodeRecursively(this.zookeeper, - ZKUtil.joinZNode(rsServerNameZnode, peerZnode)); - if (closeConnection) { - this.peerClusters.get(peerZnode).getZkw().close(); - this.peerClusters.remove(peerZnode); - } - } catch (KeeperException e) { - this.abortable.abort("Failed delete of " + peerZnode, e); - } - } - - /** - * Recursive deletion of all znodes in specified rs' znode - * @param znode - */ - public void deleteRsQueues(String znode) { - String fullpath = ZKUtil.joinZNode(rsZNode, znode); - try { - List clusters = - ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath); - for (String cluster : clusters) { - // We'll delete it later - if (cluster.equals(RS_LOCK_ZNODE)) { - continue; - } - String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster); - ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath); - } - // Finish cleaning up - ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath); - } catch (KeeperException e) { - if (e instanceof KeeperException.NoNodeException || - e instanceof KeeperException.NotEmptyException) { - // Testing a special case where another region server was able to - // create a lock just after we deleted it, but then was also able to - // delete the RS znode before us or its lock znode is still there. 
- if (e.getPath().equals(fullpath)) { - return; - } - } - this.abortable.abort("Failed delete of " + znode, e); + this.replicationQueues.removeQueue(peerZnode); + if (closeConnection) { + this.peerClusters.get(peerZnode).getZkw().close(); + this.peerClusters.remove(peerZnode); } } @@ -961,16 +635,7 @@ public class ReplicationZookeeper implements Closeable { * Delete this cluster's queues */ public void deleteOwnRSZNode() { - try { - ZKUtil.deleteNodeRecursively(this.zookeeper, - this.rsServerNameZnode); - } catch (KeeperException e) { - // if the znode is already expired, don't bother going further - if (e instanceof KeeperException.SessionExpiredException) { - return; - } - this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e); - } + this.replicationQueues.removeAllQueues(); } /** @@ -978,22 +643,10 @@ public class ReplicationZookeeper implements Closeable { * @param peerId znode of the peer cluster * @param hlog name of the hlog * @return the position in that hlog - * @throws KeeperException + * @throws KeeperException */ - public long getHLogRepPosition(String peerId, String hlog) - throws KeeperException { - String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId); - String znode = ZKUtil.joinZNode(clusterZnode, hlog); - byte[] bytes = ZKUtil.getData(this.zookeeper, znode); - try { - return parseHLogPositionFrom(bytes); - } catch (DeserializationException de) { - LOG.warn("Failed parse of HLogPosition for peerId=" + peerId + " and hlog=" + hlog - + "znode content, continuing."); - } - // if we can not parse the position, start at the beginning of the hlog file - // again - return 0; + public long getHLogRepPosition(String peerId, String hlog) throws KeeperException { + return this.replicationQueues.getLogPosition(peerId, hlog); } /** @@ -1051,7 +704,7 @@ public class ReplicationZookeeper implements Closeable { public Map getPeerClusters() { return this.peerClusters; } - + /** * Determine if a ZK path points to a peer node. 
* @param path path to be checked diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index f3b2e0b0e75..0a43a500ed1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -27,7 +27,10 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; +import org.apache.hadoop.hbase.replication.ReplicationStateImpl; +import org.apache.hadoop.hbase.replication.ReplicationStateInterface; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -43,8 +46,10 @@ import java.util.Set; @InterfaceAudience.Private public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable { private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); - private ReplicationZookeeper zkHelper; - private Set hlogs = new HashSet(); + private ZooKeeperWatcher zkw; + private ReplicationQueuesClient replicationQueues; + private ReplicationStateInterface replicationState; + private final Set hlogs = new HashSet(); private boolean stopped = false; private boolean aborted; @@ -53,7 +58,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo public boolean isLogDeletable(Path filePath) { try { - if (!zkHelper.getReplication()) { + if (!replicationState.getState()) { return false; } } catch (KeeperException e) { @@ 
-89,20 +94,20 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo private boolean refreshHLogsAndSearch(String searchedLog) { this.hlogs.clear(); final boolean lookForLog = searchedLog != null; - List rss = zkHelper.getListOfReplicators(); + List rss = replicationQueues.getListOfReplicators(); if (rss == null) { LOG.debug("Didn't find any region server that replicates, deleting: " + searchedLog); return false; } for (String rs: rss) { - List listOfPeers = zkHelper.getListPeersForRS(rs); + List listOfPeers = replicationQueues.getAllQueues(rs); // if rs just died, this will be null if (listOfPeers == null) { continue; } for (String id : listOfPeers) { - List peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id); + List peersHlogs = replicationQueues.getLogsInQueue(rs, id); if (peersHlogs != null) { this.hlogs.addAll(peersHlogs); } @@ -128,8 +133,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo Configuration conf = new Configuration(config); super.setConf(conf); try { - ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); - this.zkHelper = new ReplicationZookeeper(this, conf, zkw); + this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); + this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this); + this.replicationState = new ReplicationStateImpl(zkw, conf, this); } catch (KeeperException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } catch (IOException e) { @@ -143,9 +149,17 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo public void stop(String why) { if (this.stopped) return; this.stopped = true; - if (this.zkHelper != null) { - LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher()); - this.zkHelper.getZookeeperWatcher().close(); + if (this.zkw != null) { + LOG.info("Stopping " + this.zkw); + this.zkw.close(); + } + if (this.replicationState != null) { + 
LOG.info("Stopping " + this.replicationState); + try { + this.replicationState.close(); + } catch (IOException e) { + LOG.error("Error while stopping " + this.replicationState, e); + } } // Not sure why we're deleting a connection that we never acquired or used HConnectionManager.deleteConnection(this.getConf()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 3ff7a1888a6..e31228c995c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.util.Bytes; @@ -64,6 +66,7 @@ public class Replication implements WALActionsListener, private ReplicationSourceManager replicationManager; private final AtomicBoolean replicating = new AtomicBoolean(true); private ReplicationZookeeper zkHelper; + private ReplicationQueues replicationQueues; private Configuration conf; private ReplicationSink replicationSink; // Hosting server @@ -104,18 +107,23 @@ public class Replication implements WALActionsListener, if (replication) { try { this.zkHelper = new ReplicationZookeeper(server, this.replicating); + this.replicationQueues = + new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server); + 
this.replicationQueues.init(this.server.getServerName().toString()); } catch (KeeperException ke) { throw new IOException("Failed replication handler create " + "(replicating=" + this.replicating, ke); } - this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs, - this.replicating, logDir, oldLogDir); + this.replicationManager = + new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, + this.replicating, logDir, oldLogDir); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); } else { this.replicationManager = null; this.zkHelper = null; + this.replicationQueues = null; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a615430dd1a..0f141b76908 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -41,8 +41,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -73,6 +73,7 @@ public class ReplicationSourceManager { private final AtomicBoolean replicating; // Helper for zookeeper private final ReplicationZookeeper zkHelper; + private final ReplicationQueues replicationQueues; // All about 
stopping private final Stoppable stopper; // All logs we are currently tracking @@ -91,14 +92,14 @@ public class ReplicationSourceManager { private final long sleepBeforeFailover; // Homemade executer service for replication private final ThreadPoolExecutor executor; - + private final Random rand; /** - * Creates a replication manager and sets the watch on all the other - * registered region servers + * Creates a replication manager and sets the watch on all the other registered region servers * @param zkHelper the zk helper for replication + * @param replicationQueues the interface for manipulating replication queues * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use @@ -107,15 +108,13 @@ public class ReplicationSourceManager { * @param oldLogDir the directory where old logs are archived */ public ReplicationSourceManager(final ReplicationZookeeper zkHelper, - final Configuration conf, - final Stoppable stopper, - final FileSystem fs, - final AtomicBoolean replicating, - final Path logDir, - final Path oldLogDir) { + final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper, + final FileSystem fs, final AtomicBoolean replicating, final Path logDir, + final Path oldLogDir) { this.sources = new ArrayList(); this.replicating = replicating; this.zkHelper = zkHelper; + this.replicationQueues = replicationQueues; this.stopper = stopper; this.hlogsById = new HashMap>(); this.oldsources = new ArrayList(); @@ -181,7 +180,7 @@ public class ReplicationSourceManager { for (String id : this.zkHelper.getPeerClusters().keySet()) { addSource(id); } - List currentReplicators = this.zkHelper.getListOfReplicators(); + List currentReplicators = this.replicationQueues.getListOfReplicators(); if (currentReplicators == null || currentReplicators.size() == 0) { return; } @@ -350,13 +349,12 @@ public class ReplicationSourceManager { * It creates one old source for any type 
of source of the old rs. * @param rsZnode */ - public void transferQueues(String rsZnode) { + private void transferQueues(String rsZnode) { NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode); try { this.executor.execute(transfer); } catch (RejectedExecutionException ex) { - LOG.info("Cancelling the transfer of " + rsZnode + - " because of " + ex.getMessage()); + LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage()); } } @@ -589,20 +587,12 @@ public class ReplicationSourceManager { } SortedMap<String, SortedSet<String>> newQueues = null; - // check whether there is multi support. If yes, use it. - if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { - LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue"); - newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode); - } else { - LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); - if (!zkHelper.lockOtherRS(rsZnode)) { - return; - } - newQueues = zkHelper.copyQueuesFromRS(rsZnode); - zkHelper.deleteRsQueues(rsZnode); - } - // process of copying over the failed queue is completed. + newQueues = zkHelper.claimQueues(rsZnode); + + // Copying over the failed queue is completed. if (newQueues.isEmpty()) { + // We either didn't get the lock or the failed region server didn't have any outstanding + // HLogs to replicate, so we are done.
return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 69f54f5d596..df0af63f39c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -17,15 +17,10 @@ */ package org.apache.hadoop.hbase.client.replication; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -49,9 +44,7 @@ public class TestReplicationAdmin { private final String ID_SECOND = "2"; private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; - private static ReplicationSourceManager manager; private static ReplicationAdmin admin; - private static AtomicBoolean replicating = new AtomicBoolean(true); /** * @throws java.lang.Exception @@ -62,19 +55,6 @@ public class TestReplicationAdmin { Configuration conf = TEST_UTIL.getConfiguration(); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); admin = new ReplicationAdmin(conf); - Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), - HConstants.HREGION_OLDLOGDIR_NAME); - Path logDir = new Path(TEST_UTIL.getDataTestDir(), - HConstants.HREGION_LOGDIR_NAME); - manager = new ReplicationSourceManager(admin.getReplicationZk(), conf, - // The following stopper never stops so that we can respond - // to zk notification - new Stoppable() { - @Override - public void stop(String why) {} - @Override - public boolean 
isStopped() {return false;} - }, FileSystem.get(conf), replicating, logDir, oldLogDir); } /** @@ -84,7 +64,6 @@ public class TestReplicationAdmin { */ @Test public void testAddRemovePeer() throws Exception { - assertEquals(0, manager.getSources().size()); // Add a valid peer admin.addPeer(ID_ONE, KEY_ONE); // try adding the same (fails) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java new file mode 100644 index 00000000000..6bb64fddd4b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.SortedMap; +import java.util.SortedSet; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.zookeeper.KeeperException; +import org.junit.Test; + +/** + * White box testing for replication state interfaces. Implementations should extend this class, and + * initialize the interfaces properly. 
+ */ +public abstract class TestReplicationStateBasic { + + protected ReplicationQueues rq1; + protected ReplicationQueues rq2; + protected ReplicationQueues rq3; + protected ReplicationQueuesClient rqc; + protected String server1 = new ServerName("hostname1.example.org", 1234, -1L).toString(); + protected String server2 = new ServerName("hostname2.example.org", 1234, -1L).toString(); + protected String server3 = new ServerName("hostname3.example.org", 1234, -1L).toString(); + + @Test + public void testReplicationQueuesClient() throws KeeperException { + // Test methods with empty state + assertEquals(0, rqc.getListOfReplicators().size()); + assertNull(rqc.getLogsInQueue(server1, "qId1")); + assertNull(rqc.getAllQueues(server1)); + + /* + * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- + * server2: zero queues + */ + rq1.init(server1); + rq2.init(server2); + rq1.addLog("qId1", "trash"); + rq1.removeLog("qId1", "trash"); + rq1.addLog("qId2", "filename1"); + rq1.addLog("qId3", "filename2"); + rq1.addLog("qId3", "filename3"); + rq2.addLog("trash", "trash"); + rq2.removeQueue("trash"); + + List reps = rqc.getListOfReplicators(); + assertEquals(2, reps.size()); + assertTrue(server1, reps.contains(server1)); + assertTrue(server2, reps.contains(server2)); + + assertNull(rqc.getLogsInQueue("bogus", "bogus")); + assertNull(rqc.getLogsInQueue(server1, "bogus")); + assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size()); + assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size()); + assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0)); + + assertNull(rqc.getAllQueues("bogus")); + assertEquals(0, rqc.getAllQueues(server2).size()); + List list = rqc.getAllQueues(server1); + assertEquals(3, list.size()); + assertTrue(list.contains("qId2")); + assertTrue(list.contains("qId3")); + } + + @Test + public void testReplicationQueues() throws KeeperException { + rq1.init(server1); + rq2.init(server2); + rq3.init(server3); 
+ + // Zero queues or replicators exist + assertEquals(0, rq1.getListOfReplicators().size()); + rq1.removeQueue("bogus"); + rq1.removeLog("bogus", "bogus"); + rq1.removeAllQueues(); + assertNull(rq1.getAllQueues()); + // TODO fix NPE if getting a log position on a file that does not exist + // assertEquals(0, rq1.getLogPosition("bogus", "bogus")); + assertNull(rq1.getLogsInQueue("bogus")); + assertEquals(0, rq1.claimQueues(new ServerName("bogus", 1234, -1L).toString()).size()); + + // TODO test setting a log position on a bogus file + // rq1.setLogPosition("bogus", "bogus", 5L); + + populateQueues(); + + assertEquals(3, rq1.getListOfReplicators().size()); + assertEquals(0, rq2.getLogsInQueue("qId1").size()); + assertEquals(5, rq3.getLogsInQueue("qId5").size()); + assertEquals(0, rq3.getLogPosition("qId1", "filename0")); + rq3.setLogPosition("qId5", "filename4", 354L); + assertEquals(354L, rq3.getLogPosition("qId5", "filename4")); + + assertEquals(5, rq3.getLogsInQueue("qId5").size()); + assertEquals(0, rq2.getLogsInQueue("qId1").size()); + assertEquals(0, rq1.getAllQueues().size()); + assertEquals(1, rq2.getAllQueues().size()); + assertEquals(5, rq3.getAllQueues().size()); + + assertEquals(0, rq3.claimQueues(server1).size()); + assertEquals(2, rq3.getListOfReplicators().size()); + + SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3); + assertEquals(5, queues.size()); + assertEquals(1, rq2.getListOfReplicators().size()); + + // TODO test claimQueues on yourself + // rq2.claimQueues(server2); + + assertEquals(6, rq2.getAllQueues().size()); + + rq2.removeAllQueues(); + + assertEquals(0, rq2.getListOfReplicators().size()); + } + + /* + * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, + * 3, 4, 5 log files respectively + */ + protected void populateQueues() throws KeeperException { + rq1.addLog("trash", "trash"); + rq1.removeQueue("trash"); + + rq2.addLog("qId1", "trash"); + rq2.removeLog("qId1", "trash"); + + for (int i = 1; i
< 6; i++) { + for (int j = 0; j < i; j++) { + rq3.addLog("qId" + i, "filename" + j); + } + } + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java new file mode 100644 index 00000000000..c14bd193f69 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestReplicationStateZKImpl extends TestReplicationStateBasic { + + private static Configuration conf; + private static HBaseTestingUtility utility; + private static ZooKeeperWatcher zkw; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + utility = new HBaseTestingUtility(); + utility.startMiniZKCluster(); + conf = utility.getConfiguration(); + zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); + } + + @Before + public void setUp() throws KeeperException { + DummyServer ds1 = new DummyServer(server1); + DummyServer ds2 = new DummyServer(server2); + DummyServer ds3 = new DummyServer(server3); + rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1); + rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2); + rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3); + rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1); + } + + @After + public void tearDown() throws KeeperException, IOException { + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); + ZKUtil.deleteNodeRecursively(zkw, replicationZNode); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + 
utility.shutdownMiniZKCluster(); + } + + static class DummyServer implements Server { + private String serverName; + private boolean isAborted = false; + private boolean isStopped = false; + + public DummyServer(String serverName) { + this.serverName = serverName; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zkw; + } + + @Override + public CatalogTracker getCatalogTracker() { + return null; + } + + @Override + public ServerName getServerName() { + return new ServerName(this.serverName); + } + + @Override + public void abort(String why, Throwable e) { + this.isAborted = true; + } + + @Override + public boolean isAborted() { + return this.isAborted; + } + + @Override + public void stop(String why) { + this.isStopped = true; + } + + @Override + public boolean isStopped() { + return this.isStopped; + } + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 87fdbdc82a8..eef6d9252e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -225,7 +225,7 @@ public class TestReplicationSourceManager { } @Test - public void testNodeFailoverWorkerCopyQueuesFromRSUsingMulti() throws Exception { + public void testClaimQueues() throws Exception { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); @@ -288,13 +288,13 @@ public class TestReplicationSourceManager { // simulate three servers fail sequentially ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new 
AtomicBoolean(true)); SortedMap<String, SortedSet<String>> testMap = - rz1.copyQueuesFromRSUsingMulti(server.getServerName().getServerName()); + rz1.claimQueues(server.getServerName().getServerName()); rz1.close(); ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true)); - testMap = rz2.copyQueuesFromRSUsingMulti(s1.getServerName().getServerName()); + testMap = rz2.claimQueues(s1.getServerName().getServerName()); rz2.close(); ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true)); - testMap = rz3.copyQueuesFromRSUsingMulti(s2.getServerName().getServerName()); + testMap = rz3.claimQueues(s2.getServerName().getServerName()); rz3.close(); ReplicationSource s = new ReplicationSource(); @@ -327,7 +327,7 @@ public class TestReplicationSourceManager { @Override public void run() { try { - logZnodesMap = rz.copyQueuesFromRSUsingMulti(deadRsZnode); + logZnodesMap = rz.claimQueues(deadRsZnode); rz.close(); server.abort("Done with testing", null); } catch (Exception e) {