HBASE-19543 Abstract a replication storage interface to extract the zk specific code

This commit is contained in:
zhangduo 2017-12-22 14:37:28 +08:00
parent 8f5e54a456
commit 5e6c303528
26 changed files with 1750 additions and 312 deletions

View File

@ -107,6 +107,9 @@ public class CollectionUtils {
return list.get(list.size() - 1);
}
public static <T> List<T> nullToEmpty(List<T> list) {
return list != null ? list : Collections.emptyList();
}
/**
* In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the
* value already exists. Notice that the implementation does not guarantee that the supplier will

View File

@ -137,6 +137,18 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-zookeeper</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- General dependencies -->
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Perform read/write to the replication peer storage.
*/
@InterfaceAudience.Private
public interface ReplicationPeerStorage {
/**
* Add a replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException;
/**
* Remove a replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
void removePeer(String peerId) throws ReplicationException;
/**
* Set the state of peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}.
* @throws ReplicationException if there are errors accessing the storage service.
*/
void setPeerState(String peerId, boolean enabled) throws ReplicationException;
/**
* Update the config a replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException;
/**
* Return the peer ids of all replication peers.
* @throws ReplicationException if there are errors accessing the storage service.
*/
List<String> listPeerIds() throws ReplicationException;
/**
* Test whether a replication peer is enabled.
* @throws ReplicationException if there are errors accessing the storage service.
*/
boolean isPeerEnabled(String peerId) throws ReplicationException;
/**
* Get the peer config of a replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException;
}

View File

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Perform read/write to the replication queue storage.
*/
@InterfaceAudience.Private
public interface ReplicationQueueStorage {
/**
* Remove a replication queue for a given regionserver.
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue.
*/
void removeQueue(ServerName serverName, String queueId) throws ReplicationException;
/**
* Add a new WAL file to the given queue for a given regionserver. If the queue does not exist it
* is created.
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue.
* @param fileName name of the WAL
*/
void addWAL(ServerName serverName, String queueId, String fileName) throws ReplicationException;
/**
* Remove an WAL file from the given queue for a given regionserver.
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue.
* @param fileName name of the WAL
*/
void removeWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException;
/**
* Set the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue
* @param fileName name of the WAL
* @param position the current position in the file
*/
void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
throws ReplicationException;
/**
* Get the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue
* @param fileName name of the WAL
* @return the current position in the file
*/
long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException;
/**
* Get a list of all queues for the specified region server.
* @param serverName the server name of the region server that owns the set of queues
* @return a list of queueIds
*/
List<String> getAllQueues(ServerName serverName) throws ReplicationException;
/**
* Change ownership for the queue identified by queueId and belongs to a dead region server.
* @param sourceServerName the name of the dead region server
* @param destServerName the name of the target region server
* @param queueId the id of the queue
* @return the new PeerId and A SortedSet of WALs in its queue
*/
Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
ServerName destServerName) throws ReplicationException;
/**
* Remove the record of region server if the queue is empty.
*/
void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException;
/**
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
*/
List<ServerName> getListOfReplicators() throws ReplicationException;
/**
* Load all wals in all replication queues. This method guarantees to return a snapshot which
* contains all WALs in the zookeeper at the start of this call even there is concurrent queue
* failover. However, some newly created WALs during the call may not be included.
*/
Set<String> getAllWALs() throws ReplicationException;
/**
* Add a peer to hfile reference queue if peer does not exist.
* @param peerId peer cluster id to be added
* @throws ReplicationException if fails to add a peer id to hfile reference queue
*/
void addPeerToHFileRefs(String peerId) throws ReplicationException;
/**
* Remove a peer from hfile reference queue.
* @param peerId peer cluster id to be removed
*/
void removePeerFromHFileRefs(String peerId) throws ReplicationException;
/**
* Add new hfile references to the queue.
* @param peerId peer cluster id to which the hfiles need to be replicated
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
* will be added in the queue }
* @throws ReplicationException if fails to add a hfile reference
*/
void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
/**
* Remove hfile references from the queue.
* @param peerId peer cluster id from which this hfile references needs to be removed
* @param files list of hfile references to be removed
*/
void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;
/**
* Get the change version number of replication hfile references node. This can be used as
* optimistic locking to get a consistent snapshot of the replication queues of hfile references.
* @return change version number of hfile references node
*/
int getHFileRefsNodeChangeVersion() throws ReplicationException;
/**
* Get list of all peers from hfile reference queue.
* @return a list of peer ids
*/
List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException;
/**
* Get a list of all hfile references in the given peer.
* @param peerId a String that identifies the peer
* @return a list of hfile references
*/
List<String> getReplicableHFiles(String peerId) throws ReplicationException;
}

View File

@ -65,7 +65,6 @@ public abstract class ReplicationStateZKBase {
protected final Configuration conf;
protected final Abortable abortable;
// Public for testing
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
public static final byte[] DISABLED_ZNODE_BYTES =

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used to create replication storage(peer, queue) classes.
* <p>
* For now we only have zk based implementation.
*/
@InterfaceAudience.Private
public class ReplicationStorageFactory {
private ReplicationStorageFactory() {
}
/**
* Create a new {@link ReplicationPeerStorage}.
*/
public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) {
return new ZKReplicationPeerStorage(zk, conf);
}
/**
* Create a new {@link ReplicationQueueStorage}.
*/
public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk,
Configuration conf) {
return new ZKReplicationQueueStorage(zk, conf);
}
}

View File

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
/**
* ZK based replication peer storage.
*/
@InterfaceAudience.Private
class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage {
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationPeerStorage.class);
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
/**
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
* cluster.
*/
private final String peerStateNodeName;
/**
* The name of the znode that contains a list of all remote slave (i.e. peer) clusters.
*/
private final String peersZNode;
public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
}
private String getPeerStateNode(String peerId) {
return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName);
}
private String getPeerNode(String peerId) {
return ZNodePaths.joinZNode(peersZNode, peerId);
}
@Override
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
try {
ZKUtil.createWithParents(zookeeper, peersZNode);
ZKUtil.multiOrSequential(zookeeper,
Arrays.asList(
ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
ReplicationPeerConfigUtil.toByteArray(peerConfig)),
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
false);
} catch (KeeperException e) {
throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" +
peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
}
}
@Override
public void removePeer(String peerId) throws ReplicationException {
try {
ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
} catch (KeeperException e) {
throw new ReplicationException("Could not remove peer with id=" + peerId, e);
}
}
@Override
public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
try {
ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
} catch (KeeperException e) {
throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e);
}
}
@Override
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException {
try {
ZKUtil.setData(this.zookeeper, getPeerNode(peerId),
ReplicationPeerConfigUtil.toByteArray(peerConfig));
} catch (KeeperException e) {
throw new ReplicationException(
"There was a problem trying to save changes to the " + "replication peer " + peerId, e);
}
}
@Override
public List<String> listPeerIds() throws ReplicationException {
try {
return CollectionUtils.nullToEmpty(ZKUtil.listChildrenAndWatchThem(zookeeper, peersZNode));
} catch (KeeperException e) {
throw new ReplicationException("Cannot get the list of peers", e);
}
}
@Override
public boolean isPeerEnabled(String peerId) throws ReplicationException {
try {
return Arrays.equals(ENABLED_ZNODE_BYTES,
ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
}
}
@Override
public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException {
byte[] data;
try {
data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
}
if (data == null || data.length == 0) {
return Optional.empty();
}
try {
return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data));
} catch (DeserializationException e) {
LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e);
return Optional.empty();
}
}
}

View File

@ -0,0 +1,425 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.util.CollectionUtils.nullToEmpty;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.BadVersionException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.KeeperException.NotEmptyException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
/**
* ZK based replication queue storage.
*/
@InterfaceAudience.Private
class ZKReplicationQueueStorage extends ZKReplicationStorageBase
implements ReplicationQueueStorage {
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
"zookeeper.znode.replication.hfile.refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
/**
* The name of the znode that contains all replication queues
*/
private final String queuesZNode;
/**
* The name of the znode that contains queues of hfile references to be replicated
*/
private final String hfileRefsZNode;
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
}
private String getRsNode(ServerName serverName) {
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}
private String getQueueNode(ServerName serverName, String queueId) {
return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
}
private String getFileNode(String queueNode, String fileName) {
return ZNodePaths.joinZNode(queueNode, fileName);
}
private String getFileNode(ServerName serverName, String queueId, String fileName) {
return getFileNode(getQueueNode(serverName, queueId), fileName);
}
@Override
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
try {
ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId));
} catch (KeeperException e) {
throw new ReplicationException(
"Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e);
}
}
@Override
public void addWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
try {
ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
} catch (KeeperException e) {
throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
}
@Override
public void removeWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
String fileNode = getFileNode(serverName, queueId, fileName);
try {
ZKUtil.deleteNode(zookeeper, fileNode);
} catch (NoNodeException e) {
LOG.warn(fileNode + " has already been deleted when removing log");
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
}
@Override
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
throws ReplicationException {
try {
ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName),
ZKUtil.positionToByteArray(position));
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
}
}
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
byte[] bytes;
try {
bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName));
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Failed to get log position (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
try {
return ZKUtil.parseWALPositionFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed to parse log position (serverName=" + serverName + ", queueId=" + queueId +
", fileName=" + fileName + ")");
}
// if we can not parse the position, start at the beginning of the wal file again
return 0;
}
@Override
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
ServerName destServerName) throws ReplicationException {
LOG.info(
"Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
try {
ZKUtil.createWithParents(zookeeper, getRsNode(destServerName));
} catch (KeeperException e) {
throw new ReplicationException(
"Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName +
" failed when creating the node for " + destServerName,
e);
}
try {
String oldQueueNode = getQueueNode(sourceServerName, queueId);
List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode);
String newQueueId = queueId + "-" + sourceServerName;
if (CollectionUtils.isEmpty(wals)) {
ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode);
LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty");
return new Pair<>(newQueueId, Collections.emptySortedSet());
}
String newQueueNode = getQueueNode(destServerName, newQueueId);
List<ZKUtilOp> listOfOps = new ArrayList<>();
SortedSet<String> logQueue = new TreeSet<>();
// create the new cluster znode
listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY));
// get the offset of the logs and set it to new znodes
for (String wal : wals) {
String oldWalNode = getFileNode(oldQueueNode, wal);
byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating " + wal + " with data " + Bytes.toStringBinary(logOffset));
}
String newWalNode = getFileNode(newQueueNode, wal);
listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset));
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode));
logQueue.add(wal);
}
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));
if (LOG.isTraceEnabled()) {
LOG.trace("The multi list size is: " + listOfOps.size());
}
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
LOG.info(
"Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
return new Pair<>(newQueueId, logQueue);
} catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) {
// Multi call failed; it looks like some other regionserver took away the logs.
// These exceptions mean that zk tells us the request can not be execute so it is safe to just
// return a null. For other types of exception should be thrown out to notify the upper layer.
LOG.info(
"Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName +
" failed with " + e.toString() + ", maybe someone else has already took away the logs");
return null;
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Claim queue queueId=" + queueId + " from " +
sourceServerName + " to " + destServerName + " failed", e);
}
}
@Override
public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
try {
ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName));
} catch (NotEmptyException e) {
// keep silence to avoid logging too much.
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove replicator for " + serverName, e);
}
}
private List<ServerName> getListOfReplicators0() throws KeeperException {
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode)).stream()
.map(ServerName::parseServerName).collect(toList());
}
@Override
public List<ServerName> getListOfReplicators() throws ReplicationException {
try {
return getListOfReplicators0();
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of replicators", e);
}
}
private List<String> getLogsInQueue0(ServerName serverName, String queueId)
throws KeeperException {
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId)));
}
private List<String> getAllQueues0(ServerName serverName) throws KeeperException {
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName)));
}
@Override
public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
try {
return getAllQueues0(serverName);
} catch (KeeperException e) {
throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e);
}
}
private int getQueuesZNodeCversion() throws KeeperException {
Stat stat = new Stat();
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
return stat.getCversion();
}
@Override
public Set<String> getAllWALs() throws ReplicationException {
try {
for (int retry = 0;; retry++) {
int v0 = getQueuesZNodeCversion();
List<ServerName> rss = getListOfReplicators0();
if (rss.isEmpty()) {
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
return Collections.emptySet();
}
Set<String> wals = Sets.newHashSet();
for (ServerName rs : rss) {
for (String queueId : getAllQueues0(rs)) {
wals.addAll(getLogsInQueue0(rs, queueId));
}
}
int v1 = getQueuesZNodeCversion();
if (v0 == v1) {
return wals;
}
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
v0, v1, retry));
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to get all wals", e);
}
}
private String getHFileRefsPeerNode(String peerId) {
return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
}
private String getHFileNode(String peerNode, String fileName) {
return ZNodePaths.joinZNode(peerNode, fileName);
}
@Override
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
try {
if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
LOG.info("Adding peer " + peerId + " to hfile reference queue.");
ZKUtil.createWithParents(zookeeper, peerNode);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
e);
}
}
@Override
public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
try {
if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Peer " + peerNode + " not found in hfile reference queue.");
}
} else {
LOG.info("Removing peer " + peerNode + " from hfile reference queue.");
ZKUtil.deleteNodeRecursively(zookeeper, peerNode);
}
} catch (KeeperException e) {
throw new ReplicationException(
"Failed to remove peer " + peerId + " from hfile reference queue.", e);
}
}
@Override
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
boolean debugEnabled = LOG.isDebugEnabled();
if (debugEnabled) {
LOG.debug("Adding hfile references " + pairs + " in queue " + peerNode);
}
List<ZKUtilOp> listOfOps =
pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n))
.map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
if (debugEnabled) {
LOG.debug("The multi list size for adding hfile references in zk for node " + peerNode +
" is " + listOfOps.size());
}
try {
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
} catch (KeeperException e) {
throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e);
}
}
@Override
public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
boolean debugEnabled = LOG.isDebugEnabled();
if (debugEnabled) {
LOG.debug("Removing hfile references " + files + " from queue " + peerNode);
}
List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n))
.map(ZKUtilOp::deleteNodeFailSilent).collect(toList());
if (debugEnabled) {
LOG.debug("The multi list size for removing hfile references in zk for node " + peerNode +
" is " + listOfOps.size());
}
try {
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e);
}
}
@Override
public int getHFileRefsNodeChangeVersion() throws ReplicationException {
Stat stat = new Stat();
try {
ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat);
} catch (KeeperException e) {
throw new ReplicationException("Failed to get stat of replication hfile references node.", e);
}
return stat.getCversion();
}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
try {
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode));
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of all peers in hfile references node.",
e);
}
}
@Override
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
try {
return nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId)));
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
e);
}
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
/**
* This is a base class for maintaining replication related data,for example, peer, queue, etc, in
* zookeeper.
*/
@InterfaceAudience.Private
class ZKReplicationStorageBase {
/** The name of the base znode that contains all replication state. */
protected final String replicationZNode;
protected final ZKWatcher zookeeper;
protected final Configuration conf;
protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) {
this.zookeeper = zookeeper;
this.conf = conf;
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
this.replicationZNode =
ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
}
/**
* Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for use as
* content of a peer-state znode under a peer cluster id as in
* /hbase/replication/peers/PEER_ID/peer-state.
*/
protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
ReplicationProtos.ReplicationState msg =
ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
// There is no toByteArray on this pb Message?
// 32 bytes is default which seems fair enough here.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
msg.writeTo(cos);
cos.flush();
baos.flush();
return ProtobufUtil.prependPBMagic(baos.toByteArray());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationPeerStorage {
private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
private static ZKReplicationPeerStorage STORAGE;
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniZKCluster();
STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
}
@AfterClass
public static void tearDown() throws IOException {
UTIL.shutdownMiniZKCluster();
}
private Set<String> randNamespaces(Random rand) {
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
.collect(toSet());
}
private Map<TableName, List<String>> randTableCFs(Random rand) {
int size = rand.nextInt(5);
Map<TableName, List<String>> map = new HashMap<>();
for (int i = 0; i < size; i++) {
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
.limit(rand.nextInt(5)).collect(toList());
map.put(tn, cfs);
}
return map;
}
private ReplicationPeerConfig getConfig(int seed) {
Random rand = new Random(seed);
ReplicationPeerConfig config = new ReplicationPeerConfig();
config.setClusterKey(Long.toHexString(rand.nextLong()));
config.setReplicationEndpointImpl(Long.toHexString(rand.nextLong()));
config.setNamespaces(randNamespaces(rand));
config.setExcludeNamespaces(randNamespaces(rand));
config.setTableCFsMap(randTableCFs(rand));
config.setReplicateAllUserTables(rand.nextBoolean());
config.setBandwidth(rand.nextInt(1000));
return config;
}
private void assertSetEquals(Set<String> expected, Set<String> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach(s -> assertTrue(actual.contains(s)));
}
private void assertMapEquals(Map<TableName, List<String>> expected,
Map<TableName, List<String>> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach((expectedTn, expectedCFs) -> {
List<String> actualCFs = actual.get(expectedTn);
if (expectedCFs == null || expectedCFs.size() == 0) {
assertTrue(actual.containsKey(expectedTn));
assertTrue(actualCFs == null || actualCFs.size() == 0);
} else {
assertNotNull(actualCFs);
assertEquals(expectedCFs.size(), actualCFs.size());
for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator();
expectedIt.hasNext();) {
assertEquals(expectedIt.next(), actualIt.next());
}
}
});
}
private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
assertEquals(expected.getClusterKey(), actual.getClusterKey());
assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
assertEquals(expected.getBandwidth(), actual.getBandwidth());
}
@Test
public void test() throws ReplicationException {
int peerCount = 10;
for (int i = 0; i < peerCount; i++) {
STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
}
List<String> peerIds = STORAGE.listPeerIds();
assertEquals(peerCount, peerIds.size());
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get());
}
for (int i = 0; i < peerCount; i++) {
STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
}
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get());
}
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
}
for (int i = 0; i < peerCount; i++) {
STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
}
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
}
String toRemove = Integer.toString(peerCount / 2);
STORAGE.removePeer(toRemove);
peerIds = STORAGE.listPeerIds();
assertEquals(peerCount - 1, peerIds.size());
assertFalse(peerIds.contains(toRemove));
assertFalse(STORAGE.getPeerConfig(toRemove).isPresent());
}
}

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationQueueStorage {
private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
private static ZKReplicationQueueStorage STORAGE;
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniZKCluster();
STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
}
@AfterClass
public static void tearDown() throws IOException {
UTIL.shutdownMiniZKCluster();
}
@After
public void tearDownAfterTest() throws ReplicationException {
for (ServerName serverName : STORAGE.getListOfReplicators()) {
for (String queue : STORAGE.getAllQueues(serverName)) {
STORAGE.removeQueue(serverName, queue);
}
STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
}
for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
STORAGE.removePeerFromHFileRefs(peerId);
}
}
private ServerName getServerName(int i) {
return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
}
@Test
public void testReplicator() throws ReplicationException {
assertTrue(STORAGE.getListOfReplicators().isEmpty());
String queueId = "1";
for (int i = 0; i < 10; i++) {
STORAGE.addWAL(getServerName(i), queueId, "file" + i);
}
List<ServerName> replicators = STORAGE.getListOfReplicators();
assertEquals(10, replicators.size());
for (int i = 0; i < 10; i++) {
assertThat(replicators, hasItems(getServerName(i)));
}
for (int i = 0; i < 5; i++) {
STORAGE.removeQueue(getServerName(i), queueId);
}
for (int i = 0; i < 10; i++) {
STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
}
replicators = STORAGE.getListOfReplicators();
assertEquals(5, replicators.size());
for (int i = 5; i < 10; i++) {
assertThat(replicators, hasItems(getServerName(i)));
}
}
private String getFileName(String base, int i) {
return String.format(base + "-%04d", i);
}
@Test
public void testAddRemoveLog() throws ReplicationException {
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
String queue1 = "1";
String queue2 = "2";
for (int i = 0; i < 10; i++) {
STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
}
List<String> queueIds = STORAGE.getAllQueues(serverName1);
assertEquals(2, queueIds.size());
assertThat(queueIds, hasItems("1", "2"));
for (int i = 0; i < 10; i++) {
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100);
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10);
}
for (int i = 0; i < 10; i++) {
assertEquals((i + 1) * 100,
STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals((i + 1) * 100 + 10,
STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
}
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
} else {
STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
}
}
queueIds = STORAGE.getAllQueues(serverName1);
assertEquals(2, queueIds.size());
assertThat(queueIds, hasItems("1", "2"));
ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);
assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
assertEquals(5, peer1.getSecond().size());
int i = 1;
for (String wal : peer1.getSecond()) {
assertEquals(getFileName("file1", i), wal);
assertEquals((i + 1) * 100,
STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
i += 2;
}
queueIds = STORAGE.getAllQueues(serverName1);
assertEquals(1, queueIds.size());
assertThat(queueIds, hasItems("2"));
queueIds = STORAGE.getAllQueues(serverName2);
assertEquals(1, queueIds.size());
assertThat(queueIds, hasItems(peer1.getFirst()));
Set<String> allWals = STORAGE.getAllWALs();
assertEquals(10, allWals.size());
for (i = 0; i < 10; i++) {
assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
}
}
}

View File

@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -135,7 +136,7 @@ import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.mob.MobConstants;
@ -327,7 +328,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private AssignmentManager assignmentManager;
// manager of replication
private ReplicationManager replicationManager;
private ReplicationPeerManager replicationPeerManager;
// buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting
@ -698,8 +699,8 @@ public class HMaster extends HRegionServer implements MasterServices {
/**
* Initialize all ZK based system trackers.
*/
void initializeZKBasedSystemTrackers() throws IOException,
InterruptedException, KeeperException, CoordinatedStateException {
void initializeZKBasedSystemTrackers() throws IOException, InterruptedException, KeeperException,
CoordinatedStateException, ReplicationException {
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
this.normalizer.setMasterServices(this);
@ -717,7 +718,7 @@ public class HMaster extends HRegionServer implements MasterServices {
this.assignmentManager = new AssignmentManager(this);
this.assignmentManager.start();
this.replicationManager = new ReplicationManager(conf, zooKeeper, this);
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
this.regionServerTracker.start();
@ -762,9 +763,8 @@ public class HMaster extends HRegionServer implements MasterServices {
* <li>Handle either fresh cluster start or master failover</li>
* </ol>
*/
private void finishActiveMasterInitialization(MonitoredTask status)
throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
private void finishActiveMasterInitialization(MonitoredTask status) throws IOException,
InterruptedException, KeeperException, CoordinatedStateException, ReplicationException {
activeMaster = true;
Thread zombieDetector = new Thread(new InitializationMonitor(this),
"ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
@ -3351,18 +3351,19 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException,
IOException {
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
throws ReplicationException, IOException {
if (cpHost != null) {
cpHost.preGetReplicationPeerConfig(peerId);
}
final ReplicationPeerConfig peerConfig = this.replicationManager.getPeerConfig(peerId);
LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId + ", config="
+ peerConfig);
LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
Optional<ReplicationPeerConfig> peerConfig =
this.replicationPeerManager.getPeerConfig(peerId);
if (cpHost != null) {
cpHost.postGetReplicationPeerConfig(peerId);
}
return peerConfig;
return peerConfig.orElse(null);
}
@Override
@ -3381,7 +3382,8 @@ public class HMaster extends HRegionServer implements MasterServices {
}
LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex);
Pattern pattern = regex == null ? null : Pattern.compile(regex);
List<ReplicationPeerDescription> peers = this.replicationManager.listReplicationPeers(pattern);
List<ReplicationPeerDescription> peers =
this.replicationPeerManager.listPeers(pattern);
if (cpHost != null) {
cpHost.postListReplicationPeers(regex);
}
@ -3531,7 +3533,7 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
public ReplicationManager getReplicationManager() {
return replicationManager;
public ReplicationPeerManager getReplicationPeerManager() {
return replicationPeerManager;
}
}

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@ -459,9 +459,9 @@ public interface MasterServices extends Server {
IOException;
/**
* Returns the {@link ReplicationManager}.
* Returns the {@link ReplicationPeerManager}.
*/
ReplicationManager getReplicationManager();
ReplicationPeerManager getReplicationPeerManager();
/**
* Update the peerConfig for the specified peer

View File

@ -24,24 +24,24 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -138,8 +138,8 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return remoteDispatcher;
}
public ReplicationManager getReplicationManager() {
return master.getReplicationManager();
public ReplicationPeerManager getReplicationPeerManager() {
return master.getReplicationPeerManager();
}
public boolean isRunning() {
@ -151,22 +151,22 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return master.isInitialized();
}
public boolean waitInitialized(Procedure proc) {
public boolean waitInitialized(Procedure<?> proc) {
return master.getInitializedEvent().suspendIfNotReady(proc);
}
public boolean waitServerCrashProcessingEnabled(Procedure proc) {
public boolean waitServerCrashProcessingEnabled(Procedure<?> proc) {
if (master instanceof HMaster) {
return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc);
}
return false;
}
public boolean waitFailoverCleanup(Procedure proc) {
public boolean waitFailoverCleanup(Procedure<?> proc) {
return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc);
}
public void setEventReady(ProcedureEvent event, boolean isReady) {
public void setEventReady(ProcedureEvent<?> event, boolean isReady) {
if (isReady) {
event.wake(procSched);
} else {

View File

@ -58,16 +58,18 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
}
@Override
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preAddReplicationPeer(peerId, peerConfig);
}
env.getReplicationPeerManager().preAddPeer(peerId, peerConfig);
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled);
env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled);
}
@Override

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,12 +52,12 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
if (cpHost != null) {
cpHost.preDisableReplicationPeer(peerId);
}
env.getReplicationPeerManager().preDisablePeer(peerId);
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env)
throws IllegalArgumentException, Exception {
env.getReplicationManager().disableReplicationPeer(peerId);
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().disablePeer(peerId);
}
@Override

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,11 +52,12 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
if (cpHost != null) {
cpHost.preEnableReplicationPeer(peerId);
}
env.getReplicationPeerManager().preEnablePeer(peerId);
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws Exception {
env.getReplicationManager().enableReplicationPeer(peerId);
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().enablePeer(peerId);
}
@Override

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@ -27,6 +26,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,17 +67,16 @@ public abstract class ModifyPeerProcedure
}
/**
* Called before we start the actual processing. If an exception is thrown then we will give up
* and mark the procedure as failed directly.
* Called before we start the actual processing. The implementation should call the pre CP hook,
* and also the pre-check for the peer modification.
* <p>
* If an IOException is thrown then we will give up and mark the procedure as failed directly. If
* all checks passes then the procedure can not be rolled back any more.
*/
protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException;
protected abstract void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException;
/**
* We will give up and mark the procedure as failure if {@link IllegalArgumentException} is
* thrown, for other type of Exception we will retry.
*/
protected abstract void updatePeerStorage(MasterProcedureEnv env)
throws IllegalArgumentException, Exception;
protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
/**
* Called before we finish the procedure. The implementation can do some logging work, and also
@ -100,23 +99,24 @@ public abstract class ModifyPeerProcedure
try {
prePeerModification(env);
} catch (IOException e) {
LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
", mark the procedure as failure and give up", e);
setFailure("prePeerModification", e);
LOG.warn(
getClass().getName() + " failed to call CP hook or the pre check is failed for peer " +
peerId + ", mark the procedure as failure and give up",
e);
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
releaseLatch();
return Flow.NO_MORE_STATE;
} catch (ReplicationException e) {
LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
", retry", e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (IllegalArgumentException e) {
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer",
new DoNotRetryIOException(e));
releaseLatch();
return Flow.NO_MORE_STATE;
} catch (Exception e) {
} catch (ReplicationException e) {
LOG.warn(
getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e);
throw new ProcedureYieldException();
@ -158,8 +158,7 @@ public abstract class ModifyPeerProcedure
@Override
protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
throws IOException, InterruptedException {
if (state == PeerModificationState.PRE_PEER_MODIFICATION ||
state == PeerModificationState.UPDATE_PEER_STORAGE) {
if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
// actually the peer related operations has no rollback, but if we haven't done any
// modifications on the peer storage, we can just return.
return;

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,11 +52,12 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
if (cpHost != null) {
cpHost.preRemoveReplicationPeer(peerId);
}
env.getReplicationPeerManager().preRemovePeer(peerId);
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws Exception {
env.getReplicationManager().removeReplicationPeer(peerId);
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().removePeer(peerId);
}
@Override

View File

@ -1,199 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Manages and performs all replication admin operations.
* <p>
* Used to add/remove a replication peer.
*/
@InterfaceAudience.Private
public class ReplicationManager {
private final ReplicationQueuesClient replicationQueuesClient;
private final ReplicationPeers replicationPeers;
public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable)
throws IOException {
try {
this.replicationQueuesClient = ReplicationFactory
.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
this.replicationQueuesClient.init();
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
this.replicationQueuesClient, abortable);
this.replicationPeers.init();
} catch (Exception e) {
throw new IOException("Failed to construct ReplicationManager", e);
}
}
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
checkPeerConfig(peerConfig);
replicationPeers.registerPeer(peerId, peerConfig, enabled);
replicationPeers.peerConnected(peerId);
}
public void removeReplicationPeer(String peerId) throws ReplicationException {
replicationPeers.peerDisconnected(peerId);
replicationPeers.unregisterPeer(peerId);
}
public void enableReplicationPeer(String peerId) throws ReplicationException {
this.replicationPeers.enablePeer(peerId);
}
public void disableReplicationPeer(String peerId) throws ReplicationException {
this.replicationPeers.disablePeer(peerId);
}
public ReplicationPeerConfig getPeerConfig(String peerId)
throws ReplicationException, ReplicationPeerNotFoundException {
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId);
if (peerConfig == null) {
throw new ReplicationPeerNotFoundException(peerId);
}
return peerConfig;
}
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException {
checkPeerConfig(peerConfig);
this.replicationPeers.updatePeerConfig(peerId, peerConfig);
}
public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
throws ReplicationException {
List<ReplicationPeerDescription> peers = new ArrayList<>();
List<String> peerIds = replicationPeers.getAllPeerIds();
for (String peerId : peerIds) {
if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) {
peers.add(new ReplicationPeerDescription(peerId,
replicationPeers.getStatusOfPeerFromBackingStore(peerId),
replicationPeers.getReplicationPeerConfig(peerId)));
}
}
return peers;
}
/**
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
* Then allow config exclude namespaces or exclude table-cfs which can't be replicated to
* peer cluster.
*
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
*/
private void checkPeerConfig(ReplicationPeerConfig peerConfig) {
if (peerConfig.replicateAllUserTables()) {
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " +
"when you want replicate all cluster");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
peerConfig.getExcludeTableCFsMap());
} else {
if ((peerConfig.getExcludeNamespaces() != null
&& !peerConfig.getExcludeNamespaces().isEmpty())
|| (peerConfig.getExcludeTableCFsMap() != null
&& !peerConfig.getExcludeTableCFsMap().isEmpty())) {
throw new IllegalArgumentException(
"Need clean exclude-namespaces or exclude-table-cfs config firstly"
+ " when replicate_all flag is false");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
}
checkConfiguredWALEntryFilters(peerConfig);
}
/**
* Set a namespace in the peer config means that all tables in this namespace will be replicated
* to the peer cluster.
* <ol>
* <li>If peer config already has a namespace, then not allow set any table of this namespace to
* the peer config.</li>
* <li>If peer config already has a table, then not allow set this table's namespace to the peer
* config.</li>
* </ol>
* <p>
* Set a exclude namespace in the peer config means that all tables in this namespace can't be
* replicated to the peer cluster.
* <ol>
* <li>If peer config already has a exclude namespace, then not allow set any exclude table of
* this namespace to the peer config.</li>
* <li>If peer config already has a exclude table, then not allow set this table's namespace as a
* exclude namespace.</li>
* </ol>
*/
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
Map<TableName, ? extends Collection<String>> tableCfs) {
if (namespaces == null || namespaces.isEmpty()) {
return;
}
if (tableCfs == null || tableCfs.isEmpty()) {
return;
}
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
TableName table = entry.getKey();
if (namespaces.contains(table.getNamespaceAsString())) {
throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces "
+ table.getNamespaceAsString() + " in peer config");
}
}
}
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) {
String filterCSV = peerConfig.getConfiguration()
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
if (filterCSV != null && !filterCSV.isEmpty()) {
String[] filters = filterCSV.split(",");
for (String filter : filters) {
try {
Class.forName(filter).newInstance();
} catch (Exception e) {
throw new IllegalArgumentException("Configured WALEntryFilter " + filter +
" could not be created. Failing add/update " + "peer operation.", e);
}
}
}
}
}

View File

@ -0,0 +1,331 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Manages and performs all replication admin operations.
* <p>
* Used to add/remove a replication peer.
*/
@InterfaceAudience.Private
public final class ReplicationPeerManager {
private final ReplicationPeerStorage peerStorage;
private final ReplicationQueueStorage queueStorage;
private final ConcurrentMap<String, ReplicationPeerDescription> peers;
private ReplicationPeerManager(ReplicationPeerStorage peerStorage,
ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers) {
this.peerStorage = peerStorage;
this.queueStorage = queueStorage;
this.peers = peers;
}
private void checkQueuesDeleted(String peerId)
throws ReplicationException, DoNotRetryIOException {
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (queueInfo.getPeerId().equals(peerId)) {
throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
", replicator: " + replicator + ", queueId: " + queueId);
}
}
}
if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
}
}
public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException, ReplicationException {
if (peerId.contains("-")) {
throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
}
checkPeerConfig(peerConfig);
if (peers.containsKey(peerId)) {
throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
}
// make sure that there is no queues with the same peer id. This may happen when we create a
// peer with the same id with a old deleted peer. If the replication queues for the old peer
// have not been cleaned up yet then we should not create the new peer, otherwise the old wal
// file may also be replicated.
checkQueuesDeleted(peerId);
}
private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
ReplicationPeerDescription desc = peers.get(peerId);
if (desc == null) {
throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist");
}
return desc;
}
public void preRemovePeer(String peerId) throws DoNotRetryIOException {
checkPeerExists(peerId);
}
public void preEnablePeer(String peerId) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
if (desc.isEnabled()) {
throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
}
}
public void preDisablePeer(String peerId) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
if (!desc.isEnabled()) {
throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
}
}
public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
checkPeerConfig(peerConfig);
ReplicationPeerDescription desc = checkPeerExists(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
if (!StringUtils.isBlank(peerConfig.getClusterKey()) &&
!peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) {
throw new DoNotRetryIOException(
"Changing the cluster key on an existing peer is not allowed. Existing key '" +
oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
peerConfig.getClusterKey() + "'");
}
if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) &&
!peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) {
throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
"on an existing peer is not allowed. Existing class '" +
oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
" does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
}
}
private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) {
ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig();
copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap());
copiedPeerConfig.setNamespaces(peerConfig.getNamespaces());
copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap());
copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces());
copiedPeerConfig.setBandwidth(peerConfig.getBandwidth());
copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables());
copiedPeerConfig.setClusterKey(peerConfig.getClusterKey());
copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
return copiedPeerConfig;
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
if (peers.containsKey(peerId)) {
// this should be a retry, just return
return;
}
ReplicationPeerConfig copiedPeerConfig = copy(peerConfig);
peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
}
public void removePeer(String peerId) throws ReplicationException {
if (!peers.containsKey(peerId)) {
// this should be a retry, just return
return;
}
peerStorage.removePeer(peerId);
peers.remove(peerId);
}
private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
ReplicationPeerDescription desc = peers.get(peerId);
if (desc.isEnabled() == enabled) {
// this should be a retry, just return
return;
}
peerStorage.setPeerState(peerId, enabled);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
}
public void enablePeer(String peerId) throws ReplicationException {
setPeerState(peerId, true);
}
public void disablePeer(String peerId) throws ReplicationException {
setPeerState(peerId, false);
}
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException {
// the checking rules are too complicated here so we give up checking whether this is a retry.
ReplicationPeerDescription desc = peers.get(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
ReplicationPeerConfig newPeerConfig = copy(peerConfig);
// we need to use the new conf to overwrite the old one.
newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration());
newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData());
newPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
peerStorage.updatePeerConfig(peerId, newPeerConfig);
peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
}
public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
if (pattern == null) {
return new ArrayList<>(peers.values());
}
return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
.collect(Collectors.toList());
}
public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
ReplicationPeerDescription desc = peers.get(peerId);
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
}
/**
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
* Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
* cluster.
* <p>
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
*/
private static void checkPeerConfig(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
if (peerConfig.replicateAllUserTables()) {
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
"when you want replicate all cluster");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
peerConfig.getExcludeTableCFsMap());
} else {
if ((peerConfig.getExcludeNamespaces() != null &&
!peerConfig.getExcludeNamespaces().isEmpty()) ||
(peerConfig.getExcludeTableCFsMap() != null &&
!peerConfig.getExcludeTableCFsMap().isEmpty())) {
throw new DoNotRetryIOException(
"Need clean exclude-namespaces or exclude-table-cfs config firstly" +
" when replicate_all flag is false");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
}
checkConfiguredWALEntryFilters(peerConfig);
}
/**
* Set a namespace in the peer config means that all tables in this namespace will be replicated
* to the peer cluster.
* <ol>
* <li>If peer config already has a namespace, then not allow set any table of this namespace to
* the peer config.</li>
* <li>If peer config already has a table, then not allow set this table's namespace to the peer
* config.</li>
* </ol>
* <p>
* Set a exclude namespace in the peer config means that all tables in this namespace can't be
* replicated to the peer cluster.
* <ol>
* <li>If peer config already has a exclude namespace, then not allow set any exclude table of
* this namespace to the peer config.</li>
* <li>If peer config already has a exclude table, then not allow set this table's namespace as a
* exclude namespace.</li>
* </ol>
*/
private static void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
if (namespaces == null || namespaces.isEmpty()) {
return;
}
if (tableCfs == null || tableCfs.isEmpty()) {
return;
}
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
TableName table = entry.getKey();
if (namespaces.contains(table.getNamespaceAsString())) {
throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " +
table.getNamespaceAsString() + " in peer config");
}
}
}
private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
String filterCSV = peerConfig.getConfiguration()
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
if (filterCSV != null && !filterCSV.isEmpty()) {
String[] filters = filterCSV.split(",");
for (String filter : filters) {
try {
Class.forName(filter).newInstance();
} catch (Exception e) {
throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
" could not be created. Failing add/update " + "peer operation.", e);
}
}
}
}
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException {
ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
Optional<ReplicationPeerConfig> peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get()));
}
return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
}
}

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -59,12 +60,12 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
if (cpHost != null) {
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
}
env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env)
throws IllegalArgumentException, Exception {
env.getReplicationManager().updatePeerConfig(peerId, peerConfig);
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
}
@Override

View File

@ -17,6 +17,12 @@
*/
package org.apache.hadoop.hbase.client.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
@ -31,18 +37,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -50,15 +55,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Unit testing of ReplicationAdmin
@ -66,8 +62,6 @@ import static org.junit.Assert.fail;
@Category({MediumTests.class, ClientTests.class})
public class TestReplicationAdmin {
private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationAdmin.class);
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
@ -102,16 +96,17 @@ public class TestReplicationAdmin {
}
@After
public void cleanupPeer() {
try {
hbaseAdmin.removeReplicationPeer(ID_ONE);
} catch (Exception e) {
LOG.debug("Replication peer " + ID_ONE + " may already be removed");
public void tearDown() throws Exception {
for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
hbaseAdmin.removeReplicationPeer(desc.getPeerId());
}
try {
hbaseAdmin.removeReplicationPeer(ID_SECOND);
} catch (Exception e) {
LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
ReplicationQueueStorage queueStorage = ReplicationStorageFactory
.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
for (ServerName serverName : queueStorage.getListOfReplicators()) {
for (String queue : queueStorage.getAllQueues(serverName)) {
queueStorage.removeQueue(serverName, queue);
}
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
}
}
@ -201,32 +196,29 @@ public class TestReplicationAdmin {
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(KEY_SECOND);
Configuration conf = TEST_UTIL.getConfiguration();
ZKWatcher zkw = new ZKWatcher(conf, "Test HBaseAdmin", null);
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw));
repQueues.init("server1");
ReplicationQueueStorage queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
// add queue for ID_ONE
repQueues.addLog(ID_ONE, "file1");
queueStorage.addWAL(serverName, ID_ONE, "file1");
try {
admin.addPeer(ID_ONE, rpc1, null);
fail();
} catch (Exception e) {
// OK!
}
repQueues.removeQueue(ID_ONE);
assertEquals(0, repQueues.getAllQueues().size());
queueStorage.removeQueue(serverName, ID_ONE);
assertEquals(0, queueStorage.getAllQueues(serverName).size());
// add recovered queue for ID_ONE
repQueues.addLog(ID_ONE + "-server2", "file1");
queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
try {
admin.addPeer(ID_ONE, rpc2, null);
fail();
} catch (Exception e) {
// OK!
}
repQueues.removeAllQueues();
zkw.close();
}
/**
@ -422,7 +414,7 @@ public class TestReplicationAdmin {
tableCFs.clear();
tableCFs.put(tableName2, null);
admin.removePeerTableCFs(ID_ONE, tableCFs);
assertTrue(false);
fail();
} catch (ReplicationException e) {
}
tableCFs.clear();

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.master;
import static org.mockito.Mockito.mock;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.List;
@ -42,7 +44,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@ -56,8 +58,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import com.google.protobuf.Service;
public class MockNoopMasterServices implements MasterServices, Server {
private final Configuration conf;
private final MetricsMaster metricsMaster;
@ -462,7 +462,7 @@ public class MockNoopMasterServices implements MasterServices, Server {
}
@Override
public ProcedureEvent getInitializedEvent() {
public ProcedureEvent<?> getInitializedEvent() {
return null;
}
@ -477,7 +477,7 @@ public class MockNoopMasterServices implements MasterServices, Server {
}
@Override
public ReplicationManager getReplicationManager() {
public ReplicationPeerManager getReplicationPeerManager() {
return null;
}
}
}

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -270,7 +272,7 @@ public class TestMasterNoCluster {
@Override
void initializeZKBasedSystemTrackers() throws IOException, InterruptedException,
KeeperException, CoordinatedStateException {
KeeperException, CoordinatedStateException, ReplicationException {
super.initializeZKBasedSystemTrackers();
// Record a newer server in server manager at first
getServerManager().recordNewServerWithLock(newServer,

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -47,9 +46,6 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
*/
@Test(timeout = 600000)
public void testDisableInactivePeer() throws Exception {
// enabling and shutdown the peer
admin.enablePeer("2");
utility2.shutdownMiniHBaseCluster();
byte[] rowkey = Bytes.toBytes("disable inactive peer");