From c4fa568b4771cc5909cb8273a0ea82688162425a Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Mon, 25 Dec 2017 18:49:56 +0800
Subject: [PATCH] HBASE-19599 Remove ReplicationQueuesClient, use
 ReplicationQueueStorage directly

---
 .../hbase/replication/ReplicationFactory.java      |  19 +-
 .../replication/ReplicationPeersZKImpl.java        |  20 +-
 .../replication/ReplicationQueueStorage.java       |  26 ++-
 .../replication/ReplicationQueuesClient.java       |  93 ---------
 .../ReplicationQueuesClientArguments.java          |  40 ----
 .../ReplicationQueuesClientZKImpl.java             | 177 ------------------
 .../ZKReplicationQueueStorage.java                 | 100 +++++++---
 .../TestReplicationStateBasic.java                 | 142 +++++++-------
 .../TestReplicationStateZKImpl.java                | 121 +++---------
 .../TestZKReplicationQueueStorage.java             |  74 ++++++++
 .../cleaner/ReplicationZKNodeCleaner.java          |  71 ++++---
 .../ReplicationZKNodeCleanerChore.java             |   5 +-
 .../replication/ReplicationPeerManager.java        |  31 +--
 .../master/ReplicationHFileCleaner.java            | 109 ++++------
 .../master/ReplicationLogCleaner.java              |  35 +---
 .../regionserver/DumpReplicationQueues.java        |  78 ++++----
 .../hbase/util/hbck/ReplicationChecker.java        |  14 +-
 .../client/TestAsyncReplicationAdminApi.java       |  31 ++-
 .../replication/TestReplicationAdmin.java          |   2 +
 .../hbase/master/cleaner/TestLogsCleaner.java      |  30 +--
 .../cleaner/TestReplicationHFileCleaner.java       |  59 ++----
 .../cleaner/TestReplicationZKNodeCleaner.java      |  12 +-
 .../TestReplicationSourceManagerZkImpl.java        |  84 +++------
 23 files changed, 474 insertions(+), 899 deletions(-)
 delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
 delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
 delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
 rename {hbase-server => hbase-replication}/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (70%)
 rename {hbase-server => hbase-replication}/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (61%)

diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 6c83d6a27cc..358c5e7313e 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -39,20 +38,14 @@ public final class ReplicationFactory {
       args);
   }
 
-  public static ReplicationQueuesClient
-      getReplicationQueuesClient(ReplicationQueuesClientArguments args) throws Exception {
-    return (ReplicationQueuesClient) ConstructorUtils
-        .invokeConstructor(ReplicationQueuesClientZKImpl.class, args);
-  }
-
-  public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf,
-      Abortable abortable) {
+  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
+      Abortable abortable) {
     return getReplicationPeers(zk, conf, null, abortable);
   }
 
-  public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf,
-      final ReplicationQueuesClient queuesClient, Abortable abortable) {
-    return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
+  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
+      ReplicationQueueStorage queueStorage, Abortable abortable) {
+    return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable);
   }
 
   public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index bf8b61994ae..2fee610c71f 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -80,17 +81,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
 
   // Map of peer clusters keyed by their id
   private Map<String, ReplicationPeerZKImpl> peerClusters;
-  private final ReplicationQueuesClient queuesClient;
+  private final ReplicationQueueStorage queueStorage;
   private Abortable abortable;
 
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);
 
-  public ReplicationPeersZKImpl(final ZKWatcher zk, final Configuration conf,
-      final ReplicationQueuesClient queuesClient, Abortable abortable) {
+  public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf,
+      ReplicationQueueStorage queueStorage, Abortable abortable) {
     super(zk, conf, abortable);
     this.abortable = abortable;
     this.peerClusters = new ConcurrentHashMap<>();
-    this.queuesClient = queuesClient;
+    this.queueStorage = queueStorage;
   }
 
   @Override
@@ -512,17 +513,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   private void checkQueuesDeleted(String peerId) throws ReplicationException {
-    if (queuesClient == null) {
+    if (queueStorage == null) {
       return;
     }
-    try {
-      List<String> replicators = queuesClient.getListOfReplicators();
+    List<ServerName> replicators = queueStorage.getListOfReplicators();
     if (replicators == null || replicators.isEmpty()) {
       return;
     }
-    for (String replicator : replicators) {
-      List<String> queueIds = queuesClient.getAllQueues(replicator);
+    for (ServerName replicator : replicators) {
+      List<String> queueIds = queueStorage.getAllQueues(replicator);
      for (String queueId : queueIds) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        if (queueInfo.getPeerId().equals(peerId)) {
@@ -533,7 +533,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     }
     // Check for hfile-refs queue
     if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
-        && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+        && queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
       throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
           + ", found in hfile-refs node path " + hfileRefsZNode);
     }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 7210d9a9ca3..e774148c903 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -77,6 +77,14 @@ public interface ReplicationQueueStorage {
   long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException;
 
+  /**
+   * Get a list of all WALs in the given queue on the given region server.
+   * @param serverName the server name of the region server that owns the queue
+   * @param queueId a String that identifies the queue
+   * @return a list of WALs
+   */
+  List<String> getWALsInQueue(ServerName serverName, String queueId) throws ReplicationException;
+
   /**
    * Get a list of all queues for the specified region server.
    * @param serverName the server name of the region server that owns the set of queues
@@ -108,8 +116,8 @@ public interface ReplicationQueueStorage {
 
   /**
    * Load all wals in all replication queues. This method guarantees to return a snapshot which
-   * contains all WALs in the zookeeper at the start of this call even there is concurrent queue
-   * failover. However, some newly created WALs during the call may not be included.
+   * contains all WALs at the start of this call even if there is concurrent queue failover.
+   * However, some newly created WALs during the call may not be included.
    */
   Set<String> getAllWALs() throws ReplicationException;
 
@@ -142,13 +150,6 @@ public interface ReplicationQueueStorage {
    */
   void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;
 
-  /**
-   * Get the change version number of replication hfile references node. This can be used as
-   * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
-   * @return change version number of hfile references node
-   */
-  int getHFileRefsNodeChangeVersion() throws ReplicationException;
-
   /**
    * Get list of all peers from hfile reference queue.
    * @return a list of peer ids
    */
   List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException;
 
   /**
    * Get a list of all hfile references in the given peer.
    * @param peerId a String that identifies the peer
    * @return a list of hfile references
    */
   List<String> getReplicableHFiles(String peerId) throws ReplicationException;
+
+  /**
+   * Load all hfile references in all replication queues. This method guarantees to return a
+   * snapshot which contains all hfile references at the start of this call. However, some newly
+   * created hfile references during the call may not be included.
+ */ + Set getAllHFileRefs() throws ReplicationException; } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java deleted file mode 100644 index 7ef4004cac4..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.List; -import java.util.Set; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -/** - * This provides an interface for clients of replication to view replication queues. These queues - * keep track of the sources(WALs/HFile references) that still need to be replicated to remote - * clusters. - */ -@InterfaceAudience.Private -public interface ReplicationQueuesClient { - - /** - * Initialize the replication queue client interface. - */ - public void init() throws ReplicationException; - - /** - * Get a list of all region servers that have outstanding replication queues. These servers could - * be alive, dead or from a previous run of the cluster. - * @return a list of server names - * @throws KeeperException zookeeper exception - */ - List getListOfReplicators() throws KeeperException; - - /** - * Get a list of all WALs in the given queue on the given region server. - * @param serverName the server name of the region server that owns the queue - * @param queueId a String that identifies the queue - * @return a list of WALs, null if this region server is dead and has no outstanding queues - * @throws KeeperException zookeeper exception - */ - List getLogsInQueue(String serverName, String queueId) throws KeeperException; - - /** - * Get a list of all queues for the specified region server. - * @param serverName the server name of the region server that owns the set of queues - * @return a list of queueIds, null if this region server is not a replicator. - */ - List getAllQueues(String serverName) throws KeeperException; - - /** - * Load all wals in all replication queues from ZK. This method guarantees to return a - * snapshot which contains all WALs in the zookeeper at the start of this call even there - * is concurrent queue failover. However, some newly created WALs during the call may - * not be included. - */ - Set getAllWALs() throws KeeperException; - - /** - * Get the change version number of replication hfile references node. This can be used as - * optimistic locking to get a consistent snapshot of the replication queues of hfile references. 
- * @return change version number of hfile references node - */ - int getHFileRefsNodeChangeVersion() throws KeeperException; - - /** - * Get list of all peers from hfile reference queue. - * @return a list of peer ids - * @throws KeeperException zookeeper exception - */ - List getAllPeersFromHFileRefsQueue() throws KeeperException; - - /** - * Get a list of all hfile references in the given peer. - * @param peerId a String that identifies the peer - * @return a list of hfile references, null if not found any - * @throws KeeperException zookeeper exception - */ - List getReplicableHFiles(String peerId) throws KeeperException; -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java deleted file mode 100644 index 9b79294865d..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct - * various ReplicationQueuesClient Implementations with different constructor arguments by - * reflection. - */ -@InterfaceAudience.Private -public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments { - public ReplicationQueuesClientArguments(Configuration conf, Abortable abort, - ZKWatcher zk) { - super(conf, abort, zk); - } - public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) { - super(conf, abort); - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java deleted file mode 100644 index 4dccf7ffdad..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.List; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; -import org.apache.hbase.thirdparty.com.google.common.collect.Sets; - -@InterfaceAudience.Private -public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements - ReplicationQueuesClient { - - Logger LOG = LoggerFactory.getLogger(ReplicationQueuesClientZKImpl.class); - - public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) { - this(args.getZk(), args.getConf(), args.getAbortable()); - } - - public ReplicationQueuesClientZKImpl(final ZKWatcher zk, Configuration conf, - Abortable abortable) { - super(zk, conf, abortable); - } - - @Override - public void init() throws ReplicationException { - try { - if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) { - ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); - } - } catch (KeeperException e) { - throw new ReplicationException("Internal error while initializing a queues client", e); - } - } - - @Override - public List getLogsInQueue(String serverName, String queueId) throws KeeperException { - String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName); - znode = ZNodePaths.joinZNode(znode, queueId); - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of wals for queueId=" + queueId - + " and serverName=" + serverName, e); - throw e; - } - return result; - } - - @Override - public List getAllQueues(String serverName) throws KeeperException { - String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName); - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e); - throw e; - } - return result; - } - - @Override - public Set getAllWALs() throws KeeperException { - /** - * Load all wals in all replication queues from ZK. This method guarantees to return a - * snapshot which contains all WALs in the zookeeper at the start of this call even there - * is concurrent queue failover. However, some newly created WALs during the call may - * not be included. 
- */ - for (int retry = 0; ; retry++) { - int v0 = getQueuesZNodeCversion(); - List rss = getListOfReplicators(); - if (rss == null || rss.isEmpty()) { - LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); - return ImmutableSet.of(); - } - Set wals = Sets.newHashSet(); - for (String rs : rss) { - List listOfPeers = getAllQueues(rs); - // if rs just died, this will be null - if (listOfPeers == null) { - continue; - } - for (String id : listOfPeers) { - List peersWals = getLogsInQueue(rs, id); - if (peersWals != null) { - wals.addAll(peersWals); - } - } - } - int v1 = getQueuesZNodeCversion(); - if (v0 == v1) { - return wals; - } - LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", - v0, v1, retry)); - } - } - - public int getQueuesZNodeCversion() throws KeeperException { - try { - Stat stat = new Stat(); - ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); - return stat.getCversion(); - } catch (KeeperException e) { - this.abortable.abort("Failed to get stat of replication rs node", e); - throw e; - } - } - - @Override - public int getHFileRefsNodeChangeVersion() throws KeeperException { - Stat stat = new Stat(); - try { - ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat); - } catch (KeeperException e) { - this.abortable.abort("Failed to get stat of replication hfile references node.", e); - throw e; - } - return stat.getCversion(); - } - - @Override - public List getAllPeersFromHFileRefsQueue() throws KeeperException { - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of all peers in hfile references node.", e); - throw e; - } - return result; - } - - @Override - public List getReplicableHFiles(String peerId) throws KeeperException { - String znode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e); - throw e; - } - return result; - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 7015d7f9902..0275d52f69a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.util.CollectionUtils.nullToEmpty; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.SortedSet; @@ -49,7 +50,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** * ZK based replication queue storage. 
@@ -61,7 +62,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = - "zookeeper.znode.replication.hfile.refs"; + "zookeeper.znode.replication.hfile.refs"; public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; /** @@ -256,11 +257,23 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } - private List getLogsInQueue0(ServerName serverName, String queueId) + private List getWALsInQueue0(ServerName serverName, String queueId) throws KeeperException { return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId))); } + @Override + public List getWALsInQueue(ServerName serverName, String queueId) + throws ReplicationException { + try { + return getWALsInQueue0(serverName, queueId); + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")", + e); + } + } + private List getAllQueues0(ServerName serverName) throws KeeperException { return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName))); } @@ -274,7 +287,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } - private int getQueuesZNodeCversion() throws KeeperException { + // will be overridden in UTs + @VisibleForTesting + protected int getQueuesZNodeCversion() throws KeeperException { Stat stat = new Stat(); ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); return stat.getCversion(); @@ -290,10 +305,10 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); return Collections.emptySet(); } - Set wals = Sets.newHashSet(); + Set wals = new HashSet<>(); for (ServerName rs : rss) { for (String queueId : getAllQueues0(rs)) { - wals.addAll(getLogsInQueue0(rs, queueId)); + wals.addAll(getWALsInQueue0(rs, queueId)); } } int v1 = getQueuesZNodeCversion(); @@ -356,9 +371,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase if (debugEnabled) { LOG.debug("Adding hfile references " + pairs + " in queue " + peerNode); } - List listOfOps = - pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n)) - .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); + List listOfOps = pairs.stream().map(p -> p.getSecond().getName()) + .map(n -> getHFileNode(peerNode, n)) + .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); if (debugEnabled) { LOG.debug("The multi list size for adding hfile references in zk for node " + peerNode + " is " + listOfOps.size()); @@ -391,8 +406,37 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } + private List getAllPeersFromHFileRefsQueue0() throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode)); + } + @Override - public int getHFileRefsNodeChangeVersion() throws ReplicationException { + public List getAllPeersFromHFileRefsQueue() throws ReplicationException { + try { + return getAllPeersFromHFileRefsQueue0(); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of all peers in hfile references node.", + e); + } + } + + private List getReplicableHFiles0(String peerId) throws KeeperException { + return 
nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId))); + } + + @Override + public List getReplicableHFiles(String peerId) throws ReplicationException { + try { + return getReplicableHFiles0(peerId); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, + e); + } + } + + // will be overridden in UTs + @VisibleForTesting + protected int getHFileRefsZNodeCversion() throws ReplicationException { Stat stat = new Stat(); try { ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); @@ -403,23 +447,29 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } @Override - public List getAllPeersFromHFileRefsQueue() throws ReplicationException { + public Set getAllHFileRefs() throws ReplicationException { try { - return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode)); + for (int retry = 0;; retry++) { + int v0 = getHFileRefsZNodeCversion(); + List peers = getAllPeersFromHFileRefsQueue(); + if (peers.isEmpty()) { + LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions."); + return Collections.emptySet(); + } + Set hfileRefs = new HashSet<>(); + for (String peer : peers) { + hfileRefs.addAll(getReplicableHFiles0(peer)); + } + int v1 = getHFileRefsZNodeCversion(); + if (v0 == v1) { + return hfileRefs; + } + LOG.debug(String.format( + "Replication hfile references node cversion changed from " + "%d to %d, retry = %d", v0, + v1, retry)); + } } catch (KeeperException e) { - throw new ReplicationException("Failed to get list of all peers in hfile references node.", - e); + throw new ReplicationException("Failed to get all hfile refs", e); } } - - @Override - public List getReplicableHFiles(String peerId) throws ReplicationException { - try { - return nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId))); - } catch (KeeperException e) { - throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, - e); - } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java similarity index 70% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java rename to hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 29c093000e9..6fe869c9417 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -15,20 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.replication; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.zookeeper.KeeperException; -import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +45,10 @@ public abstract class TestReplicationStateBasic { protected ReplicationQueues rq1; protected ReplicationQueues rq2; protected ReplicationQueues rq3; - protected ReplicationQueuesClient rqc; - protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString(); - protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString(); - protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString(); + protected ReplicationQueueStorage rqs; + protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); + protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); + protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); protected ReplicationPeers rp; protected static final String ID_ONE = "1"; protected static final String ID_TWO = "2"; @@ -62,25 +65,19 @@ public abstract class TestReplicationStateBasic { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); - @Before - public void setUp() { - zkTimeoutCount = 0; - } - @Test - public void testReplicationQueuesClient() throws ReplicationException, KeeperException { - rqc.init(); + public void testReplicationQueueStorage() throws ReplicationException { // Test methods with empty state - assertEquals(0, rqc.getListOfReplicators().size()); - assertNull(rqc.getLogsInQueue(server1, "qId1")); - assertNull(rqc.getAllQueues(server1)); + assertEquals(0, rqs.getListOfReplicators().size()); + assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty()); + assertTrue(rqs.getAllQueues(server1).isEmpty()); /* * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- * server2: zero queues */ - rq1.init(server1); - rq2.init(server2); + rq1.init(server1.getServerName()); + rq2.init(server2.getServerName()); rq1.addLog("qId1", "trash"); rq1.removeLog("qId1", "trash"); rq1.addLog("qId2", "filename1"); @@ -89,20 +86,20 @@ public abstract class TestReplicationStateBasic { rq2.addLog("trash", "trash"); rq2.removeQueue("trash"); - List reps = rqc.getListOfReplicators(); + List reps = rqs.getListOfReplicators(); assertEquals(2, reps.size()); - assertTrue(server1, reps.contains(server1)); - assertTrue(server2, reps.contains(server2)); + assertTrue(server1.getServerName(), reps.contains(server1)); + assertTrue(server2.getServerName(), reps.contains(server2)); - assertNull(rqc.getLogsInQueue("bogus", "bogus")); - assertNull(rqc.getLogsInQueue(server1, "bogus")); - assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size()); - assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size()); - assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0)); + 
assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty()); + assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); + assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size()); + assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size()); + assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0)); - assertNull(rqc.getAllQueues("bogus")); - assertEquals(0, rqc.getAllQueues(server2).size()); - List list = rqc.getAllQueues(server1); + assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty()); + assertEquals(0, rqs.getAllQueues(server2).size()); + List list = rqs.getAllQueues(server1); assertEquals(3, list.size()); assertTrue(list.contains("qId2")); assertTrue(list.contains("qId3")); @@ -110,10 +107,10 @@ public abstract class TestReplicationStateBasic { @Test public void testReplicationQueues() throws ReplicationException { - rq1.init(server1); - rq2.init(server2); - rq3.init(server3); - //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) + rq1.init(server1.getServerName()); + rq2.init(server2.getServerName()); + rq3.init(server3.getServerName()); + // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) rp.init(); // 3 replicators should exist @@ -124,8 +121,7 @@ public abstract class TestReplicationStateBasic { assertEquals(0, rq1.getAllQueues().size()); assertEquals(0, rq1.getLogPosition("bogus", "bogus")); assertNull(rq1.getLogsInQueue("bogus")); - assertNull(rq1.getUnClaimedQueueIds( - ServerName.valueOf("bogus", 1234, -1L).toString())); + assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString())); rq1.setLogPosition("bogus", "bogus", 5L); @@ -144,21 +140,21 @@ public abstract class TestReplicationStateBasic { assertEquals(1, rq2.getAllQueues().size()); assertEquals(5, rq3.getAllQueues().size()); - assertEquals(0, rq3.getUnClaimedQueueIds(server1).size()); - rq3.removeReplicatorIfQueueIsEmpty(server1); + assertEquals(0, rq3.getUnClaimedQueueIds(server1.getServerName()).size()); + rq3.removeReplicatorIfQueueIsEmpty(server1.getServerName()); assertEquals(2, rq3.getListOfReplicators().size()); - List queues = rq2.getUnClaimedQueueIds(server3); + List queues = rq2.getUnClaimedQueueIds(server3.getServerName()); assertEquals(5, queues.size()); - for(String queue: queues) { - rq2.claimQueue(server3, queue); + for (String queue : queues) { + rq2.claimQueue(server3.getServerName(), queue); } - rq2.removeReplicatorIfQueueIsEmpty(server3); + rq2.removeReplicatorIfQueueIsEmpty(server3.getServerName()); assertEquals(1, rq2.getListOfReplicators().size()); // Try to claim our own queues - assertNull(rq2.getUnClaimedQueueIds(server2)); - rq2.removeReplicatorIfQueueIsEmpty(server2); + assertNull(rq2.getUnClaimedQueueIds(server2.getServerName())); + rq2.removeReplicatorIfQueueIsEmpty(server2.getServerName()); assertEquals(6, rq2.getAllQueues().size()); @@ -174,8 +170,8 @@ public abstract class TestReplicationStateBasic { try { rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase")); - fail("Should throw an IllegalArgumentException because " - + "zookeeper.znode.parent is missing leading '/'."); + fail("Should throw an IllegalArgumentException because " + + "zookeeper.znode.parent is missing leading '/'."); } catch (IllegalArgumentException e) { // Expected. 
} @@ -191,8 +187,8 @@ public abstract class TestReplicationStateBasic { try { rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase")); - fail("Should throw an IllegalArgumentException because " - + "hbase.zookeeper.property.clientPort is missing."); + fail("Should throw an IllegalArgumentException because " + + "hbase.zookeeper.property.clientPort is missing."); } catch (IllegalArgumentException e) { // Expected. } @@ -201,38 +197,36 @@ public abstract class TestReplicationStateBasic { @Test public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { rp.init(); - rq1.init(server1); - rqc.init(); + rq1.init(server1.getServerName()); List> files1 = new ArrayList<>(3); files1.add(new Pair<>(null, new Path("file_1"))); files1.add(new Pair<>(null, new Path("file_2"))); files1.add(new Pair<>(null, new Path("file_3"))); - assertNull(rqc.getReplicableHFiles(ID_ONE)); - assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); + assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); + assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rq1.addPeerToHFileRefs(ID_ONE); rq1.addHFileRefs(ID_ONE, files1); - assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); - assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); + assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); + assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); List hfiles2 = new ArrayList<>(files1.size()); for (Pair p : files1) { hfiles2.add(p.getSecond().getName()); } String removedString = hfiles2.remove(0); rq1.removeHFileRefs(ID_ONE, hfiles2); - assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size()); + assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); hfiles2 = new ArrayList<>(1); hfiles2.add(removedString); rq1.removeHFileRefs(ID_ONE, hfiles2); - assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size()); + assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); rp.unregisterPeer(ID_ONE); } @Test public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { - rq1.init(server1); - rqc.init(); + rq1.init(server1.getServerName()); rp.init(); rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); @@ -246,20 +240,20 @@ public abstract class TestReplicationStateBasic { files1.add(new Pair<>(null, new Path("file_3"))); rq1.addHFileRefs(ID_ONE, files1); rq1.addHFileRefs(ID_TWO, files1); - assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size()); - assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); - assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); + assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); + assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); + assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); rp.unregisterPeer(ID_ONE); rq1.removePeerFromHFileRefs(ID_ONE); - assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); - assertNull(rqc.getReplicableHFiles(ID_ONE)); - assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); + assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); + assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); + assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); rp.unregisterPeer(ID_TWO); rq1.removePeerFromHFileRefs(ID_TWO); - assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); - assertNull(rqc.getReplicableHFiles(ID_TWO)); + assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); + 
assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); } @Test @@ -316,8 +310,14 @@ public abstract class TestReplicationStateBasic { assertNumberOfPeers(2); assertTrue(rp.getStatusOfPeer(ID_ONE)); rp.disablePeer(ID_ONE); + // now we do not rely on zk watcher to trigger the state change so we need to trigger it + // manually... + assertEquals(PeerState.DISABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true)); assertConnectedPeerStatus(false, ID_ONE); rp.enablePeer(ID_ONE); + // now we do not rely on zk watcher to trigger the state change so we need to trigger it + // manually... + assertEquals(PeerState.ENABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true)); assertConnectedPeerStatus(true, ID_ONE); // Disconnect peer @@ -340,8 +340,8 @@ public abstract class TestReplicationStateBasic { return; } if (zkTimeoutCount < ZK_MAX_COUNT) { - LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status - + ", sleeping and trying again."); + LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status + + ", sleeping and trying again."); Thread.sleep(ZK_SLEEP_INTERVAL); } else { fail("Timed out waiting for ConnectedPeerStatus to be " + status); @@ -370,9 +370,9 @@ public abstract class TestReplicationStateBasic { for (int j = 0; j < i; j++) { rq3.addLog("qId" + i, "filename" + j); } - //Add peers for the corresponding queues so they are not orphans - rp.registerPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); + // Add peers for the corresponding queues so they are not orphans + rp.registerPeer("qId" + i, + new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); } } } - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java similarity index 61% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java rename to hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 231d655e2c2..5fe7c55692d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -24,19 +24,12 @@ import static org.junit.Assert.fail; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ClusterId; -import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -52,24 +45,24 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
-@Category({ReplicationTests.class, MediumTests.class}) +@Category({ ReplicationTests.class, MediumTests.class }) public class TestReplicationStateZKImpl extends TestReplicationStateBasic { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class); private static Configuration conf; - private static HBaseTestingUtility utility; + private static HBaseZKTestingUtility utility; private static ZKWatcher zkw; private static String replicationZNode; private ReplicationQueuesZKImpl rqZK; @BeforeClass public static void setUpBeforeClass() throws Exception { - utility = new HBaseTestingUtility(); + utility = new HBaseZKTestingUtility(); utility.startMiniZKCluster(); conf = utility.getConfiguration(); conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); + zkw = utility.getZooKeeperWatcher(); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName); KEY_ONE = initPeerClusterState("/hbase1"); @@ -89,18 +82,17 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { } @Before - @Override public void setUp() { - super.setUp(); - DummyServer ds1 = new DummyServer(server1); - DummyServer ds2 = new DummyServer(server2); - DummyServer ds3 = new DummyServer(server3); + zkTimeoutCount = 0; + WarnOnlyAbortable abortable = new WarnOnlyAbortable(); try { - rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); - rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); - rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); - rqc = ReplicationFactory.getReplicationQueuesClient( - new ReplicationQueuesClientArguments(conf, ds1, zkw)); + rq1 = ReplicationFactory + .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); + rq2 = ReplicationFactory + .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); + rq3 = ReplicationFactory + .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); + rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); } catch (Exception e) { // This should not occur, because getReplicationQueues() only throws for // TableBasedReplicationQueuesImpl @@ -108,7 +100,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { } rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); - rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1); + rqZK = new ReplicationQueuesZKImpl(zkw, conf, abortable); } @After @@ -138,90 +130,19 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { assertTrue(rqZK.isPeerPath(peerPath)); } - static class DummyServer implements Server { - private String serverName; - private boolean isAborted = false; - private boolean isStopped = false; - - public DummyServer(String serverName) { - this.serverName = serverName; - } - - @Override - public Configuration getConfiguration() { - return conf; - } - - @Override - public ZKWatcher getZooKeeper() { - return zkw; - } - - @Override - public CoordinatedStateManager getCoordinatedStateManager() { - return null; - } - - @Override - public ClusterConnection getConnection() { - return null; - } - - @Override - public MetaTableLocator getMetaTableLocator() { - return null; - } - - @Override - public 
ServerName getServerName() { - return ServerName.valueOf(this.serverName); - } + private static class WarnOnlyAbortable implements Abortable { @Override public void abort(String why, Throwable e) { - LOG.info("Aborting " + serverName); - this.isAborted = true; + LOG.warn("TestReplicationStateZKImpl received abort, ignoring. Reason: " + why); + if (LOG.isDebugEnabled()) { + LOG.debug(e.toString(), e); + } } @Override public boolean isAborted() { - return this.isAborted; - } - - @Override - public void stop(String why) { - this.isStopped = true; - } - - @Override - public boolean isStopped() { - return this.isStopped; - } - - @Override - public ChoreService getChoreService() { - return null; - } - - @Override - public ClusterConnection getClusterConnection() { - // TODO Auto-generated method stub - return null; - } - - @Override - public FileSystem getFileSystem() { - return null; - } - - @Override - public boolean isStopping() { return false; } - - @Override - public Connection createConnection(Configuration conf) throws IOException { - return null; - } } } diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java index d5bba0d3d3f..786730f8287 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -23,15 +23,18 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.SortedSet; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Pair; +import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -115,6 +118,15 @@ public class TestZKReplicationQueueStorage { assertEquals(2, queueIds.size()); assertThat(queueIds, hasItems("1", "2")); + List wals1 = STORAGE.getWALsInQueue(serverName1, queue1); + List wals2 = STORAGE.getWALsInQueue(serverName1, queue2); + assertEquals(10, wals1.size()); + assertEquals(10, wals1.size()); + for (int i = 0; i < 10; i++) { + assertThat(wals1, hasItems(getFileName("file1", i))); + assertThat(wals2, hasItems(getFileName("file2", i))); + } + for (int i = 0; i < 10; i++) { assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); @@ -157,10 +169,20 @@ public class TestZKReplicationQueueStorage { queueIds = STORAGE.getAllQueues(serverName1); assertEquals(1, queueIds.size()); assertThat(queueIds, hasItems("2")); + wals2 = STORAGE.getWALsInQueue(serverName1, queue2); + assertEquals(5, wals2.size()); + for (i = 0; i < 10; i += 2) { + assertThat(wals2, hasItems(getFileName("file2", i))); + } queueIds = STORAGE.getAllQueues(serverName2); assertEquals(1, queueIds.size()); assertThat(queueIds, hasItems(peer1.getFirst())); + wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst()); + assertEquals(5, wals1.size()); + for (i = 1; i < 10; i += 2) { + assertThat(wals1, 
hasItems(getFileName("file1", i))); + } Set allWals = STORAGE.getAllWALs(); assertEquals(10, allWals.size()); @@ -168,4 +190,56 @@ public class TestZKReplicationQueueStorage { assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i))); } } + + // For HBASE-12865 + @Test + public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException { + ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); + STORAGE.addWAL(serverName1, "1", "file"); + + int v0 = STORAGE.getQueuesZNodeCversion(); + ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); + STORAGE.claimQueue(serverName1, "1", serverName2); + int v1 = STORAGE.getQueuesZNodeCversion(); + // cversion should increase by 1 since a child node is deleted + assertEquals(1, v1 - v0); + } + + private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException { + return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) { + + private int called = 0; + + @Override + protected int getQueuesZNodeCversion() throws KeeperException { + if (called < 4) { + called++; + } + return called; + } + }; + } + + @Test + public void testGetAllWALsCversionChange() throws IOException, ReplicationException { + ZKReplicationQueueStorage storage = createWithUnstableCversion(); + storage.addWAL(getServerName(0), "1", "file"); + // This should return eventually when cversion stabilizes + Set allWals = storage.getAllWALs(); + assertEquals(1, allWals.size()); + assertThat(allWals, hasItems("file")); + } + + // For HBASE-14621 + @Test + public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException { + ZKReplicationQueueStorage storage = createWithUnstableCversion(); + storage.addPeerToHFileRefs("1"); + Path p = new Path("/test"); + storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p))); + // This should return eventually when cversion stabilizes + Set allHFileRefs = storage.getAllHFileRefs(); + assertEquals(1, allHFileRefs.size()); + assertThat(allHFileRefs, hasItems("test")); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java index 97deab51ed0..af413991cbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java @@ -23,21 +23,23 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import 
org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,23 +50,19 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class ReplicationZKNodeCleaner { private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class); - private final ZKWatcher zkw; - private final ReplicationQueuesClient queuesClient; + private final ReplicationQueueStorage queueStorage; private final ReplicationPeers replicationPeers; private final ReplicationQueueDeletor queueDeletor; public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable) throws IOException { try { - this.zkw = zkw; - this.queuesClient = ReplicationFactory - .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw)); - this.queuesClient.init(); - this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, - abortable); + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + this.replicationPeers = + ReplicationFactory.getReplicationPeers(zkw, conf, this.queueStorage, abortable); this.replicationPeers.init(); this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable); - } catch (Exception e) { + } catch (ReplicationException e) { throw new IOException("failed to construct ReplicationZKNodeCleaner", e); } } @@ -73,16 +71,16 @@ public class ReplicationZKNodeCleaner { * @return undeletedQueues replicator with its queueIds for removed peers * @throws IOException */ - public Map> getUnDeletedQueues() throws IOException { - Map> undeletedQueues = new HashMap<>(); + public Map> getUnDeletedQueues() throws IOException { + Map> undeletedQueues = new HashMap<>(); Set peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds()); try { - List replicators = this.queuesClient.getListOfReplicators(); + List replicators = this.queueStorage.getListOfReplicators(); if (replicators == null || replicators.isEmpty()) { return undeletedQueues; } - for (String replicator : replicators) { - List queueIds = this.queuesClient.getAllQueues(replicator); + for (ServerName replicator : replicators) { + List queueIds = this.queueStorage.getAllQueues(replicator); for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); if (!peerIds.contains(queueInfo.getPeerId())) { @@ -96,7 +94,7 @@ public class ReplicationZKNodeCleaner { } } } - } catch (KeeperException ke) { + } catch (ReplicationException ke) { throw new IOException("Failed to get the replication queues of all replicators", ke); } return undeletedQueues; @@ -105,25 +103,21 @@ public class ReplicationZKNodeCleaner { /** * @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in * hfile-refs queue - * @throws IOException */ public Set getUnDeletedHFileRefsQueues() throws IOException { Set undeletedHFileRefsQueue = new HashSet<>(); Set peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds()); String hfileRefsZNode = queueDeletor.getHfileRefsZNode(); try { - if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) { - return null; - } - List listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue(); + List listOfPeers = 
this.queueStorage.getAllPeersFromHFileRefsQueue(); Set peers = new HashSet<>(listOfPeers); peers.removeAll(peerIds); if (!peers.isEmpty()) { undeletedHFileRefsQueue.addAll(peers); } - } catch (KeeperException e) { - throw new IOException("Failed to get list of all peers from hfile-refs znode " - + hfileRefsZNode, e); + } catch (ReplicationException e) { + throw new IOException( + "Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode, e); } return undeletedHFileRefsQueue; } @@ -137,21 +131,20 @@ public class ReplicationZKNodeCleaner { /** * @param replicator The regionserver which has undeleted queue * @param queueId The undeleted queue id - * @throws IOException */ - public void removeQueue(final String replicator, final String queueId) throws IOException { - String queueZnodePath = - ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator), queueId); + public void removeQueue(final ServerName replicator, final String queueId) throws IOException { + String queueZnodePath = ZNodePaths + .joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), queueId); try { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) { ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath); - LOG.info("Successfully removed replication queue, replicator: " + replicator - + ", queueId: " + queueId); + LOG.info("Successfully removed replication queue, replicator: " + replicator + + ", queueId: " + queueId); } } catch (KeeperException e) { - throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: " - + queueId); + throw new IOException( + "Failed to delete queue, replicator: " + replicator + ", queueId: " + queueId); } } @@ -183,9 +176,9 @@ public class ReplicationZKNodeCleaner { * @param undeletedQueues replicator with its queueIds for removed peers * @throws IOException */ - public void removeQueues(final Map> undeletedQueues) throws IOException { - for (Entry> replicatorAndQueueIds : undeletedQueues.entrySet()) { - String replicator = replicatorAndQueueIds.getKey(); + public void removeQueues(final Map> undeletedQueues) throws IOException { + for (Entry> replicatorAndQueueIds : undeletedQueues.entrySet()) { + ServerName replicator = replicatorAndQueueIds.getKey(); for (String queueId : replicatorAndQueueIds.getValue()) { queueDeletor.removeQueue(replicator, queueId); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java index 8d5df9bfd2d..19ca8041e12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
  */
-
 package org.apache.hadoop.hbase.master.cleaner;

 import java.io.IOException;
@@ -23,6 +22,7 @@ import java.util.List;
 import java.util.Map;

 import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +45,10 @@ public class ReplicationZKNodeCleanerChore extends ScheduledChore {
   @Override
   protected void chore() {
     try {
-      Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+      Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
       cleaner.removeQueues(undeletedQueues);
     } catch (IOException e) {
       LOG.warn("Failed to clean replication zk node", e);
     }
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 5abd874cba7..84abfeb643b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
@@ -147,28 +148,13 @@ public final class ReplicationPeerManager {
     }
   }

-  private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) {
-    ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig();
-    copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
-    copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
-    copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap());
-    copiedPeerConfig.setNamespaces(peerConfig.getNamespaces());
-    copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap());
-    copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces());
-    copiedPeerConfig.setBandwidth(peerConfig.getBandwidth());
-    copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables());
-    copiedPeerConfig.setClusterKey(peerConfig.getClusterKey());
-    copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
-    return copiedPeerConfig;
-  }
-
   public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws ReplicationException {
     if (peers.containsKey(peerId)) {
       // this should be a retry, just return
       return;
     }
-    ReplicationPeerConfig copiedPeerConfig = copy(peerConfig);
+    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
     peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
     peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
   }
@@ -205,13 +191,14 @@ public final class ReplicationPeerManager {
     // the checking rules are too complicated here so we give up checking whether this is a retry.
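+    // Editor's note (assumption about intent): the builder starts from the caller-supplied
+    // peerConfig, then below re-applies the old configuration/peerData first and the new maps
+    // second, so any key present in both resolves to the caller's new value.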
     ReplicationPeerDescription desc = peers.get(peerId);
     ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
-    ReplicationPeerConfig newPeerConfig = copy(peerConfig);
+    ReplicationPeerConfigBuilder newPeerConfigBuilder =
+        ReplicationPeerConfig.newBuilder(peerConfig);
     // we need to use the new conf to overwrite the old one.
-    newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration());
-    newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
-    newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData());
-    newPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
-
+    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
+    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
+    newPeerConfigBuilder.putAllPeerData(oldPeerConfig.getPeerData());
+    newPeerConfigBuilder.putAllPeerData(peerConfig.getPeerData());
+    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
     peerStorage.updatePeerConfig(peerId, newPeerConfig);
     peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
index 5f1df44c042..7b6216944d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
@@ -1,42 +1,43 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/ package org.apache.hadoop.hbase.replication.master; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Predicate; -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; -import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; -import org.apache.hbase.thirdparty.com.google.common.collect.Sets; - import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; +import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Predicate; +import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; + /** * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before * deleting it from hfile archive directory. @@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory; public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class); private ZKWatcher zkw; - private ReplicationQueuesClient rqc; + private ReplicationQueueStorage rqs; private boolean stopped = false; @Override @@ -60,8 +61,8 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { try { // The concurrently created new hfile entries in ZK may not be included in the return list, // but they won't be deleted because they're not in the checking set. - hfileRefs = loadHFileRefsFromPeers(); - } catch (KeeperException e) { + hfileRefs = rqs.getAllHFileRefs(); + } catch (ReplicationException e) { LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files"); return Collections.emptyList(); } @@ -82,37 +83,6 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { }); } - /** - * Load all hfile references in all replication queues from ZK. This method guarantees to return a - * snapshot which contains all hfile references in the zookeeper at the start of this call. - * However, some newly created hfile references during the call may not be included. 
- */ - private Set loadHFileRefsFromPeers() throws KeeperException { - Set hfileRefs = Sets.newHashSet(); - List listOfPeers; - for (int retry = 0;; retry++) { - int v0 = rqc.getHFileRefsNodeChangeVersion(); - hfileRefs.clear(); - listOfPeers = rqc.getAllPeersFromHFileRefsQueue(); - if (listOfPeers == null) { - LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions."); - return ImmutableSet.of(); - } - for (String id : listOfPeers) { - List peerHFileRefs = rqc.getReplicableHFiles(id); - if (peerHFileRefs != null) { - hfileRefs.addAll(peerHFileRefs); - } - } - int v1 = rqc.getHFileRefsNodeChangeVersion(); - if (v0 == v1) { - return hfileRefs; - } - LOG.debug(String.format("Replication hfile references node cversion changed from " - + "%d to %d, retry = %d", v0, v1, retry)); - } - } - @Override public void setConf(Configuration config) { // If either replication or replication of bulk load hfiles is disabled, keep all members null @@ -139,17 +109,15 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { public void setConf(Configuration conf, ZKWatcher zk) { super.setConf(conf); try { - initReplicationQueuesClient(conf, zk); + initReplicationQueueStorage(conf, zk); } catch (Exception e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } } - private void initReplicationQueuesClient(Configuration conf, ZKWatcher zk) - throws Exception { + private void initReplicationQueueStorage(Configuration conf, ZKWatcher zk) { this.zkw = zk; - this.rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments( - conf, new WarnOnlyAbortable(), zkw)); + this.rqs = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf); } @Override @@ -179,25 +147,12 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { } try { - hfileRefsFromQueue = loadHFileRefsFromPeers(); - } catch (KeeperException e) { + hfileRefsFromQueue = rqs.getAllHFileRefs(); + } catch (ReplicationException e) { LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable " + "file for " + fStat.getPath()); return false; } return !hfileRefsFromQueue.contains(fStat.getPath().getName()); } - - private static class WarnOnlyAbortable implements Abortable { - @Override - public void abort(String why, Throwable e) { - LOG.warn("ReplicationHFileCleaner received abort, ignoring. Reason: " + why); - LOG.debug(e.toString(), e); - } - - @Override - public boolean isAborted() { - return false; - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 86f98da2731..15aa21acfb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -24,16 +23,14 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +46,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; public class ReplicationLogCleaner extends BaseLogCleanerDelegate { private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class); private ZKWatcher zkw; - private ReplicationQueuesClient replicationQueues; + private ReplicationQueueStorage queueStorage; private boolean stopped = false; private Set wals; private long readZKTimestamp = 0; @@ -60,8 +57,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { try { // The concurrently created new WALs may not be included in the return list, // but they won't be deleted because they're not in the checking set. - wals = replicationQueues.getAllWALs(); - } catch (KeeperException e) { + wals = queueStorage.getAllWALs(); + } catch (ReplicationException e) { LOG.warn("Failed to read zookeeper, skipping checking deletable files"); wals = null; } @@ -110,9 +107,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { super.setConf(conf); try { this.zkw = zk; - this.replicationQueues = ReplicationFactory.getReplicationQueuesClient( - new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw)); - this.replicationQueues.init(); + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf); } catch (Exception e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } @@ -132,18 +127,4 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { public boolean isStopped() { return this.stopped; } - - private static class WarnOnlyAbortable implements Abortable { - - @Override - public void abort(String why, Throwable e) { - LOG.warn("ReplicationLogCleaner received abort, ignoring. 
Reason: " + why); - LOG.debug(e.toString(), e); - } - - @Override - public boolean isAborted() { - return false; - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 6e27a21226b..d8f9625e26d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -21,13 +21,13 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; @@ -48,17 +48,18 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap; /** @@ -303,57 +304,53 @@ public class DumpReplicationQueues extends Configured implements Tool { } public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set peerIds, - boolean hdfs) throws Exception { - ReplicationQueuesClient queuesClient; + boolean hdfs) throws Exception { + ReplicationQueueStorage queueStorage; ReplicationPeers replicationPeers; ReplicationQueues replicationQueues; ReplicationTracker replicationTracker; - ReplicationQueuesClientArguments replicationArgs = - new ReplicationQueuesClientArguments(getConf(), new WarnOnlyAbortable(), zkw); + ReplicationQueuesArguments replicationArgs = + new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw); StringBuilder sb = new StringBuilder(); - queuesClient = ReplicationFactory.getReplicationQueuesClient(replicationArgs); - queuesClient.init(); + queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs); - replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection); + replicationPeers = + ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection); replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), new WarnOnlyAbortable(), new WarnOnlyStoppable()); - 
List liveRegionServers = replicationTracker.getListOfRegionServers(); + Set liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); // Loops each peer on each RS and dumps the queues - try { - List regionservers = queuesClient.getListOfReplicators(); - if (regionservers == null || regionservers.isEmpty()) { - return sb.toString(); + List regionservers = queueStorage.getListOfReplicators(); + if (regionservers == null || regionservers.isEmpty()) { + return sb.toString(); + } + for (ServerName regionserver : regionservers) { + List queueIds = queueStorage.getAllQueues(regionserver); + replicationQueues.init(regionserver.getServerName()); + if (!liveRegionServers.contains(regionserver.getServerName())) { + deadRegionServers.add(regionserver.getServerName()); } - for (String regionserver : regionservers) { - List queueIds = queuesClient.getAllQueues(regionserver); - replicationQueues.init(regionserver); - if (!liveRegionServers.contains(regionserver)) { - deadRegionServers.add(regionserver); - } - for (String queueId : queueIds) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - List wals = queuesClient.getLogsInQueue(regionserver, queueId); - if (!peerIds.contains(queueInfo.getPeerId())) { - deletedQueues.add(regionserver + "/" + queueId); - sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, - hdfs)); - } else { - sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, - hdfs)); - } + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + List wals = queueStorage.getWALsInQueue(regionserver, queueId); + if (!peerIds.contains(queueInfo.getPeerId())) { + deletedQueues.add(regionserver + "/" + queueId); + sb.append( + formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs)); + } else { + sb.append( + formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs)); } } - } catch (KeeperException ke) { - throw new IOException(ke); } return sb.toString(); } - private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo, - String queueId, List wals, boolean isDeleted, boolean hdfs) throws Exception { - + private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues, + ReplicationQueueInfo queueInfo, String queueId, List wals, boolean isDeleted, + boolean hdfs) throws Exception { StringBuilder sb = new StringBuilder(); List deadServers; @@ -389,13 +386,14 @@ public class DumpReplicationQueues extends Configured implements Tool { /** * return total size in bytes from a list of WALs */ - private long getTotalWALSize(FileSystem fs, List wals, String server) throws IOException { + private long getTotalWALSize(FileSystem fs, List wals, ServerName server) + throws IOException { long size = 0; FileStatus fileStatus; for (String wal : wals) { try { - fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs); + fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs); } catch (IOException e) { if (e instanceof FileNotFoundException) { numWalsNotFound++; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 839b5ade533..85fa7297739 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.util.hbck; import java.io.IOException; @@ -27,22 +26,23 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; -/* +/** * Check and fix undeleted replication queues for removed peerId. */ @InterfaceAudience.Private public class ReplicationChecker { private final ErrorReporter errorReporter; // replicator with its queueIds for removed peers - private Map> undeletedQueueIds = new HashMap<>(); + private Map> undeletedQueueIds = new HashMap<>(); // replicator with its undeleted queueIds for removed peers in hfile-refs queue private Set undeletedHFileRefsQueueIds = new HashSet<>(); private final ReplicationZKNodeCleaner cleaner; @@ -60,8 +60,8 @@ public class ReplicationChecker { public void checkUnDeletedQueues() throws IOException { undeletedQueueIds = cleaner.getUnDeletedQueues(); - for (Entry> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { - String replicator = replicatorAndQueueIds.getKey(); + for (Entry> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { + ServerName replicator = replicatorAndQueueIds.getKey(); for (String queueId : replicatorAndQueueIds.getValue()) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); String msg = "Undeleted replication queue for removed peer found: " diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index 28a7562d054..b28eaaf6483 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -34,12 +35,16 @@ import java.util.Set; import java.util.concurrent.CompletionException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.After; import 
org.junit.BeforeClass; import org.junit.Test; @@ -56,8 +61,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { private final String ID_ONE = "1"; private final String KEY_ONE = "127.0.0.1:2181:/hbase"; - private final String ID_SECOND = "2"; - private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; + private final String ID_TWO = "2"; + private final String KEY_TWO = "127.0.0.1:2181:/hbase2"; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -65,21 +70,27 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); + TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1); TEST_UTIL.startMiniCluster(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } @After - public void cleanupPeer() { + public void clearPeerAndQueues() throws IOException, ReplicationException { try { admin.removeReplicationPeer(ID_ONE).join(); } catch (Exception e) { - LOG.debug("Replication peer " + ID_ONE + " may already be removed"); } try { - admin.removeReplicationPeer(ID_SECOND).join(); + admin.removeReplicationPeer(ID_TWO).join(); } catch (Exception e) { - LOG.debug("Replication peer " + ID_SECOND + " may already be removed"); + } + ReplicationQueueStorage queueStorage = ReplicationStorageFactory + .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); + for (ServerName serverName : queueStorage.getListOfReplicators()) { + for (String queue : queueStorage.getAllQueues(serverName)) { + queueStorage.removeQueue(serverName, queue); + } } } @@ -88,7 +99,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); rpc1.setClusterKey(KEY_ONE); ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); - rpc2.setClusterKey(KEY_SECOND); + rpc2.setClusterKey(KEY_TWO); // Add a valid peer admin.addReplicationPeer(ID_ONE, rpc1).join(); // try adding the same (fails) @@ -101,19 +112,19 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { assertEquals(1, admin.listReplicationPeers().get().size()); // Try to remove an inexisting peer try { - admin.removeReplicationPeer(ID_SECOND).join(); + admin.removeReplicationPeer(ID_TWO).join(); fail("Test case should fail as removing a inexisting peer."); } catch (CompletionException e) { // OK! 
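+      // Editor's note: this is the expected path; join() surfaces the server-side failure for
+      // the unknown peer as a CompletionException, which the test treats as success.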
} assertEquals(1, admin.listReplicationPeers().get().size()); // Add a second since multi-slave is supported - admin.addReplicationPeer(ID_SECOND, rpc2).join(); + admin.addReplicationPeer(ID_TWO, rpc2).join(); assertEquals(2, admin.listReplicationPeers().get().size()); // Remove the first peer we added admin.removeReplicationPeer(ID_ONE).join(); assertEquals(1, admin.listReplicationPeers().get().size()); - admin.removeReplicationPeer(ID_SECOND).join(); + admin.removeReplicationPeer(ID_TWO).join(); assertEquals(0, admin.listReplicationPeers().get().size()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index c2fcd8c9c88..8bb32305891 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -82,6 +83,7 @@ public class TestReplicationAdmin { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1); TEST_UTIL.startMiniCluster(); admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); hbaseAdmin = TEST_UTIL.getAdmin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 08b27ec9c45..1e75959bec2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -24,16 +24,12 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import java.io.IOException; -import java.lang.reflect.Field; import java.net.URLEncoder; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Random; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -51,7 +47,6 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -65,10 +60,11 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + @Category({MasterTests.class, MediumTests.class}) public class TestLogsCleaner { @@ -195,24 +191,6 @@ public class TestLogsCleaner { } } - @Test(timeout=5000) - public void testZnodeCversionChange() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - cleaner.setConf(conf); - - ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class); - Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4); - - Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues"); - rqc.setAccessible(true); - - rqc.set(cleaner, rqcMock); - - // This should return eventually when cversion stabilizes - cleaner.getDeletableFiles(new LinkedList<>()); - } - /** * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index 29487014b9a..f83695feabf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -1,12 +1,19 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.hbase.master.cleaner; @@ -17,14 +24,10 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - import java.io.IOException; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Iterator; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -45,7 +48,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; @@ -63,10 +65,11 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + @Category({ MasterTests.class, SmallTests.class }) public class TestReplicationHFileCleaner { private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class); @@ -188,32 +191,6 @@ public class TestReplicationHFileCleaner { assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile)); } - /* - * Test for HBASE-14621. This test will not assert directly anything. Without the fix the test - * will end up in a infinite loop, so it will timeout. - */ - @Test(timeout = 15000) - public void testForDifferntHFileRefsZnodeVersion() throws Exception { - // 1. Create a file - Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion"); - fs.createNewFile(file); - // 2. 
Assert file is successfully created - assertTrue("Test file not created!", fs.exists(file)); - ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); - cleaner.setConf(conf); - - ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class); - //Return different znode version for each call - Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2); - - Class cleanerClass = cleaner.getClass(); - Field rqc = cleanerClass.getDeclaredField("rqc"); - rqc.setAccessible(true); - rqc.set(cleaner, replicationQueuesClient); - - cleaner.isFileDeletable(fs.getFileStatus(file)); - } - /** * ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java index 6aa59cb33cf..817826632fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master.cleaner; import static org.junit.Assert.assertEquals; @@ -26,6 +25,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; @@ -43,9 +43,9 @@ public class TestReplicationZKNodeCleaner { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final String ID_ONE = "1"; - private final String SERVER_ONE = "server1"; + private final ServerName SERVER_ONE = ServerName.valueOf("server1", 8000, 1234); private final String ID_TWO = "2"; - private final String SERVER_TWO = "server2"; + private final ServerName SERVER_TWO = ServerName.valueOf("server2", 8000, 1234); private final Configuration conf; private final ZKWatcher zkw; @@ -72,12 +72,12 @@ public class TestReplicationZKNodeCleaner { @Test public void testReplicationZKNodeCleaner() throws Exception { - repQueues.init(SERVER_ONE); + repQueues.init(SERVER_ONE.getServerName()); // add queue for ID_ONE which isn't exist repQueues.addLog(ID_ONE, "file1"); ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null); - Map> undeletedQueues = cleaner.getUnDeletedQueues(); + Map> undeletedQueues = cleaner.getUnDeletedQueues(); assertEquals(1, undeletedQueues.size()); assertTrue(undeletedQueues.containsKey(SERVER_ONE)); assertEquals(1, undeletedQueues.get(SERVER_ONE).size()); @@ -100,7 +100,7 @@ public class TestReplicationZKNodeCleaner { @Test public void testReplicationZKNodeCleanerChore() throws Exception { - repQueues.init(SERVER_ONE); + repQueues.init(SERVER_ONE.getServerName()); // add queue for ID_ONE which isn't exist repQueues.addLog(ID_ONE, "file1"); // add a recovery queue for ID_TWO which isn't exist diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index b47a8d3f628..aeab8b0e9a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -1,34 +1,34 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.junit.Assert.assertTrue; + +import java.util.List; + import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -36,11 +36,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - /** * Tests the ReplicationSourceManager with ReplicationQueueZkImpl's and * ReplicationQueuesClientZkImpl. 
Also includes extra tests outside of those in @@ -114,41 +109,4 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan server.stop(""); } - - @Test - public void testFailoverDeadServerCversionChange() throws Exception { - final Server s0 = new DummyServer("cversion-change0.example.org"); - ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0, - s0.getZooKeeper())); - repQueues.init(s0.getServerName().toString()); - // populate some znodes in the peer znode - files.add("log1"); - files.add("log2"); - for (String file : files) { - repQueues.addLog("1", file); - } - // simulate queue transfer - Server s1 = new DummyServer("cversion-change1.example.org"); - ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, - s1.getZooKeeper())); - rq1.init(s1.getServerName().toString()); - - ReplicationQueuesClientZKImpl client = - (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient( - new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); - - int v0 = client.getQueuesZNodeCversion(); - List queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName()); - for(String queue : queues) { - rq1.claimQueue(s0.getServerName().getServerName(), queue); - } - rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName()); - int v1 = client.getQueuesZNodeCversion(); - // cversion should increase by 1 since a child node is deleted - assertEquals(v0 + 1, v1); - - s0.stop(""); - } }
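
Editor's note on the recurring pattern in this patch: the loadHFileRefsFromPeers loop removed from ReplicationHFileCleaner, the removed testZnodeCversionChange / testForDifferntHFileRefsZnodeVersion / testFailoverDeadServerCversionChange tests, and the new cversion tests added to TestZKReplicationQueueStorage all exercise one idea, now centralized in ZKReplicationQueueStorage.getAllWALs() and getAllHFileRefs(): read the parent znode's cversion, scan the subtree, re-read the cversion, and accept the scan only when the two reads match, because ZooKeeper bumps a parent's cversion on every child create or delete. A minimal sketch of that optimistic read over a single parent znode, assuming a hypothetical ZnodeReader interface in place of the real ZooKeeper client API (CversionStableRead, ZnodeReader, and readStableChildren are illustrative names, not HBase classes):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class CversionStableRead {

  /** Hypothetical minimal view of a ZooKeeper client for this sketch. */
  public interface ZnodeReader {
    /** The parent's child-version counter, bumped on every child create/delete. */
    int getCversion(String path) throws Exception;

    List<String> getChildren(String path) throws Exception;
  }

  /** Returns a child listing taken while the parent's cversion held still. */
  public static Set<String> readStableChildren(ZnodeReader zk, String path) throws Exception {
    while (true) {
      int v0 = zk.getCversion(path);
      Set<String> snapshot = new HashSet<>(zk.getChildren(path));
      int v1 = zk.getCversion(path);
      if (v0 == v1) {
        return snapshot; // no child was created or deleted during the scan
      }
      // A writer raced with the scan; read again until the cversion is stable.
    }
  }
}

The real getAllWALs() walks the whole queues subtree between the two cversion reads, but the acceptance test is the same. This is also what the mocked getQueuesZNodeCversion in createWithUnstableCversion (returning 1, 2, 3, 4, 4, ...) verifies: the first two rounds observe a moving cversion, and the read is only accepted once both reads return 4. Likewise, testClaimQueueChangeCversion pins down that claimQueue changes the child set under the queues znode exactly once, which is what makes its delta assertion (v1 - v0 == 1) meaningful.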