diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java index 875b1244e22..8bbb6f1e386 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java @@ -107,6 +107,9 @@ public class CollectionUtils { return list.get(list.size() - 1); } + public static List nullToEmpty(List list) { + return list != null ? list : Collections.emptyList(); + } /** * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the * value already exists. Notice that the implementation does not guarantee that the supplier will diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml index af1145b6d29..4952009bf15 100644 --- a/hbase-replication/pom.xml +++ b/hbase-replication/pom.xml @@ -97,6 +97,18 @@ org.apache.hbase hbase-zookeeper + + org.apache.hbase + hbase-common + test-jar + test + + + org.apache.hbase + hbase-zookeeper + test-jar + test + org.apache.commons diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java new file mode 100644 index 00000000000..e00cd0df2f0 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Optional; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Perform read/write to the replication peer storage. + */ +@InterfaceAudience.Private +public interface ReplicationPeerStorage { + + /** + * Add a replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException; + + /** + * Remove a replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void removePeer(String peerId) throws ReplicationException; + + /** + * Set the state of peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void setPeerState(String peerId, boolean enabled) throws ReplicationException; + + /** + * Update the config a replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException; + + /** + * Return the peer ids of all replication peers. + * @throws ReplicationException if there are errors accessing the storage service. + */ + List listPeerIds() throws ReplicationException; + + /** + * Test whether a replication peer is enabled. + * @throws ReplicationException if there are errors accessing the storage service. + */ + boolean isPeerEnabled(String peerId) throws ReplicationException; + + /** + * Get the peer config of a replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + Optional getPeerConfig(String peerId) throws ReplicationException; +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java new file mode 100644 index 00000000000..7210d9a9ca3 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Perform read/write to the replication queue storage. + */ +@InterfaceAudience.Private +public interface ReplicationQueueStorage { + + /** + * Remove a replication queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + */ + void removeQueue(ServerName serverName, String queueId) throws ReplicationException; + + /** + * Add a new WAL file to the given queue for a given regionserver. If the queue does not exist it + * is created. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + * @param fileName name of the WAL + */ + void addWAL(ServerName serverName, String queueId, String fileName) throws ReplicationException; + + /** + * Remove an WAL file from the given queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + * @param fileName name of the WAL + */ + void removeWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException; + + /** + * Set the current position for a specific WAL in a given queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue + * @param fileName name of the WAL + * @param position the current position in the file + */ + void setWALPosition(ServerName serverName, String queueId, String fileName, long position) + throws ReplicationException; + + /** + * Get the current position for a specific WAL in a given queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue + * @param fileName name of the WAL + * @return the current position in the file + */ + long getWALPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException; + + /** + * Get a list of all queues for the specified region server. + * @param serverName the server name of the region server that owns the set of queues + * @return a list of queueIds + */ + List getAllQueues(ServerName serverName) throws ReplicationException; + + /** + * Change ownership for the queue identified by queueId and belongs to a dead region server. + * @param sourceServerName the name of the dead region server + * @param destServerName the name of the target region server + * @param queueId the id of the queue + * @return the new PeerId and A SortedSet of WALs in its queue + */ + Pair> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException; + + /** + * Remove the record of region server if the queue is empty. + */ + void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException; + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster. + * @return a list of server names + */ + List getListOfReplicators() throws ReplicationException; + + /** + * Load all wals in all replication queues. This method guarantees to return a snapshot which + * contains all WALs in the zookeeper at the start of this call even there is concurrent queue + * failover. However, some newly created WALs during the call may not be included. + */ + Set getAllWALs() throws ReplicationException; + + /** + * Add a peer to hfile reference queue if peer does not exist. + * @param peerId peer cluster id to be added + * @throws ReplicationException if fails to add a peer id to hfile reference queue + */ + void addPeerToHFileRefs(String peerId) throws ReplicationException; + + /** + * Remove a peer from hfile reference queue. + * @param peerId peer cluster id to be removed + */ + void removePeerFromHFileRefs(String peerId) throws ReplicationException; + + /** + * Add new hfile references to the queue. + * @param peerId peer cluster id to which the hfiles need to be replicated + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue } + * @throws ReplicationException if fails to add a hfile reference + */ + void addHFileRefs(String peerId, List> pairs) throws ReplicationException; + + /** + * Remove hfile references from the queue. + * @param peerId peer cluster id from which this hfile references needs to be removed + * @param files list of hfile references to be removed + */ + void removeHFileRefs(String peerId, List files) throws ReplicationException; + + /** + * Get the change version number of replication hfile references node. This can be used as + * optimistic locking to get a consistent snapshot of the replication queues of hfile references. + * @return change version number of hfile references node + */ + int getHFileRefsNodeChangeVersion() throws ReplicationException; + + /** + * Get list of all peers from hfile reference queue. + * @return a list of peer ids + */ + List getAllPeersFromHFileRefsQueue() throws ReplicationException; + + /** + * Get a list of all hfile references in the given peer. + * @param peerId a String that identifies the peer + * @return a list of hfile references + */ + List getReplicableHFiles(String peerId) throws ReplicationException; +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 4e9479fe32a..a48683ef621 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -63,7 +63,6 @@ public abstract class ReplicationStateZKBase { protected final Configuration conf; protected final Abortable abortable; - // Public for testing public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); public static final byte[] DISABLED_ZNODE_BYTES = diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java new file mode 100644 index 00000000000..60d07498a93 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Used to create replication storage(peer, queue) classes. + *

+ * For now we only have zk based implementation. + */ +@InterfaceAudience.Private +public class ReplicationStorageFactory { + + private ReplicationStorageFactory() { + } + + /** + * Create a new {@link ReplicationPeerStorage}. + */ + public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) { + return new ZKReplicationPeerStorage(zk, conf); + } + + /** + * Create a new {@link ReplicationQueueStorage}. + */ + public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk, + Configuration conf) { + return new ZKReplicationQueueStorage(zk, conf); + } +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java new file mode 100644 index 00000000000..49af4c3808c --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * ZK based replication peer storage. + */ +@InterfaceAudience.Private +class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { + + private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationPeerStorage.class); + + public static final byte[] ENABLED_ZNODE_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); + public static final byte[] DISABLED_ZNODE_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); + + /** + * The name of the znode that contains the replication status of a remote slave (i.e. peer) + * cluster. + */ + private final String peerStateNodeName; + + /** + * The name of the znode that contains a list of all remote slave (i.e. peer) clusters. + */ + private final String peersZNode; + + public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); + String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); + } + + private String getPeerStateNode(String peerId) { + return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName); + } + + private String getPeerNode(String peerId) { + return ZNodePaths.joinZNode(peersZNode, peerId); + } + + @Override + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException { + try { + ZKUtil.createWithParents(zookeeper, peersZNode); + ZKUtil.multiOrSequential(zookeeper, + Arrays.asList( + ZKUtilOp.createAndFailSilent(getPeerNode(peerId), + ReplicationPeerConfigUtil.toByteArray(peerConfig)), + ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), + enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), + false); + } catch (KeeperException e) { + throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); + } + } + + @Override + public void removePeer(String peerId) throws ReplicationException { + try { + ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId)); + } catch (KeeperException e) { + throw new ReplicationException("Could not remove peer with id=" + peerId, e); + } + } + + @Override + public void setPeerState(String peerId, boolean enabled) throws ReplicationException { + byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES; + try { + ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes); + } catch (KeeperException e) { + throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e); + } + } + + @Override + public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + try { + ZKUtil.setData(this.zookeeper, getPeerNode(peerId), + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + } catch (KeeperException e) { + throw new ReplicationException( + "There was a problem trying to save changes to the " + "replication peer " + peerId, e); + } + } + + @Override + public List listPeerIds() throws ReplicationException { + try { + return CollectionUtils.nullToEmpty(ZKUtil.listChildrenAndWatchThem(zookeeper, peersZNode)); + } catch (KeeperException e) { + throw new ReplicationException("Cannot get the list of peers", e); + } + } + + @Override + public boolean isPeerEnabled(String peerId) throws ReplicationException { + try { + return Arrays.equals(ENABLED_ZNODE_BYTES, + ZKUtil.getData(zookeeper, getPeerStateNode(peerId))); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); + } + } + + @Override + public Optional getPeerConfig(String peerId) throws ReplicationException { + byte[] data; + try { + data = ZKUtil.getData(zookeeper, getPeerNode(peerId)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); + } + if (data == null || data.length == 0) { + return Optional.empty(); + } + try { + return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data)); + } catch (DeserializationException e) { + LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e); + return Optional.empty(); + } + } +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java new file mode 100644 index 00000000000..7015d7f9902 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hbase.util.CollectionUtils.nullToEmpty; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.BadVersionException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.KeeperException.NotEmptyException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; + +/** + * ZK based replication queue storage. + */ +@InterfaceAudience.Private +class ZKReplicationQueueStorage extends ZKReplicationStorageBase + implements ReplicationQueueStorage { + + private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); + + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = + "zookeeper.znode.replication.hfile.refs"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; + + /** + * The name of the znode that contains all replication queues + */ + private final String queuesZNode; + + /** + * The name of the znode that contains queues of hfile references to be replicated + */ + private final String hfileRefsZNode; + + public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + + String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); + String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); + this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); + this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); + } + + private String getRsNode(ServerName serverName) { + return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); + } + + private String getQueueNode(ServerName serverName, String queueId) { + return ZNodePaths.joinZNode(getRsNode(serverName), queueId); + } + + private String getFileNode(String queueNode, String fileName) { + return ZNodePaths.joinZNode(queueNode, fileName); + } + + private String getFileNode(ServerName serverName, String queueId, String fileName) { + return getFileNode(getQueueNode(serverName, queueId), fileName); + } + + @Override + public void removeQueue(ServerName serverName, String queueId) throws ReplicationException { + try { + ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e); + } + } + + @Override + public void addWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try { + ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + } + + @Override + public void removeWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + String fileNode = getFileNode(serverName, queueId, fileName); + try { + ZKUtil.deleteNode(zookeeper, fileNode); + } catch (NoNodeException e) { + LOG.warn(fileNode + " has already been deleted when removing log"); + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + } + + @Override + public void setWALPosition(ServerName serverName, String queueId, String fileName, long position) + throws ReplicationException { + try { + ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName), + ZKUtil.positionToByteArray(position)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to set log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); + } + } + + @Override + public long getWALPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + byte[] bytes; + try { + bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Failed to get log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + try { + return ZKUtil.parseWALPositionFrom(bytes); + } catch (DeserializationException de) { + LOG.warn("Failed to parse log position (serverName=" + serverName + ", queueId=" + queueId + + ", fileName=" + fileName + ")"); + } + // if we can not parse the position, start at the beginning of the wal file again + return 0; + } + + @Override + public Pair> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException { + LOG.info( + "Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + try { + ZKUtil.createWithParents(zookeeper, getRsNode(destServerName)); + } catch (KeeperException e) { + throw new ReplicationException( + "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + + " failed when creating the node for " + destServerName, + e); + } + try { + String oldQueueNode = getQueueNode(sourceServerName, queueId); + List wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode); + String newQueueId = queueId + "-" + sourceServerName; + if (CollectionUtils.isEmpty(wals)) { + ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode); + LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty"); + return new Pair<>(newQueueId, Collections.emptySortedSet()); + } + String newQueueNode = getQueueNode(destServerName, newQueueId); + List listOfOps = new ArrayList<>(); + SortedSet logQueue = new TreeSet<>(); + // create the new cluster znode + listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY)); + // get the offset of the logs and set it to new znodes + for (String wal : wals) { + String oldWalNode = getFileNode(oldQueueNode, wal); + byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating " + wal + " with data " + Bytes.toStringBinary(logOffset)); + } + String newWalNode = getFileNode(newQueueNode, wal); + listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset)); + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode)); + logQueue.add(wal); + } + // add delete op for peer + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode)); + + if (LOG.isTraceEnabled()) { + LOG.trace("The multi list size is: " + listOfOps.size()); + } + ZKUtil.multiOrSequential(zookeeper, listOfOps, false); + + LOG.info( + "Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + return new Pair<>(newQueueId, logQueue); + } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) { + // Multi call failed; it looks like some other regionserver took away the logs. + // These exceptions mean that zk tells us the request can not be execute so it is safe to just + // return a null. For other types of exception should be thrown out to notify the upper layer. + LOG.info( + "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + + " failed with " + e.toString() + ", maybe someone else has already took away the logs"); + return null; + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Claim queue queueId=" + queueId + " from " + + sourceServerName + " to " + destServerName + " failed", e); + } + } + + @Override + public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException { + try { + ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName)); + } catch (NotEmptyException e) { + // keep silence to avoid logging too much. + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove replicator for " + serverName, e); + } + } + + private List getListOfReplicators0() throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode)).stream() + .map(ServerName::parseServerName).collect(toList()); + } + + @Override + public List getListOfReplicators() throws ReplicationException { + try { + return getListOfReplicators0(); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of replicators", e); + } + } + + private List getLogsInQueue0(ServerName serverName, String queueId) + throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId))); + } + + private List getAllQueues0(ServerName serverName) throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName))); + } + + @Override + public List getAllQueues(ServerName serverName) throws ReplicationException { + try { + return getAllQueues0(serverName); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e); + } + } + + private int getQueuesZNodeCversion() throws KeeperException { + Stat stat = new Stat(); + ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); + return stat.getCversion(); + } + + @Override + public Set getAllWALs() throws ReplicationException { + try { + for (int retry = 0;; retry++) { + int v0 = getQueuesZNodeCversion(); + List rss = getListOfReplicators0(); + if (rss.isEmpty()) { + LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); + return Collections.emptySet(); + } + Set wals = Sets.newHashSet(); + for (ServerName rs : rss) { + for (String queueId : getAllQueues0(rs)) { + wals.addAll(getLogsInQueue0(rs, queueId)); + } + } + int v1 = getQueuesZNodeCversion(); + if (v0 == v1) { + return wals; + } + LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", + v0, v1, retry)); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to get all wals", e); + } + } + + private String getHFileRefsPeerNode(String peerId) { + return ZNodePaths.joinZNode(hfileRefsZNode, peerId); + } + + private String getHFileNode(String peerNode, String fileName) { + return ZNodePaths.joinZNode(peerNode, fileName); + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + try { + if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { + LOG.info("Adding peer " + peerId + " to hfile reference queue."); + ZKUtil.createWithParents(zookeeper, peerNode); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", + e); + } + } + + @Override + public void removePeerFromHFileRefs(String peerId) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + try { + if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Peer " + peerNode + " not found in hfile reference queue."); + } + } else { + LOG.info("Removing peer " + peerNode + " from hfile reference queue."); + ZKUtil.deleteNodeRecursively(zookeeper, peerNode); + } + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to remove peer " + peerId + " from hfile reference queue.", e); + } + } + + @Override + public void addHFileRefs(String peerId, List> pairs) + throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Adding hfile references " + pairs + " in queue " + peerNode); + } + List listOfOps = + pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n)) + .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); + if (debugEnabled) { + LOG.debug("The multi list size for adding hfile references in zk for node " + peerNode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e); + } + } + + @Override + public void removeHFileRefs(String peerId, List files) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Removing hfile references " + files + " from queue " + peerNode); + } + + List listOfOps = files.stream().map(n -> getHFileNode(peerNode, n)) + .map(ZKUtilOp::deleteNodeFailSilent).collect(toList()); + if (debugEnabled) { + LOG.debug("The multi list size for removing hfile references in zk for node " + peerNode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e); + } + } + + @Override + public int getHFileRefsNodeChangeVersion() throws ReplicationException { + Stat stat = new Stat(); + try { + ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get stat of replication hfile references node.", e); + } + return stat.getCversion(); + } + + @Override + public List getAllPeersFromHFileRefsQueue() throws ReplicationException { + try { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of all peers in hfile references node.", + e); + } + } + + @Override + public List getReplicableHFiles(String peerId) throws ReplicationException { + try { + return nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId))); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, + e); + } + } + +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java new file mode 100644 index 00000000000..b8a20446aea --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * This is a base class for maintaining replication related data,for example, peer, queue, etc, in + * zookeeper. + */ +@InterfaceAudience.Private +class ZKReplicationStorageBase { + + /** The name of the base znode that contains all replication state. */ + protected final String replicationZNode; + + protected final ZKWatcher zookeeper; + protected final Configuration conf; + + protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) { + this.zookeeper = zookeeper; + this.conf = conf; + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + + this.replicationZNode = + ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); + + } + + /** + * Serialized protobuf of state with pb magic prefix prepended suitable for use as + * content of a peer-state znode under a peer cluster id as in + * /hbase/replication/peers/PEER_ID/peer-state. + */ + protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) { + ReplicationProtos.ReplicationState msg = + ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); + // There is no toByteArray on this pb Message? + // 32 bytes is default which seems fair enough here. + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16); + msg.writeTo(cos); + cos.flush(); + baos.flush(); + return ProtobufUtil.prependPBMagic(baos.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java new file mode 100644 index 00000000000..a3be1e653c7 --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Stream; + +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestZKReplicationPeerStorage { + + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static ZKReplicationPeerStorage STORAGE; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniZKCluster(); + STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.shutdownMiniZKCluster(); + } + + private Set randNamespaces(Random rand) { + return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) + .collect(toSet()); + } + + private Map> randTableCFs(Random rand) { + int size = rand.nextInt(5); + Map> map = new HashMap<>(); + for (int i = 0; i < size; i++) { + TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); + List cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) + .limit(rand.nextInt(5)).collect(toList()); + map.put(tn, cfs); + } + return map; + } + + private ReplicationPeerConfig getConfig(int seed) { + Random rand = new Random(seed); + ReplicationPeerConfig config = new ReplicationPeerConfig(); + config.setClusterKey(Long.toHexString(rand.nextLong())); + config.setReplicationEndpointImpl(Long.toHexString(rand.nextLong())); + config.setNamespaces(randNamespaces(rand)); + config.setExcludeNamespaces(randNamespaces(rand)); + config.setTableCFsMap(randTableCFs(rand)); + config.setReplicateAllUserTables(rand.nextBoolean()); + config.setBandwidth(rand.nextInt(1000)); + return config; + } + + private void assertSetEquals(Set expected, Set actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach(s -> assertTrue(actual.contains(s))); + } + + private void assertMapEquals(Map> expected, + Map> actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach((expectedTn, expectedCFs) -> { + List actualCFs = actual.get(expectedTn); + if (expectedCFs == null || expectedCFs.size() == 0) { + assertTrue(actual.containsKey(expectedTn)); + assertTrue(actualCFs == null || actualCFs.size() == 0); + } else { + assertNotNull(actualCFs); + assertEquals(expectedCFs.size(), actualCFs.size()); + for (Iterator expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator(); + expectedIt.hasNext();) { + assertEquals(expectedIt.next(), actualIt.next()); + } + } + }); + } + + private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { + assertEquals(expected.getClusterKey(), actual.getClusterKey()); + assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); + assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); + assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); + assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); + assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); + assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); + assertEquals(expected.getBandwidth(), actual.getBandwidth()); + } + + @Test + public void test() throws ReplicationException { + int peerCount = 10; + for (int i = 0; i < peerCount; i++) { + STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); + } + List peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount, peerIds.size()); + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get()); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); + } + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get()); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + String toRemove = Integer.toString(peerCount / 2); + STORAGE.removePeer(toRemove); + peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount - 1, peerIds.size()); + assertFalse(peerIds.contains(toRemove)); + assertFalse(STORAGE.getPeerConfig(toRemove).isPresent()); + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java new file mode 100644 index 00000000000..d5bba0d3d3f --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestZKReplicationQueueStorage { + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static ZKReplicationQueueStorage STORAGE; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniZKCluster(); + STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.shutdownMiniZKCluster(); + } + + @After + public void tearDownAfterTest() throws ReplicationException { + for (ServerName serverName : STORAGE.getListOfReplicators()) { + for (String queue : STORAGE.getAllQueues(serverName)) { + STORAGE.removeQueue(serverName, queue); + } + STORAGE.removeReplicatorIfQueueIsEmpty(serverName); + } + for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) { + STORAGE.removePeerFromHFileRefs(peerId); + } + } + + private ServerName getServerName(int i) { + return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); + } + + @Test + public void testReplicator() throws ReplicationException { + assertTrue(STORAGE.getListOfReplicators().isEmpty()); + String queueId = "1"; + for (int i = 0; i < 10; i++) { + STORAGE.addWAL(getServerName(i), queueId, "file" + i); + } + List replicators = STORAGE.getListOfReplicators(); + assertEquals(10, replicators.size()); + for (int i = 0; i < 10; i++) { + assertThat(replicators, hasItems(getServerName(i))); + } + for (int i = 0; i < 5; i++) { + STORAGE.removeQueue(getServerName(i), queueId); + } + for (int i = 0; i < 10; i++) { + STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i)); + } + replicators = STORAGE.getListOfReplicators(); + assertEquals(5, replicators.size()); + for (int i = 5; i < 10; i++) { + assertThat(replicators, hasItems(getServerName(i))); + } + } + + private String getFileName(String base, int i) { + return String.format(base + "-%04d", i); + } + + @Test + public void testAddRemoveLog() throws ReplicationException { + ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); + assertTrue(STORAGE.getAllQueues(serverName1).isEmpty()); + String queue1 = "1"; + String queue2 = "2"; + for (int i = 0; i < 10; i++) { + STORAGE.addWAL(serverName1, queue1, getFileName("file1", i)); + STORAGE.addWAL(serverName1, queue2, getFileName("file2", i)); + } + List queueIds = STORAGE.getAllQueues(serverName1); + assertEquals(2, queueIds.size()); + assertThat(queueIds, hasItems("1", "2")); + + for (int i = 0; i < 10; i++) { + assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); + assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); + STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100); + STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10); + } + + for (int i = 0; i < 10; i++) { + assertEquals((i + 1) * 100, + STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); + assertEquals((i + 1) * 100 + 10, + STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); + } + + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i)); + } else { + STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i)); + } + } + + queueIds = STORAGE.getAllQueues(serverName1); + assertEquals(2, queueIds.size()); + assertThat(queueIds, hasItems("1", "2")); + + ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); + Pair> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2); + + assertEquals("1-" + serverName1.getServerName(), peer1.getFirst()); + assertEquals(5, peer1.getSecond().size()); + int i = 1; + for (String wal : peer1.getSecond()) { + assertEquals(getFileName("file1", i), wal); + assertEquals((i + 1) * 100, + STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i))); + i += 2; + } + + queueIds = STORAGE.getAllQueues(serverName1); + assertEquals(1, queueIds.size()); + assertThat(queueIds, hasItems("2")); + + queueIds = STORAGE.getAllQueues(serverName2); + assertEquals(1, queueIds.size()); + assertThat(queueIds, hasItems(peer1.getFirst())); + + Set allWals = STORAGE.getAllWALs(); + assertEquals(10, allWals.size()); + for (i = 0; i < 10; i++) { + assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i))); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 4032a71f0eb..034b71ce510 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -133,7 +134,7 @@ import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.mob.MobConstants; @@ -327,7 +328,7 @@ public class HMaster extends HRegionServer implements MasterServices { private AssignmentManager assignmentManager; // manager of replication - private ReplicationManager replicationManager; + private ReplicationPeerManager replicationPeerManager; // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting @@ -718,8 +719,8 @@ public class HMaster extends HRegionServer implements MasterServices { /** * Initialize all ZK based system trackers. */ - void initializeZKBasedSystemTrackers() throws IOException, - InterruptedException, KeeperException { + void initializeZKBasedSystemTrackers() + throws IOException, InterruptedException, KeeperException, ReplicationException { this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf); this.normalizer.setMasterServices(this); @@ -737,7 +738,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.assignmentManager = new AssignmentManager(this); this.assignmentManager.start(); - this.replicationManager = new ReplicationManager(conf, zooKeeper, this); + this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager); this.regionServerTracker.start(); @@ -783,8 +784,7 @@ public class HMaster extends HRegionServer implements MasterServices { * */ private void finishActiveMasterInitialization(MonitoredTask status) - throws IOException, InterruptedException, KeeperException { - + throws IOException, InterruptedException, KeeperException, ReplicationException { Thread zombieDetector = new Thread(new InitializationMonitor(this), "ActiveMasterInitializationMonitor-" + System.currentTimeMillis()); zombieDetector.setDaemon(true); @@ -3414,18 +3414,19 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException, - IOException { + public ReplicationPeerConfig getReplicationPeerConfig(String peerId) + throws ReplicationException, IOException { if (cpHost != null) { cpHost.preGetReplicationPeerConfig(peerId); } - final ReplicationPeerConfig peerConfig = this.replicationManager.getPeerConfig(peerId); - LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId + ", config=" - + peerConfig); + LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId); + Optional peerConfig = + this.replicationPeerManager.getPeerConfig(peerId); + if (cpHost != null) { cpHost.postGetReplicationPeerConfig(peerId); } - return peerConfig; + return peerConfig.orElse(null); } @Override @@ -3444,7 +3445,8 @@ public class HMaster extends HRegionServer implements MasterServices { } LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex); Pattern pattern = regex == null ? null : Pattern.compile(regex); - List peers = this.replicationManager.listReplicationPeers(pattern); + List peers = + this.replicationPeerManager.listPeers(pattern); if (cpHost != null) { cpHost.postListReplicationPeers(regex); } @@ -3593,8 +3595,8 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ReplicationManager getReplicationManager() { - return replicationManager; + public ReplicationPeerManager getReplicationPeerManager() { + return replicationPeerManager; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index b0bf9ca61fc..0e552d69307 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -459,9 +459,9 @@ public interface MasterServices extends Server { IOException; /** - * Returns the {@link ReplicationManager}. + * Returns the {@link ReplicationPeerManager}. */ - ReplicationManager getReplicationManager(); + ReplicationPeerManager getReplicationPeerManager(); /** * Update the peerConfig for the specified peer diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 19e6f9b9b5b..7fb187fe059 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -24,24 +24,24 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @InterfaceAudience.Private @InterfaceStability.Evolving @@ -138,8 +138,8 @@ public class MasterProcedureEnv implements ConfigurationObserver { return remoteDispatcher; } - public ReplicationManager getReplicationManager() { - return master.getReplicationManager(); + public ReplicationPeerManager getReplicationPeerManager() { + return master.getReplicationPeerManager(); } public boolean isRunning() { @@ -151,22 +151,22 @@ public class MasterProcedureEnv implements ConfigurationObserver { return master.isInitialized(); } - public boolean waitInitialized(Procedure proc) { + public boolean waitInitialized(Procedure proc) { return master.getInitializedEvent().suspendIfNotReady(proc); } - public boolean waitServerCrashProcessingEnabled(Procedure proc) { + public boolean waitServerCrashProcessingEnabled(Procedure proc) { if (master instanceof HMaster) { return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc); } return false; } - public boolean waitFailoverCleanup(Procedure proc) { + public boolean waitFailoverCleanup(Procedure proc) { return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc); } - public void setEventReady(ProcedureEvent event, boolean isReady) { + public void setEventReady(ProcedureEvent event, boolean isReady) { if (isReady) { event.wake(procSched); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index 066c3e7f212..a4f9b3226c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -58,16 +58,18 @@ public class AddPeerProcedure extends ModifyPeerProcedure { } @Override - protected void prePeerModification(MasterProcedureEnv env) throws IOException { + protected void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preAddReplicationPeer(peerId, peerConfig); } + env.getReplicationPeerManager().preAddPeer(peerId, peerConfig); } @Override protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { - env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled); + env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java index 9a28de6edbc..10e35a89f70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,12 +52,12 @@ public class DisablePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preDisableReplicationPeer(peerId); } + env.getReplicationPeerManager().preDisablePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) - throws IllegalArgumentException, Exception { - env.getReplicationManager().disableReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().disablePeer(peerId); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java index 485590152b6..f2a9f011173 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +52,12 @@ public class EnablePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preEnableReplicationPeer(peerId); } + env.getReplicationPeerManager().preEnablePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { - env.getReplicationManager().enableReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().enablePeer(peerId); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index c4552edf8a2..279fbc78753 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -27,6 +26,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,17 +67,16 @@ public abstract class ModifyPeerProcedure } /** - * Called before we start the actual processing. If an exception is thrown then we will give up - * and mark the procedure as failed directly. + * Called before we start the actual processing. The implementation should call the pre CP hook, + * and also the pre-check for the peer modification. + *

+ * If an IOException is thrown then we will give up and mark the procedure as failed directly. If + * all checks passes then the procedure can not be rolled back any more. */ - protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException; + protected abstract void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException; - /** - * We will give up and mark the procedure as failure if {@link IllegalArgumentException} is - * thrown, for other type of Exception we will retry. - */ - protected abstract void updatePeerStorage(MasterProcedureEnv env) - throws IllegalArgumentException, Exception; + protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException; /** * Called before we finish the procedure. The implementation can do some logging work, and also @@ -100,23 +99,24 @@ public abstract class ModifyPeerProcedure try { prePeerModification(env); } catch (IOException e) { - LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + - ", mark the procedure as failure and give up", e); - setFailure("prePeerModification", e); + LOG.warn( + getClass().getName() + " failed to call CP hook or the pre check is failed for peer " + + peerId + ", mark the procedure as failure and give up", + e); + setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); releaseLatch(); return Flow.NO_MORE_STATE; + } catch (ReplicationException e) { + LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + + ", retry", e); + throw new ProcedureYieldException(); } setNextState(PeerModificationState.UPDATE_PEER_STORAGE); return Flow.HAS_MORE_STATE; case UPDATE_PEER_STORAGE: try { updatePeerStorage(env); - } catch (IllegalArgumentException e) { - setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", - new DoNotRetryIOException(e)); - releaseLatch(); - return Flow.NO_MORE_STATE; - } catch (Exception e) { + } catch (ReplicationException e) { LOG.warn( getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e); throw new ProcedureYieldException(); @@ -158,8 +158,7 @@ public abstract class ModifyPeerProcedure @Override protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) throws IOException, InterruptedException { - if (state == PeerModificationState.PRE_PEER_MODIFICATION || - state == PeerModificationState.UPDATE_PEER_STORAGE) { + if (state == PeerModificationState.PRE_PEER_MODIFICATION) { // actually the peer related operations has no rollback, but if we haven't done any // modifications on the peer storage, we can just return. return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index d40df02b3af..6e9c3840e8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +52,12 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preRemoveReplicationPeer(peerId); } + env.getReplicationPeerManager().preRemovePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { - env.getReplicationManager().removeReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().removePeer(peerId); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java deleted file mode 100644 index b6f8784bfd9..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.replication; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Manages and performs all replication admin operations. - *

- * Used to add/remove a replication peer. - */ -@InterfaceAudience.Private -public class ReplicationManager { - private final ReplicationQueuesClient replicationQueuesClient; - private final ReplicationPeers replicationPeers; - - public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable) - throws IOException { - try { - this.replicationQueuesClient = ReplicationFactory - .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw)); - this.replicationQueuesClient.init(); - this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, - this.replicationQueuesClient, abortable); - this.replicationPeers.init(); - } catch (Exception e) { - throw new IOException("Failed to construct ReplicationManager", e); - } - } - - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException { - checkPeerConfig(peerConfig); - replicationPeers.registerPeer(peerId, peerConfig, enabled); - replicationPeers.peerConnected(peerId); - } - - public void removeReplicationPeer(String peerId) throws ReplicationException { - replicationPeers.peerDisconnected(peerId); - replicationPeers.unregisterPeer(peerId); - } - - public void enableReplicationPeer(String peerId) throws ReplicationException { - this.replicationPeers.enablePeer(peerId); - } - - public void disableReplicationPeer(String peerId) throws ReplicationException { - this.replicationPeers.disablePeer(peerId); - } - - public ReplicationPeerConfig getPeerConfig(String peerId) - throws ReplicationException, ReplicationPeerNotFoundException { - ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId); - if (peerConfig == null) { - throw new ReplicationPeerNotFoundException(peerId); - } - return peerConfig; - } - - public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) - throws ReplicationException, IOException { - checkPeerConfig(peerConfig); - this.replicationPeers.updatePeerConfig(peerId, peerConfig); - } - - public List listReplicationPeers(Pattern pattern) - throws ReplicationException { - List peers = new ArrayList<>(); - List peerIds = replicationPeers.getAllPeerIds(); - for (String peerId : peerIds) { - if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) { - peers.add(new ReplicationPeerDescription(peerId, - replicationPeers.getStatusOfPeerFromBackingStore(peerId), - replicationPeers.getReplicationPeerConfig(peerId))); - } - } - return peers; - } - - /** - * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. - * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to - * peer cluster. - * - * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. - * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. - */ - private void checkPeerConfig(ReplicationPeerConfig peerConfig) { - if (peerConfig.replicateAllUserTables()) { - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || - (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " + - "when you want replicate all cluster"); - } - checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), - peerConfig.getExcludeTableCFsMap()); - } else { - if ((peerConfig.getExcludeNamespaces() != null - && !peerConfig.getExcludeNamespaces().isEmpty()) - || (peerConfig.getExcludeTableCFsMap() != null - && !peerConfig.getExcludeTableCFsMap().isEmpty())) { - throw new IllegalArgumentException( - "Need clean exclude-namespaces or exclude-table-cfs config firstly" - + " when replicate_all flag is false"); - } - checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), - peerConfig.getTableCFsMap()); - } - checkConfiguredWALEntryFilters(peerConfig); - } - - /** - * Set a namespace in the peer config means that all tables in this namespace will be replicated - * to the peer cluster. - *

    - *
  1. If peer config already has a namespace, then not allow set any table of this namespace to - * the peer config.
  2. - *
  3. If peer config already has a table, then not allow set this table's namespace to the peer - * config.
  4. - *
- *

- * Set a exclude namespace in the peer config means that all tables in this namespace can't be - * replicated to the peer cluster. - *

    - *
  1. If peer config already has a exclude namespace, then not allow set any exclude table of - * this namespace to the peer config.
  2. - *
  3. If peer config already has a exclude table, then not allow set this table's namespace as a - * exclude namespace.
  4. - *
- */ - private void checkNamespacesAndTableCfsConfigConflict(Set namespaces, - Map> tableCfs) { - if (namespaces == null || namespaces.isEmpty()) { - return; - } - if (tableCfs == null || tableCfs.isEmpty()) { - return; - } - for (Map.Entry> entry : tableCfs.entrySet()) { - TableName table = entry.getKey(); - if (namespaces.contains(table.getNamespaceAsString())) { - throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces " - + table.getNamespaceAsString() + " in peer config"); - } - } - } - - private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) { - String filterCSV = peerConfig.getConfiguration() - .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); - if (filterCSV != null && !filterCSV.isEmpty()) { - String[] filters = filterCSV.split(","); - for (String filter : filters) { - try { - Class.forName(filter).newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("Configured WALEntryFilter " + filter + - " could not be created. Failing add/update " + "peer operation.", e); - } - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java new file mode 100644 index 00000000000..5abd874cba7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Manages and performs all replication admin operations. + *

+ * Used to add/remove a replication peer. + */ +@InterfaceAudience.Private +public final class ReplicationPeerManager { + + private final ReplicationPeerStorage peerStorage; + + private final ReplicationQueueStorage queueStorage; + + private final ConcurrentMap peers; + + private ReplicationPeerManager(ReplicationPeerStorage peerStorage, + ReplicationQueueStorage queueStorage, + ConcurrentMap peers) { + this.peerStorage = peerStorage; + this.queueStorage = queueStorage; + this.peers = peers; + } + + private void checkQueuesDeleted(String peerId) + throws ReplicationException, DoNotRetryIOException { + for (ServerName replicator : queueStorage.getListOfReplicators()) { + List queueIds = queueStorage.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (queueInfo.getPeerId().equals(peerId)) { + throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + + ", replicator: " + replicator + ", queueId: " + queueId); + } + } + } + if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { + throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs"); + } + } + + public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException, ReplicationException { + if (peerId.contains("-")) { + throw new DoNotRetryIOException("Found invalid peer name: " + peerId); + } + checkPeerConfig(peerConfig); + if (peers.containsKey(peerId)) { + throw new DoNotRetryIOException("Replication peer " + peerId + " already exists"); + } + // make sure that there is no queues with the same peer id. This may happen when we create a + // peer with the same id with a old deleted peer. If the replication queues for the old peer + // have not been cleaned up yet then we should not create the new peer, otherwise the old wal + // file may also be replicated. + checkQueuesDeleted(peerId); + } + + private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { + ReplicationPeerDescription desc = peers.get(peerId); + if (desc == null) { + throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist"); + } + return desc; + } + + public void preRemovePeer(String peerId) throws DoNotRetryIOException { + checkPeerExists(peerId); + } + + public void preEnablePeer(String peerId) throws DoNotRetryIOException { + ReplicationPeerDescription desc = checkPeerExists(peerId); + if (desc.isEnabled()) { + throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); + } + } + + public void preDisablePeer(String peerId) throws DoNotRetryIOException { + ReplicationPeerDescription desc = checkPeerExists(peerId); + if (!desc.isEnabled()) { + throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); + } + } + + public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + checkPeerConfig(peerConfig); + ReplicationPeerDescription desc = checkPeerExists(peerId); + ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); + if (!StringUtils.isBlank(peerConfig.getClusterKey()) && + !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) { + throw new DoNotRetryIOException( + "Changing the cluster key on an existing peer is not allowed. Existing key '" + + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + + peerConfig.getClusterKey() + "'"); + } + + if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && + !peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { + throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + + "on an existing peer is not allowed. Existing class '" + + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + } + } + + private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) { + ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig(); + copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration()); + copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData()); + copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap()); + copiedPeerConfig.setNamespaces(peerConfig.getNamespaces()); + copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()); + copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces()); + copiedPeerConfig.setBandwidth(peerConfig.getBandwidth()); + copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables()); + copiedPeerConfig.setClusterKey(peerConfig.getClusterKey()); + copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); + return copiedPeerConfig; + } + + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException { + if (peers.containsKey(peerId)) { + // this should be a retry, just return + return; + } + ReplicationPeerConfig copiedPeerConfig = copy(peerConfig); + peerStorage.addPeer(peerId, copiedPeerConfig, enabled); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); + } + + public void removePeer(String peerId) throws ReplicationException { + if (!peers.containsKey(peerId)) { + // this should be a retry, just return + return; + } + peerStorage.removePeer(peerId); + peers.remove(peerId); + } + + private void setPeerState(String peerId, boolean enabled) throws ReplicationException { + ReplicationPeerDescription desc = peers.get(peerId); + if (desc.isEnabled() == enabled) { + // this should be a retry, just return + return; + } + peerStorage.setPeerState(peerId, enabled); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); + } + + public void enablePeer(String peerId) throws ReplicationException { + setPeerState(peerId, true); + } + + public void disablePeer(String peerId) throws ReplicationException { + setPeerState(peerId, false); + } + + public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + // the checking rules are too complicated here so we give up checking whether this is a retry. + ReplicationPeerDescription desc = peers.get(peerId); + ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); + ReplicationPeerConfig newPeerConfig = copy(peerConfig); + // we need to use the new conf to overwrite the old one. + newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration()); + newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration()); + newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData()); + newPeerConfig.getPeerData().putAll(peerConfig.getPeerData()); + + peerStorage.updatePeerConfig(peerId, newPeerConfig); + peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); + } + + public List listPeers(Pattern pattern) { + if (pattern == null) { + return new ArrayList<>(peers.values()); + } + return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) + .collect(Collectors.toList()); + } + + public Optional getPeerConfig(String peerId) { + ReplicationPeerDescription desc = peers.get(peerId); + return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); + } + + /** + * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. + * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer + * cluster. + *

+ * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. + * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. + */ + private static void checkPeerConfig(ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + if (peerConfig.replicateAllUserTables()) { + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || + (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + + "when you want replicate all cluster"); + } + checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), + peerConfig.getExcludeTableCFsMap()); + } else { + if ((peerConfig.getExcludeNamespaces() != null && + !peerConfig.getExcludeNamespaces().isEmpty()) || + (peerConfig.getExcludeTableCFsMap() != null && + !peerConfig.getExcludeTableCFsMap().isEmpty())) { + throw new DoNotRetryIOException( + "Need clean exclude-namespaces or exclude-table-cfs config firstly" + + " when replicate_all flag is false"); + } + checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), + peerConfig.getTableCFsMap()); + } + checkConfiguredWALEntryFilters(peerConfig); + } + + /** + * Set a namespace in the peer config means that all tables in this namespace will be replicated + * to the peer cluster. + *

    + *
  1. If peer config already has a namespace, then not allow set any table of this namespace to + * the peer config.
  2. + *
  3. If peer config already has a table, then not allow set this table's namespace to the peer + * config.
  4. + *
+ *

+ * Set a exclude namespace in the peer config means that all tables in this namespace can't be + * replicated to the peer cluster. + *

    + *
  1. If peer config already has a exclude namespace, then not allow set any exclude table of + * this namespace to the peer config.
  2. + *
  3. If peer config already has a exclude table, then not allow set this table's namespace as a + * exclude namespace.
  4. + *
+ */ + private static void checkNamespacesAndTableCfsConfigConflict(Set namespaces, + Map> tableCfs) throws DoNotRetryIOException { + if (namespaces == null || namespaces.isEmpty()) { + return; + } + if (tableCfs == null || tableCfs.isEmpty()) { + return; + } + for (Map.Entry> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + if (namespaces.contains(table.getNamespaceAsString())) { + throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " + + table.getNamespaceAsString() + " in peer config"); + } + } + } + + private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + String filterCSV = peerConfig.getConfiguration() + .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterCSV != null && !filterCSV.isEmpty()) { + String[] filters = filterCSV.split(","); + for (String filter : filters) { + try { + Class.forName(filter).newInstance(); + } catch (Exception e) { + throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + + " could not be created. Failing add/update " + "peer operation.", e); + } + } + } + } + + public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) + throws ReplicationException { + ReplicationPeerStorage peerStorage = + ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); + ConcurrentMap peers = new ConcurrentHashMap<>(); + for (String peerId : peerStorage.listPeerIds()) { + Optional peerConfig = peerStorage.getPeerConfig(peerId); + boolean enabled = peerStorage.isPeerEnabled(peerId); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get())); + } + return new ReplicationPeerManager(peerStorage, + ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java index d8154dc930d..a43532dd5f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -59,12 +60,12 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); } + env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) - throws IllegalArgumentException, Exception { - env.getReplicationManager().updatePeerConfig(peerId, peerConfig); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index ba78e6dde77..29a577ba954 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -17,6 +17,13 @@ */ package org.apache.hadoop.hbase.client.replication; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -25,26 +32,24 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -53,15 +58,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Unit testing of ReplicationAdmin @@ -73,8 +69,6 @@ public class TestReplicationAdmin { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationAdmin.class); - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -109,16 +103,17 @@ public class TestReplicationAdmin { } @After - public void cleanupPeer() { - try { - hbaseAdmin.removeReplicationPeer(ID_ONE); - } catch (Exception e) { - LOG.debug("Replication peer " + ID_ONE + " may already be removed"); + public void tearDown() throws Exception { + for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) { + hbaseAdmin.removeReplicationPeer(desc.getPeerId()); } - try { - hbaseAdmin.removeReplicationPeer(ID_SECOND); - } catch (Exception e) { - LOG.debug("Replication peer " + ID_SECOND + " may already be removed"); + ReplicationQueueStorage queueStorage = ReplicationStorageFactory + .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); + for (ServerName serverName : queueStorage.getListOfReplicators()) { + for (String queue : queueStorage.getAllQueues(serverName)) { + queueStorage.removeQueue(serverName, queue); + } + queueStorage.removeReplicatorIfQueueIsEmpty(serverName); } } @@ -208,32 +203,29 @@ public class TestReplicationAdmin { ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); rpc2.setClusterKey(KEY_SECOND); Configuration conf = TEST_UTIL.getConfiguration(); - ZKWatcher zkw = new ZKWatcher(conf, "Test HBaseAdmin", null); - ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw)); - repQueues.init("server1"); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf); + ServerName serverName = ServerName.valueOf("server1", 8000, 1234); // add queue for ID_ONE - repQueues.addLog(ID_ONE, "file1"); + queueStorage.addWAL(serverName, ID_ONE, "file1"); try { admin.addPeer(ID_ONE, rpc1, null); fail(); } catch (Exception e) { // OK! } - repQueues.removeQueue(ID_ONE); - assertEquals(0, repQueues.getAllQueues().size()); + queueStorage.removeQueue(serverName, ID_ONE); + assertEquals(0, queueStorage.getAllQueues(serverName).size()); // add recovered queue for ID_ONE - repQueues.addLog(ID_ONE + "-server2", "file1"); + queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1"); try { admin.addPeer(ID_ONE, rpc2, null); fail(); } catch (Exception e) { // OK! } - repQueues.removeAllQueues(); - zkw.close(); } /** @@ -429,7 +421,7 @@ public class TestReplicationAdmin { tableCFs.clear(); tableCFs.put(tableName2, null); admin.removePeerTableCFs(ID_ONE, tableCFs); - assertTrue(false); + fail(); } catch (ReplicationException e) { } tableCFs.clear(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index e88710efc2f..4e666763033 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.master; import static org.mockito.Mockito.mock; +import com.google.protobuf.Service; import java.io.IOException; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.ChoreService; @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -55,9 +55,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import com.google.protobuf.Service; - public class MockNoopMasterServices implements MasterServices { + private final Configuration conf; private final MetricsMaster metricsMaster; @@ -461,7 +460,7 @@ public class MockNoopMasterServices implements MasterServices { } @Override - public ProcedureEvent getInitializedEvent() { + public ProcedureEvent getInitializedEvent() { return null; } @@ -476,7 +475,7 @@ public class MockNoopMasterServices implements MasterServices { } @Override - public ReplicationManager getReplicationManager() { + public ReplicationPeerManager getReplicationPeerManager() { return null; } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java index aec1a75bf3e..2b85e25abea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.FSUtils; @@ -274,7 +275,7 @@ public class TestMasterNoCluster { @Override void initializeZKBasedSystemTrackers() throws IOException, InterruptedException, - KeeperException { + KeeperException, ReplicationException { super.initializeZKBasedSystemTrackers(); // Record a newer server in server manager at first getServerManager().recordNewServerWithLock(newServer, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java index 2aa8e8361e7..8b795aa4235 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -54,9 +53,6 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase { */ @Test public void testDisableInactivePeer() throws Exception { - - // enabling and shutdown the peer - admin.enablePeer("2"); utility2.shutdownMiniHBaseCluster(); byte[] rowkey = Bytes.toBytes("disable inactive peer");