HBASE-19543 Abstract a replication storage interface to extract the zk specific code
This commit is contained in:
parent
3fd417600e
commit
1de08ded58
|
@ -107,6 +107,9 @@ public class CollectionUtils {
|
|||
return list.get(list.size() - 1);
|
||||
}
|
||||
|
||||
public static <T> List<T> nullToEmpty(List<T> list) {
|
||||
return list != null ? list : Collections.emptyList();
|
||||
}
|
||||
/**
|
||||
* In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the
|
||||
* value already exists. Notice that the implementation does not guarantee that the supplier will
|
||||
|
|
|
@ -97,6 +97,18 @@
|
|||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-zookeeper</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-zookeeper</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- General dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Perform read/write to the replication peer storage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface ReplicationPeerStorage {
|
||||
|
||||
/**
|
||||
* Add a replication peer.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove a replication peer.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
void removePeer(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Set the state of peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
void setPeerState(String peerId, boolean enabled) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Update the config a replication peer.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Return the peer ids of all replication peers.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
List<String> listPeerIds() throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Test whether a replication peer is enabled.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
boolean isPeerEnabled(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get the peer config of a replication peer.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException;
|
||||
}
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Perform read/write to the replication queue storage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface ReplicationQueueStorage {
|
||||
|
||||
/**
|
||||
* Remove a replication queue for a given regionserver.
|
||||
* @param serverName the name of the regionserver
|
||||
* @param queueId a String that identifies the queue.
|
||||
*/
|
||||
void removeQueue(ServerName serverName, String queueId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Add a new WAL file to the given queue for a given regionserver. If the queue does not exist it
|
||||
* is created.
|
||||
* @param serverName the name of the regionserver
|
||||
* @param queueId a String that identifies the queue.
|
||||
* @param fileName name of the WAL
|
||||
*/
|
||||
void addWAL(ServerName serverName, String queueId, String fileName) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove an WAL file from the given queue for a given regionserver.
|
||||
* @param serverName the name of the regionserver
|
||||
* @param queueId a String that identifies the queue.
|
||||
* @param fileName name of the WAL
|
||||
*/
|
||||
void removeWAL(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Set the current position for a specific WAL in a given queue for a given regionserver.
|
||||
* @param serverName the name of the regionserver
|
||||
* @param queueId a String that identifies the queue
|
||||
* @param fileName name of the WAL
|
||||
* @param position the current position in the file
|
||||
*/
|
||||
void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
|
||||
throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get the current position for a specific WAL in a given queue for a given regionserver.
|
||||
* @param serverName the name of the regionserver
|
||||
* @param queueId a String that identifies the queue
|
||||
* @param fileName name of the WAL
|
||||
* @return the current position in the file
|
||||
*/
|
||||
long getWALPosition(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get a list of all queues for the specified region server.
|
||||
* @param serverName the server name of the region server that owns the set of queues
|
||||
* @return a list of queueIds
|
||||
*/
|
||||
List<String> getAllQueues(ServerName serverName) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Change ownership for the queue identified by queueId and belongs to a dead region server.
|
||||
* @param sourceServerName the name of the dead region server
|
||||
* @param destServerName the name of the target region server
|
||||
* @param queueId the id of the queue
|
||||
* @return the new PeerId and A SortedSet of WALs in its queue
|
||||
*/
|
||||
Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
|
||||
ServerName destServerName) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove the record of region server if the queue is empty.
|
||||
*/
|
||||
void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get a list of all region servers that have outstanding replication queues. These servers could
|
||||
* be alive, dead or from a previous run of the cluster.
|
||||
* @return a list of server names
|
||||
*/
|
||||
List<ServerName> getListOfReplicators() throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Load all wals in all replication queues. This method guarantees to return a snapshot which
|
||||
* contains all WALs in the zookeeper at the start of this call even there is concurrent queue
|
||||
* failover. However, some newly created WALs during the call may not be included.
|
||||
*/
|
||||
Set<String> getAllWALs() throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Add a peer to hfile reference queue if peer does not exist.
|
||||
* @param peerId peer cluster id to be added
|
||||
* @throws ReplicationException if fails to add a peer id to hfile reference queue
|
||||
*/
|
||||
void addPeerToHFileRefs(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove a peer from hfile reference queue.
|
||||
* @param peerId peer cluster id to be removed
|
||||
*/
|
||||
void removePeerFromHFileRefs(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Add new hfile references to the queue.
|
||||
* @param peerId peer cluster id to which the hfiles need to be replicated
|
||||
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
|
||||
* will be added in the queue }
|
||||
* @throws ReplicationException if fails to add a hfile reference
|
||||
*/
|
||||
void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove hfile references from the queue.
|
||||
* @param peerId peer cluster id from which this hfile references needs to be removed
|
||||
* @param files list of hfile references to be removed
|
||||
*/
|
||||
void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get the change version number of replication hfile references node. This can be used as
|
||||
* optimistic locking to get a consistent snapshot of the replication queues of hfile references.
|
||||
* @return change version number of hfile references node
|
||||
*/
|
||||
int getHFileRefsNodeChangeVersion() throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get list of all peers from hfile reference queue.
|
||||
* @return a list of peer ids
|
||||
*/
|
||||
List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get a list of all hfile references in the given peer.
|
||||
* @param peerId a String that identifies the peer
|
||||
* @return a list of hfile references
|
||||
*/
|
||||
List<String> getReplicableHFiles(String peerId) throws ReplicationException;
|
||||
}
|
|
@ -63,7 +63,6 @@ public abstract class ReplicationStateZKBase {
|
|||
protected final Configuration conf;
|
||||
protected final Abortable abortable;
|
||||
|
||||
// Public for testing
|
||||
public static final byte[] ENABLED_ZNODE_BYTES =
|
||||
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
|
||||
public static final byte[] DISABLED_ZNODE_BYTES =
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Used to create replication storage(peer, queue) classes.
|
||||
* <p>
|
||||
* For now we only have zk based implementation.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationStorageFactory {
|
||||
|
||||
private ReplicationStorageFactory() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link ReplicationPeerStorage}.
|
||||
*/
|
||||
public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) {
|
||||
return new ZKReplicationPeerStorage(zk, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link ReplicationQueueStorage}.
|
||||
*/
|
||||
public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk,
|
||||
Configuration conf) {
|
||||
return new ZKReplicationQueueStorage(zk, conf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
|
||||
/**
|
||||
* ZK based replication peer storage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationPeerStorage.class);
|
||||
|
||||
public static final byte[] ENABLED_ZNODE_BYTES =
|
||||
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
|
||||
public static final byte[] DISABLED_ZNODE_BYTES =
|
||||
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
|
||||
|
||||
/**
|
||||
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
|
||||
* cluster.
|
||||
*/
|
||||
private final String peerStateNodeName;
|
||||
|
||||
/**
|
||||
* The name of the znode that contains a list of all remote slave (i.e. peer) clusters.
|
||||
*/
|
||||
private final String peersZNode;
|
||||
|
||||
public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) {
|
||||
super(zookeeper, conf);
|
||||
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
|
||||
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
|
||||
this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
|
||||
}
|
||||
|
||||
private String getPeerStateNode(String peerId) {
|
||||
return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName);
|
||||
}
|
||||
|
||||
private String getPeerNode(String peerId) {
|
||||
return ZNodePaths.joinZNode(peersZNode, peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.createWithParents(zookeeper, peersZNode);
|
||||
ZKUtil.multiOrSequential(zookeeper,
|
||||
Arrays.asList(
|
||||
ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
|
||||
ReplicationPeerConfigUtil.toByteArray(peerConfig)),
|
||||
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
|
||||
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
|
||||
false);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" +
|
||||
peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removePeer(String peerId) throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not remove peer with id=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
|
||||
byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
|
||||
try {
|
||||
ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.setData(this.zookeeper, getPeerNode(peerId),
|
||||
ReplicationPeerConfigUtil.toByteArray(peerConfig));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException(
|
||||
"There was a problem trying to save changes to the " + "replication peer " + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listPeerIds() throws ReplicationException {
|
||||
try {
|
||||
return CollectionUtils.nullToEmpty(ZKUtil.listChildrenAndWatchThem(zookeeper, peersZNode));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Cannot get the list of peers", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPeerEnabled(String peerId) throws ReplicationException {
|
||||
try {
|
||||
return Arrays.equals(ENABLED_ZNODE_BYTES,
|
||||
ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException {
|
||||
byte[] data;
|
||||
try {
|
||||
data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
|
||||
}
|
||||
if (data == null || data.length == 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
try {
|
||||
return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data));
|
||||
} catch (DeserializationException e) {
|
||||
LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,425 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.hadoop.hbase.util.CollectionUtils.nullToEmpty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.BadVersionException;
|
||||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||
import org.apache.zookeeper.KeeperException.NotEmptyException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* ZK based replication queue storage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||
implements ReplicationQueueStorage {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
|
||||
|
||||
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
|
||||
"zookeeper.znode.replication.hfile.refs";
|
||||
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
|
||||
|
||||
/**
|
||||
* The name of the znode that contains all replication queues
|
||||
*/
|
||||
private final String queuesZNode;
|
||||
|
||||
/**
|
||||
* The name of the znode that contains queues of hfile references to be replicated
|
||||
*/
|
||||
private final String hfileRefsZNode;
|
||||
|
||||
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
|
||||
super(zookeeper, conf);
|
||||
|
||||
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
|
||||
String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
|
||||
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
|
||||
this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
|
||||
this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
|
||||
}
|
||||
|
||||
private String getRsNode(ServerName serverName) {
|
||||
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
|
||||
}
|
||||
|
||||
private String getQueueNode(ServerName serverName, String queueId) {
|
||||
return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
|
||||
}
|
||||
|
||||
private String getFileNode(String queueNode, String fileName) {
|
||||
return ZNodePaths.joinZNode(queueNode, fileName);
|
||||
}
|
||||
|
||||
private String getFileNode(ServerName serverName, String queueId, String fileName) {
|
||||
return getFileNode(getQueueNode(serverName, queueId), fileName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addWAL(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName +
|
||||
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeWAL(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException {
|
||||
String fileNode = getFileNode(serverName, queueId, fileName);
|
||||
try {
|
||||
ZKUtil.deleteNode(zookeeper, fileNode);
|
||||
} catch (NoNodeException e) {
|
||||
LOG.warn(fileNode + " has already been deleted when removing log");
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName +
|
||||
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
|
||||
throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName),
|
||||
ZKUtil.positionToByteArray(position));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to set log position (serverName=" + serverName +
|
||||
", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWALPosition(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException {
|
||||
byte[] bytes;
|
||||
try {
|
||||
bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName));
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throw new ReplicationException("Failed to get log position (serverName=" + serverName +
|
||||
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
|
||||
}
|
||||
try {
|
||||
return ZKUtil.parseWALPositionFrom(bytes);
|
||||
} catch (DeserializationException de) {
|
||||
LOG.warn("Failed to parse log position (serverName=" + serverName + ", queueId=" + queueId +
|
||||
", fileName=" + fileName + ")");
|
||||
}
|
||||
// if we can not parse the position, start at the beginning of the wal file again
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
|
||||
ServerName destServerName) throws ReplicationException {
|
||||
LOG.info(
|
||||
"Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
|
||||
try {
|
||||
ZKUtil.createWithParents(zookeeper, getRsNode(destServerName));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException(
|
||||
"Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName +
|
||||
" failed when creating the node for " + destServerName,
|
||||
e);
|
||||
}
|
||||
try {
|
||||
String oldQueueNode = getQueueNode(sourceServerName, queueId);
|
||||
List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode);
|
||||
String newQueueId = queueId + "-" + sourceServerName;
|
||||
if (CollectionUtils.isEmpty(wals)) {
|
||||
ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode);
|
||||
LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty");
|
||||
return new Pair<>(newQueueId, Collections.emptySortedSet());
|
||||
}
|
||||
String newQueueNode = getQueueNode(destServerName, newQueueId);
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
||||
SortedSet<String> logQueue = new TreeSet<>();
|
||||
// create the new cluster znode
|
||||
listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY));
|
||||
// get the offset of the logs and set it to new znodes
|
||||
for (String wal : wals) {
|
||||
String oldWalNode = getFileNode(oldQueueNode, wal);
|
||||
byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating " + wal + " with data " + Bytes.toStringBinary(logOffset));
|
||||
}
|
||||
String newWalNode = getFileNode(newQueueNode, wal);
|
||||
listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset));
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode));
|
||||
logQueue.add(wal);
|
||||
}
|
||||
// add delete op for peer
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("The multi list size is: " + listOfOps.size());
|
||||
}
|
||||
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
|
||||
|
||||
LOG.info(
|
||||
"Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
|
||||
return new Pair<>(newQueueId, logQueue);
|
||||
} catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) {
|
||||
// Multi call failed; it looks like some other regionserver took away the logs.
|
||||
// These exceptions mean that zk tells us the request can not be execute so it is safe to just
|
||||
// return a null. For other types of exception should be thrown out to notify the upper layer.
|
||||
LOG.info(
|
||||
"Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName +
|
||||
" failed with " + e.toString() + ", maybe someone else has already took away the logs");
|
||||
return null;
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throw new ReplicationException("Claim queue queueId=" + queueId + " from " +
|
||||
sourceServerName + " to " + destServerName + " failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName));
|
||||
} catch (NotEmptyException e) {
|
||||
// keep silence to avoid logging too much.
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to remove replicator for " + serverName, e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<ServerName> getListOfReplicators0() throws KeeperException {
|
||||
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode)).stream()
|
||||
.map(ServerName::parseServerName).collect(toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getListOfReplicators() throws ReplicationException {
|
||||
try {
|
||||
return getListOfReplicators0();
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to get list of replicators", e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getLogsInQueue0(ServerName serverName, String queueId)
|
||||
throws KeeperException {
|
||||
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId)));
|
||||
}
|
||||
|
||||
private List<String> getAllQueues0(ServerName serverName) throws KeeperException {
|
||||
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
|
||||
try {
|
||||
return getAllQueues0(serverName);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
private int getQueuesZNodeCversion() throws KeeperException {
|
||||
Stat stat = new Stat();
|
||||
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
|
||||
return stat.getCversion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getAllWALs() throws ReplicationException {
|
||||
try {
|
||||
for (int retry = 0;; retry++) {
|
||||
int v0 = getQueuesZNodeCversion();
|
||||
List<ServerName> rss = getListOfReplicators0();
|
||||
if (rss.isEmpty()) {
|
||||
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
|
||||
return Collections.emptySet();
|
||||
}
|
||||
Set<String> wals = Sets.newHashSet();
|
||||
for (ServerName rs : rss) {
|
||||
for (String queueId : getAllQueues0(rs)) {
|
||||
wals.addAll(getLogsInQueue0(rs, queueId));
|
||||
}
|
||||
}
|
||||
int v1 = getQueuesZNodeCversion();
|
||||
if (v0 == v1) {
|
||||
return wals;
|
||||
}
|
||||
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
|
||||
v0, v1, retry));
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to get all wals", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getHFileRefsPeerNode(String peerId) {
|
||||
return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
|
||||
}
|
||||
|
||||
private String getHFileNode(String peerNode, String fileName) {
|
||||
return ZNodePaths.joinZNode(peerNode, fileName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
|
||||
String peerNode = getHFileRefsPeerNode(peerId);
|
||||
try {
|
||||
if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
|
||||
LOG.info("Adding peer " + peerId + " to hfile reference queue.");
|
||||
ZKUtil.createWithParents(zookeeper, peerNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
|
||||
String peerNode = getHFileRefsPeerNode(peerId);
|
||||
try {
|
||||
if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Peer " + peerNode + " not found in hfile reference queue.");
|
||||
}
|
||||
} else {
|
||||
LOG.info("Removing peer " + peerNode + " from hfile reference queue.");
|
||||
ZKUtil.deleteNodeRecursively(zookeeper, peerNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to remove peer " + peerId + " from hfile reference queue.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
|
||||
throws ReplicationException {
|
||||
String peerNode = getHFileRefsPeerNode(peerId);
|
||||
boolean debugEnabled = LOG.isDebugEnabled();
|
||||
if (debugEnabled) {
|
||||
LOG.debug("Adding hfile references " + pairs + " in queue " + peerNode);
|
||||
}
|
||||
List<ZKUtilOp> listOfOps =
|
||||
pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n))
|
||||
.map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
|
||||
if (debugEnabled) {
|
||||
LOG.debug("The multi list size for adding hfile references in zk for node " + peerNode +
|
||||
" is " + listOfOps.size());
|
||||
}
|
||||
try {
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
|
||||
String peerNode = getHFileRefsPeerNode(peerId);
|
||||
boolean debugEnabled = LOG.isDebugEnabled();
|
||||
if (debugEnabled) {
|
||||
LOG.debug("Removing hfile references " + files + " from queue " + peerNode);
|
||||
}
|
||||
|
||||
List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n))
|
||||
.map(ZKUtilOp::deleteNodeFailSilent).collect(toList());
|
||||
if (debugEnabled) {
|
||||
LOG.debug("The multi list size for removing hfile references in zk for node " + peerNode +
|
||||
" is " + listOfOps.size());
|
||||
}
|
||||
try {
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHFileRefsNodeChangeVersion() throws ReplicationException {
|
||||
Stat stat = new Stat();
|
||||
try {
|
||||
ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to get stat of replication hfile references node.", e);
|
||||
}
|
||||
return stat.getCversion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
|
||||
try {
|
||||
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to get list of all peers in hfile references node.",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
|
||||
try {
|
||||
return nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId)));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
|
||||
/**
|
||||
* This is a base class for maintaining replication related data,for example, peer, queue, etc, in
|
||||
* zookeeper.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class ZKReplicationStorageBase {
|
||||
|
||||
/** The name of the base znode that contains all replication state. */
|
||||
protected final String replicationZNode;
|
||||
|
||||
protected final ZKWatcher zookeeper;
|
||||
protected final Configuration conf;
|
||||
|
||||
protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) {
|
||||
this.zookeeper = zookeeper;
|
||||
this.conf = conf;
|
||||
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
|
||||
|
||||
this.replicationZNode =
|
||||
ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for use as
|
||||
* content of a peer-state znode under a peer cluster id as in
|
||||
* /hbase/replication/peers/PEER_ID/peer-state.
|
||||
*/
|
||||
protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
|
||||
ReplicationProtos.ReplicationState msg =
|
||||
ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
|
||||
// There is no toByteArray on this pb Message?
|
||||
// 32 bytes is default which seems fair enough here.
|
||||
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
|
||||
CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
|
||||
msg.writeTo(cos);
|
||||
cos.flush();
|
||||
baos.flush();
|
||||
return ProtobufUtil.prependPBMagic(baos.toByteArray());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
public class TestZKReplicationPeerStorage {
|
||||
|
||||
private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
|
||||
|
||||
private static ZKReplicationPeerStorage STORAGE;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniZKCluster();
|
||||
STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
UTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
private Set<String> randNamespaces(Random rand) {
|
||||
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
|
||||
.collect(toSet());
|
||||
}
|
||||
|
||||
private Map<TableName, List<String>> randTableCFs(Random rand) {
|
||||
int size = rand.nextInt(5);
|
||||
Map<TableName, List<String>> map = new HashMap<>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
|
||||
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
|
||||
.limit(rand.nextInt(5)).collect(toList());
|
||||
map.put(tn, cfs);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
private ReplicationPeerConfig getConfig(int seed) {
|
||||
Random rand = new Random(seed);
|
||||
ReplicationPeerConfig config = new ReplicationPeerConfig();
|
||||
config.setClusterKey(Long.toHexString(rand.nextLong()));
|
||||
config.setReplicationEndpointImpl(Long.toHexString(rand.nextLong()));
|
||||
config.setNamespaces(randNamespaces(rand));
|
||||
config.setExcludeNamespaces(randNamespaces(rand));
|
||||
config.setTableCFsMap(randTableCFs(rand));
|
||||
config.setReplicateAllUserTables(rand.nextBoolean());
|
||||
config.setBandwidth(rand.nextInt(1000));
|
||||
return config;
|
||||
}
|
||||
|
||||
private void assertSetEquals(Set<String> expected, Set<String> actual) {
|
||||
if (expected == null || expected.size() == 0) {
|
||||
assertTrue(actual == null || actual.size() == 0);
|
||||
return;
|
||||
}
|
||||
assertEquals(expected.size(), actual.size());
|
||||
expected.forEach(s -> assertTrue(actual.contains(s)));
|
||||
}
|
||||
|
||||
private void assertMapEquals(Map<TableName, List<String>> expected,
|
||||
Map<TableName, List<String>> actual) {
|
||||
if (expected == null || expected.size() == 0) {
|
||||
assertTrue(actual == null || actual.size() == 0);
|
||||
return;
|
||||
}
|
||||
assertEquals(expected.size(), actual.size());
|
||||
expected.forEach((expectedTn, expectedCFs) -> {
|
||||
List<String> actualCFs = actual.get(expectedTn);
|
||||
if (expectedCFs == null || expectedCFs.size() == 0) {
|
||||
assertTrue(actual.containsKey(expectedTn));
|
||||
assertTrue(actualCFs == null || actualCFs.size() == 0);
|
||||
} else {
|
||||
assertNotNull(actualCFs);
|
||||
assertEquals(expectedCFs.size(), actualCFs.size());
|
||||
for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator();
|
||||
expectedIt.hasNext();) {
|
||||
assertEquals(expectedIt.next(), actualIt.next());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
|
||||
assertEquals(expected.getClusterKey(), actual.getClusterKey());
|
||||
assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
|
||||
assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
|
||||
assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
|
||||
assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
|
||||
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
|
||||
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
|
||||
assertEquals(expected.getBandwidth(), actual.getBandwidth());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws ReplicationException {
|
||||
int peerCount = 10;
|
||||
for (int i = 0; i < peerCount; i++) {
|
||||
STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
|
||||
}
|
||||
List<String> peerIds = STORAGE.listPeerIds();
|
||||
assertEquals(peerCount, peerIds.size());
|
||||
for (String peerId : peerIds) {
|
||||
int seed = Integer.parseInt(peerId);
|
||||
assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get());
|
||||
}
|
||||
for (int i = 0; i < peerCount; i++) {
|
||||
STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
|
||||
}
|
||||
for (String peerId : peerIds) {
|
||||
int seed = Integer.parseInt(peerId);
|
||||
assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get());
|
||||
}
|
||||
for (int i = 0; i < peerCount; i++) {
|
||||
assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
|
||||
}
|
||||
for (int i = 0; i < peerCount; i++) {
|
||||
STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
|
||||
}
|
||||
for (int i = 0; i < peerCount; i++) {
|
||||
assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
|
||||
}
|
||||
String toRemove = Integer.toString(peerCount / 2);
|
||||
STORAGE.removePeer(toRemove);
|
||||
peerIds = STORAGE.listPeerIds();
|
||||
assertEquals(peerCount - 1, peerIds.size());
|
||||
assertFalse(peerIds.contains(toRemove));
|
||||
assertFalse(STORAGE.getPeerConfig(toRemove).isPresent());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.hasItems;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
public class TestZKReplicationQueueStorage {
|
||||
private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
|
||||
|
||||
private static ZKReplicationQueueStorage STORAGE;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniZKCluster();
|
||||
STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
UTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownAfterTest() throws ReplicationException {
|
||||
for (ServerName serverName : STORAGE.getListOfReplicators()) {
|
||||
for (String queue : STORAGE.getAllQueues(serverName)) {
|
||||
STORAGE.removeQueue(serverName, queue);
|
||||
}
|
||||
STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
|
||||
}
|
||||
for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
|
||||
STORAGE.removePeerFromHFileRefs(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
private ServerName getServerName(int i) {
|
||||
return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicator() throws ReplicationException {
|
||||
assertTrue(STORAGE.getListOfReplicators().isEmpty());
|
||||
String queueId = "1";
|
||||
for (int i = 0; i < 10; i++) {
|
||||
STORAGE.addWAL(getServerName(i), queueId, "file" + i);
|
||||
}
|
||||
List<ServerName> replicators = STORAGE.getListOfReplicators();
|
||||
assertEquals(10, replicators.size());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertThat(replicators, hasItems(getServerName(i)));
|
||||
}
|
||||
for (int i = 0; i < 5; i++) {
|
||||
STORAGE.removeQueue(getServerName(i), queueId);
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
|
||||
}
|
||||
replicators = STORAGE.getListOfReplicators();
|
||||
assertEquals(5, replicators.size());
|
||||
for (int i = 5; i < 10; i++) {
|
||||
assertThat(replicators, hasItems(getServerName(i)));
|
||||
}
|
||||
}
|
||||
|
||||
private String getFileName(String base, int i) {
|
||||
return String.format(base + "-%04d", i);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddRemoveLog() throws ReplicationException {
|
||||
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
|
||||
assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
|
||||
String queue1 = "1";
|
||||
String queue2 = "2";
|
||||
for (int i = 0; i < 10; i++) {
|
||||
STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
|
||||
STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
|
||||
}
|
||||
List<String> queueIds = STORAGE.getAllQueues(serverName1);
|
||||
assertEquals(2, queueIds.size());
|
||||
assertThat(queueIds, hasItems("1", "2"));
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
|
||||
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
|
||||
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100);
|
||||
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10);
|
||||
}
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertEquals((i + 1) * 100,
|
||||
STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
|
||||
assertEquals((i + 1) * 100 + 10,
|
||||
STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
if (i % 2 == 0) {
|
||||
STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
|
||||
} else {
|
||||
STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
|
||||
}
|
||||
}
|
||||
|
||||
queueIds = STORAGE.getAllQueues(serverName1);
|
||||
assertEquals(2, queueIds.size());
|
||||
assertThat(queueIds, hasItems("1", "2"));
|
||||
|
||||
ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
|
||||
Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);
|
||||
|
||||
assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
|
||||
assertEquals(5, peer1.getSecond().size());
|
||||
int i = 1;
|
||||
for (String wal : peer1.getSecond()) {
|
||||
assertEquals(getFileName("file1", i), wal);
|
||||
assertEquals((i + 1) * 100,
|
||||
STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
|
||||
i += 2;
|
||||
}
|
||||
|
||||
queueIds = STORAGE.getAllQueues(serverName1);
|
||||
assertEquals(1, queueIds.size());
|
||||
assertThat(queueIds, hasItems("2"));
|
||||
|
||||
queueIds = STORAGE.getAllQueues(serverName2);
|
||||
assertEquals(1, queueIds.size());
|
||||
assertThat(queueIds, hasItems(peer1.getFirst()));
|
||||
|
||||
Set<String> allWals = STORAGE.getAllWALs();
|
||||
assertEquals(10, allWals.size());
|
||||
for (i = 0; i < 10; i++) {
|
||||
assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -40,6 +40,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
@ -133,7 +134,7 @@ import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
|
|||
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
|
@ -327,7 +328,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
private AssignmentManager assignmentManager;
|
||||
|
||||
// manager of replication
|
||||
private ReplicationManager replicationManager;
|
||||
private ReplicationPeerManager replicationPeerManager;
|
||||
|
||||
// buffer for "fatal error" notices from region servers
|
||||
// in the cluster. This is only used for assisting
|
||||
|
@ -718,8 +719,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
/**
|
||||
* Initialize all ZK based system trackers.
|
||||
*/
|
||||
void initializeZKBasedSystemTrackers() throws IOException,
|
||||
InterruptedException, KeeperException {
|
||||
void initializeZKBasedSystemTrackers()
|
||||
throws IOException, InterruptedException, KeeperException, ReplicationException {
|
||||
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
||||
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
|
||||
this.normalizer.setMasterServices(this);
|
||||
|
@ -737,7 +738,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
this.assignmentManager = new AssignmentManager(this);
|
||||
this.assignmentManager.start();
|
||||
|
||||
this.replicationManager = new ReplicationManager(conf, zooKeeper, this);
|
||||
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
|
||||
|
||||
this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
|
||||
this.regionServerTracker.start();
|
||||
|
@ -783,8 +784,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
* </ol>
|
||||
*/
|
||||
private void finishActiveMasterInitialization(MonitoredTask status)
|
||||
throws IOException, InterruptedException, KeeperException {
|
||||
|
||||
throws IOException, InterruptedException, KeeperException, ReplicationException {
|
||||
Thread zombieDetector = new Thread(new InitializationMonitor(this),
|
||||
"ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
|
||||
zombieDetector.setDaemon(true);
|
||||
|
@ -3414,18 +3414,19 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException,
|
||||
IOException {
|
||||
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
|
||||
throws ReplicationException, IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preGetReplicationPeerConfig(peerId);
|
||||
}
|
||||
final ReplicationPeerConfig peerConfig = this.replicationManager.getPeerConfig(peerId);
|
||||
LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId + ", config="
|
||||
+ peerConfig);
|
||||
LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
|
||||
Optional<ReplicationPeerConfig> peerConfig =
|
||||
this.replicationPeerManager.getPeerConfig(peerId);
|
||||
|
||||
if (cpHost != null) {
|
||||
cpHost.postGetReplicationPeerConfig(peerId);
|
||||
}
|
||||
return peerConfig;
|
||||
return peerConfig.orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3444,7 +3445,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex);
|
||||
Pattern pattern = regex == null ? null : Pattern.compile(regex);
|
||||
List<ReplicationPeerDescription> peers = this.replicationManager.listReplicationPeers(pattern);
|
||||
List<ReplicationPeerDescription> peers =
|
||||
this.replicationPeerManager.listPeers(pattern);
|
||||
if (cpHost != null) {
|
||||
cpHost.postListReplicationPeers(regex);
|
||||
}
|
||||
|
@ -3593,8 +3595,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ReplicationManager getReplicationManager() {
|
||||
return replicationManager;
|
||||
public ReplicationPeerManager getReplicationPeerManager() {
|
||||
return replicationPeerManager;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
|
@ -459,9 +459,9 @@ public interface MasterServices extends Server {
|
|||
IOException;
|
||||
|
||||
/**
|
||||
* Returns the {@link ReplicationManager}.
|
||||
* Returns the {@link ReplicationPeerManager}.
|
||||
*/
|
||||
ReplicationManager getReplicationManager();
|
||||
ReplicationPeerManager getReplicationPeerManager();
|
||||
|
||||
/**
|
||||
* Update the peerConfig for the specified peer
|
||||
|
|
|
@ -24,24 +24,24 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.Superusers;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
|
@ -138,8 +138,8 @@ public class MasterProcedureEnv implements ConfigurationObserver {
|
|||
return remoteDispatcher;
|
||||
}
|
||||
|
||||
public ReplicationManager getReplicationManager() {
|
||||
return master.getReplicationManager();
|
||||
public ReplicationPeerManager getReplicationPeerManager() {
|
||||
return master.getReplicationPeerManager();
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
|
@ -151,22 +151,22 @@ public class MasterProcedureEnv implements ConfigurationObserver {
|
|||
return master.isInitialized();
|
||||
}
|
||||
|
||||
public boolean waitInitialized(Procedure proc) {
|
||||
public boolean waitInitialized(Procedure<?> proc) {
|
||||
return master.getInitializedEvent().suspendIfNotReady(proc);
|
||||
}
|
||||
|
||||
public boolean waitServerCrashProcessingEnabled(Procedure proc) {
|
||||
public boolean waitServerCrashProcessingEnabled(Procedure<?> proc) {
|
||||
if (master instanceof HMaster) {
|
||||
return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean waitFailoverCleanup(Procedure proc) {
|
||||
public boolean waitFailoverCleanup(Procedure<?> proc) {
|
||||
return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc);
|
||||
}
|
||||
|
||||
public void setEventReady(ProcedureEvent event, boolean isReady) {
|
||||
public void setEventReady(ProcedureEvent<?> event, boolean isReady) {
|
||||
if (isReady) {
|
||||
event.wake(procSched);
|
||||
} else {
|
||||
|
|
|
@ -58,16 +58,18 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
|
||||
protected void prePeerModification(MasterProcedureEnv env)
|
||||
throws IOException, ReplicationException {
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.preAddReplicationPeer(peerId, peerConfig);
|
||||
}
|
||||
env.getReplicationPeerManager().preAddPeer(peerId, peerConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||
env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled);
|
||||
env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -51,12 +52,12 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
|
|||
if (cpHost != null) {
|
||||
cpHost.preDisableReplicationPeer(peerId);
|
||||
}
|
||||
env.getReplicationPeerManager().preDisablePeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env)
|
||||
throws IllegalArgumentException, Exception {
|
||||
env.getReplicationManager().disableReplicationPeer(peerId);
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||
env.getReplicationPeerManager().disablePeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -51,11 +52,12 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
|
|||
if (cpHost != null) {
|
||||
cpHost.preEnableReplicationPeer(peerId);
|
||||
}
|
||||
env.getReplicationPeerManager().preEnablePeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws Exception {
|
||||
env.getReplicationManager().enableReplicationPeer(peerId);
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||
env.getReplicationPeerManager().enablePeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master.replication;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||
|
@ -27,6 +26,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -67,17 +67,16 @@ public abstract class ModifyPeerProcedure
|
|||
}
|
||||
|
||||
/**
|
||||
* Called before we start the actual processing. If an exception is thrown then we will give up
|
||||
* and mark the procedure as failed directly.
|
||||
* Called before we start the actual processing. The implementation should call the pre CP hook,
|
||||
* and also the pre-check for the peer modification.
|
||||
* <p>
|
||||
* If an IOException is thrown then we will give up and mark the procedure as failed directly. If
|
||||
* all checks passes then the procedure can not be rolled back any more.
|
||||
*/
|
||||
protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException;
|
||||
protected abstract void prePeerModification(MasterProcedureEnv env)
|
||||
throws IOException, ReplicationException;
|
||||
|
||||
/**
|
||||
* We will give up and mark the procedure as failure if {@link IllegalArgumentException} is
|
||||
* thrown, for other type of Exception we will retry.
|
||||
*/
|
||||
protected abstract void updatePeerStorage(MasterProcedureEnv env)
|
||||
throws IllegalArgumentException, Exception;
|
||||
protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Called before we finish the procedure. The implementation can do some logging work, and also
|
||||
|
@ -100,23 +99,24 @@ public abstract class ModifyPeerProcedure
|
|||
try {
|
||||
prePeerModification(env);
|
||||
} catch (IOException e) {
|
||||
LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
|
||||
", mark the procedure as failure and give up", e);
|
||||
setFailure("prePeerModification", e);
|
||||
LOG.warn(
|
||||
getClass().getName() + " failed to call CP hook or the pre check is failed for peer " +
|
||||
peerId + ", mark the procedure as failure and give up",
|
||||
e);
|
||||
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
|
||||
releaseLatch();
|
||||
return Flow.NO_MORE_STATE;
|
||||
} catch (ReplicationException e) {
|
||||
LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
|
||||
", retry", e);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case UPDATE_PEER_STORAGE:
|
||||
try {
|
||||
updatePeerStorage(env);
|
||||
} catch (IllegalArgumentException e) {
|
||||
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer",
|
||||
new DoNotRetryIOException(e));
|
||||
releaseLatch();
|
||||
return Flow.NO_MORE_STATE;
|
||||
} catch (Exception e) {
|
||||
} catch (ReplicationException e) {
|
||||
LOG.warn(
|
||||
getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e);
|
||||
throw new ProcedureYieldException();
|
||||
|
@ -158,8 +158,7 @@ public abstract class ModifyPeerProcedure
|
|||
@Override
|
||||
protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
|
||||
throws IOException, InterruptedException {
|
||||
if (state == PeerModificationState.PRE_PEER_MODIFICATION ||
|
||||
state == PeerModificationState.UPDATE_PEER_STORAGE) {
|
||||
if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
|
||||
// actually the peer related operations has no rollback, but if we haven't done any
|
||||
// modifications on the peer storage, we can just return.
|
||||
return;
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -51,11 +52,12 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
|
|||
if (cpHost != null) {
|
||||
cpHost.preRemoveReplicationPeer(peerId);
|
||||
}
|
||||
env.getReplicationPeerManager().preRemovePeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws Exception {
|
||||
env.getReplicationManager().removeReplicationPeer(peerId);
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||
env.getReplicationPeerManager().removePeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,199 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Manages and performs all replication admin operations.
|
||||
* <p>
|
||||
* Used to add/remove a replication peer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationManager {
|
||||
private final ReplicationQueuesClient replicationQueuesClient;
|
||||
private final ReplicationPeers replicationPeers;
|
||||
|
||||
public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable)
|
||||
throws IOException {
|
||||
try {
|
||||
this.replicationQueuesClient = ReplicationFactory
|
||||
.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
|
||||
this.replicationQueuesClient.init();
|
||||
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
|
||||
this.replicationQueuesClient, abortable);
|
||||
this.replicationPeers.init();
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Failed to construct ReplicationManager", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
checkPeerConfig(peerConfig);
|
||||
replicationPeers.registerPeer(peerId, peerConfig, enabled);
|
||||
replicationPeers.peerConnected(peerId);
|
||||
}
|
||||
|
||||
public void removeReplicationPeer(String peerId) throws ReplicationException {
|
||||
replicationPeers.peerDisconnected(peerId);
|
||||
replicationPeers.unregisterPeer(peerId);
|
||||
}
|
||||
|
||||
public void enableReplicationPeer(String peerId) throws ReplicationException {
|
||||
this.replicationPeers.enablePeer(peerId);
|
||||
}
|
||||
|
||||
public void disableReplicationPeer(String peerId) throws ReplicationException {
|
||||
this.replicationPeers.disablePeer(peerId);
|
||||
}
|
||||
|
||||
public ReplicationPeerConfig getPeerConfig(String peerId)
|
||||
throws ReplicationException, ReplicationPeerNotFoundException {
|
||||
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId);
|
||||
if (peerConfig == null) {
|
||||
throw new ReplicationPeerNotFoundException(peerId);
|
||||
}
|
||||
return peerConfig;
|
||||
}
|
||||
|
||||
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException, IOException {
|
||||
checkPeerConfig(peerConfig);
|
||||
this.replicationPeers.updatePeerConfig(peerId, peerConfig);
|
||||
}
|
||||
|
||||
public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
|
||||
throws ReplicationException {
|
||||
List<ReplicationPeerDescription> peers = new ArrayList<>();
|
||||
List<String> peerIds = replicationPeers.getAllPeerIds();
|
||||
for (String peerId : peerIds) {
|
||||
if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) {
|
||||
peers.add(new ReplicationPeerDescription(peerId,
|
||||
replicationPeers.getStatusOfPeerFromBackingStore(peerId),
|
||||
replicationPeers.getReplicationPeerConfig(peerId)));
|
||||
}
|
||||
}
|
||||
return peers;
|
||||
}
|
||||
|
||||
/**
|
||||
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
|
||||
* Then allow config exclude namespaces or exclude table-cfs which can't be replicated to
|
||||
* peer cluster.
|
||||
*
|
||||
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
|
||||
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
|
||||
*/
|
||||
private void checkPeerConfig(ReplicationPeerConfig peerConfig) {
|
||||
if (peerConfig.replicateAllUserTables()) {
|
||||
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
|
||||
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
||||
throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " +
|
||||
"when you want replicate all cluster");
|
||||
}
|
||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
|
||||
peerConfig.getExcludeTableCFsMap());
|
||||
} else {
|
||||
if ((peerConfig.getExcludeNamespaces() != null
|
||||
&& !peerConfig.getExcludeNamespaces().isEmpty())
|
||||
|| (peerConfig.getExcludeTableCFsMap() != null
|
||||
&& !peerConfig.getExcludeTableCFsMap().isEmpty())) {
|
||||
throw new IllegalArgumentException(
|
||||
"Need clean exclude-namespaces or exclude-table-cfs config firstly"
|
||||
+ " when replicate_all flag is false");
|
||||
}
|
||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
||||
peerConfig.getTableCFsMap());
|
||||
}
|
||||
checkConfiguredWALEntryFilters(peerConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a namespace in the peer config means that all tables in this namespace will be replicated
|
||||
* to the peer cluster.
|
||||
* <ol>
|
||||
* <li>If peer config already has a namespace, then not allow set any table of this namespace to
|
||||
* the peer config.</li>
|
||||
* <li>If peer config already has a table, then not allow set this table's namespace to the peer
|
||||
* config.</li>
|
||||
* </ol>
|
||||
* <p>
|
||||
* Set a exclude namespace in the peer config means that all tables in this namespace can't be
|
||||
* replicated to the peer cluster.
|
||||
* <ol>
|
||||
* <li>If peer config already has a exclude namespace, then not allow set any exclude table of
|
||||
* this namespace to the peer config.</li>
|
||||
* <li>If peer config already has a exclude table, then not allow set this table's namespace as a
|
||||
* exclude namespace.</li>
|
||||
* </ol>
|
||||
*/
|
||||
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs) {
|
||||
if (namespaces == null || namespaces.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
if (tableCfs == null || tableCfs.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
||||
TableName table = entry.getKey();
|
||||
if (namespaces.contains(table.getNamespaceAsString())) {
|
||||
throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces "
|
||||
+ table.getNamespaceAsString() + " in peer config");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) {
|
||||
String filterCSV = peerConfig.getConfiguration()
|
||||
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||
if (filterCSV != null && !filterCSV.isEmpty()) {
|
||||
String[] filters = filterCSV.split(",");
|
||||
for (String filter : filters) {
|
||||
try {
|
||||
Class.forName(filter).newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Configured WALEntryFilter " + filter +
|
||||
" could not be created. Failing add/update " + "peer operation.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,331 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Manages and performs all replication admin operations.
|
||||
* <p>
|
||||
* Used to add/remove a replication peer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class ReplicationPeerManager {
|
||||
|
||||
private final ReplicationPeerStorage peerStorage;
|
||||
|
||||
private final ReplicationQueueStorage queueStorage;
|
||||
|
||||
private final ConcurrentMap<String, ReplicationPeerDescription> peers;
|
||||
|
||||
private ReplicationPeerManager(ReplicationPeerStorage peerStorage,
|
||||
ReplicationQueueStorage queueStorage,
|
||||
ConcurrentMap<String, ReplicationPeerDescription> peers) {
|
||||
this.peerStorage = peerStorage;
|
||||
this.queueStorage = queueStorage;
|
||||
this.peers = peers;
|
||||
}
|
||||
|
||||
private void checkQueuesDeleted(String peerId)
|
||||
throws ReplicationException, DoNotRetryIOException {
|
||||
for (ServerName replicator : queueStorage.getListOfReplicators()) {
|
||||
List<String> queueIds = queueStorage.getAllQueues(replicator);
|
||||
for (String queueId : queueIds) {
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
if (queueInfo.getPeerId().equals(peerId)) {
|
||||
throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
|
||||
", replicator: " + replicator + ", queueId: " + queueId);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
|
||||
throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
|
||||
}
|
||||
}
|
||||
|
||||
public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws DoNotRetryIOException, ReplicationException {
|
||||
if (peerId.contains("-")) {
|
||||
throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
|
||||
}
|
||||
checkPeerConfig(peerConfig);
|
||||
if (peers.containsKey(peerId)) {
|
||||
throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
|
||||
}
|
||||
// make sure that there is no queues with the same peer id. This may happen when we create a
|
||||
// peer with the same id with a old deleted peer. If the replication queues for the old peer
|
||||
// have not been cleaned up yet then we should not create the new peer, otherwise the old wal
|
||||
// file may also be replicated.
|
||||
checkQueuesDeleted(peerId);
|
||||
}
|
||||
|
||||
private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
|
||||
ReplicationPeerDescription desc = peers.get(peerId);
|
||||
if (desc == null) {
|
||||
throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist");
|
||||
}
|
||||
return desc;
|
||||
}
|
||||
|
||||
public void preRemovePeer(String peerId) throws DoNotRetryIOException {
|
||||
checkPeerExists(peerId);
|
||||
}
|
||||
|
||||
public void preEnablePeer(String peerId) throws DoNotRetryIOException {
|
||||
ReplicationPeerDescription desc = checkPeerExists(peerId);
|
||||
if (desc.isEnabled()) {
|
||||
throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
|
||||
}
|
||||
}
|
||||
|
||||
public void preDisablePeer(String peerId) throws DoNotRetryIOException {
|
||||
ReplicationPeerDescription desc = checkPeerExists(peerId);
|
||||
if (!desc.isEnabled()) {
|
||||
throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
|
||||
}
|
||||
}
|
||||
|
||||
public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws DoNotRetryIOException {
|
||||
checkPeerConfig(peerConfig);
|
||||
ReplicationPeerDescription desc = checkPeerExists(peerId);
|
||||
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
|
||||
if (!StringUtils.isBlank(peerConfig.getClusterKey()) &&
|
||||
!peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) {
|
||||
throw new DoNotRetryIOException(
|
||||
"Changing the cluster key on an existing peer is not allowed. Existing key '" +
|
||||
oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
|
||||
peerConfig.getClusterKey() + "'");
|
||||
}
|
||||
|
||||
if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) &&
|
||||
!peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) {
|
||||
throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
|
||||
"on an existing peer is not allowed. Existing class '" +
|
||||
oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
|
||||
" does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
|
||||
}
|
||||
}
|
||||
|
||||
private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) {
|
||||
ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig();
|
||||
copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
|
||||
copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
|
||||
copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap());
|
||||
copiedPeerConfig.setNamespaces(peerConfig.getNamespaces());
|
||||
copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap());
|
||||
copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces());
|
||||
copiedPeerConfig.setBandwidth(peerConfig.getBandwidth());
|
||||
copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables());
|
||||
copiedPeerConfig.setClusterKey(peerConfig.getClusterKey());
|
||||
copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
|
||||
return copiedPeerConfig;
|
||||
}
|
||||
|
||||
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
if (peers.containsKey(peerId)) {
|
||||
// this should be a retry, just return
|
||||
return;
|
||||
}
|
||||
ReplicationPeerConfig copiedPeerConfig = copy(peerConfig);
|
||||
peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
|
||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
|
||||
}
|
||||
|
||||
public void removePeer(String peerId) throws ReplicationException {
|
||||
if (!peers.containsKey(peerId)) {
|
||||
// this should be a retry, just return
|
||||
return;
|
||||
}
|
||||
peerStorage.removePeer(peerId);
|
||||
peers.remove(peerId);
|
||||
}
|
||||
|
||||
private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
|
||||
ReplicationPeerDescription desc = peers.get(peerId);
|
||||
if (desc.isEnabled() == enabled) {
|
||||
// this should be a retry, just return
|
||||
return;
|
||||
}
|
||||
peerStorage.setPeerState(peerId, enabled);
|
||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
|
||||
}
|
||||
|
||||
public void enablePeer(String peerId) throws ReplicationException {
|
||||
setPeerState(peerId, true);
|
||||
}
|
||||
|
||||
public void disablePeer(String peerId) throws ReplicationException {
|
||||
setPeerState(peerId, false);
|
||||
}
|
||||
|
||||
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException {
|
||||
// the checking rules are too complicated here so we give up checking whether this is a retry.
|
||||
ReplicationPeerDescription desc = peers.get(peerId);
|
||||
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
|
||||
ReplicationPeerConfig newPeerConfig = copy(peerConfig);
|
||||
// we need to use the new conf to overwrite the old one.
|
||||
newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration());
|
||||
newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
|
||||
newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData());
|
||||
newPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
|
||||
|
||||
peerStorage.updatePeerConfig(peerId, newPeerConfig);
|
||||
peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
|
||||
}
|
||||
|
||||
public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
|
||||
if (pattern == null) {
|
||||
return new ArrayList<>(peers.values());
|
||||
}
|
||||
return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
|
||||
ReplicationPeerDescription desc = peers.get(peerId);
|
||||
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
|
||||
* Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
|
||||
* cluster.
|
||||
* <p>
|
||||
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
|
||||
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
|
||||
*/
|
||||
private static void checkPeerConfig(ReplicationPeerConfig peerConfig)
|
||||
throws DoNotRetryIOException {
|
||||
if (peerConfig.replicateAllUserTables()) {
|
||||
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
|
||||
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
||||
throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
|
||||
"when you want replicate all cluster");
|
||||
}
|
||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
|
||||
peerConfig.getExcludeTableCFsMap());
|
||||
} else {
|
||||
if ((peerConfig.getExcludeNamespaces() != null &&
|
||||
!peerConfig.getExcludeNamespaces().isEmpty()) ||
|
||||
(peerConfig.getExcludeTableCFsMap() != null &&
|
||||
!peerConfig.getExcludeTableCFsMap().isEmpty())) {
|
||||
throw new DoNotRetryIOException(
|
||||
"Need clean exclude-namespaces or exclude-table-cfs config firstly" +
|
||||
" when replicate_all flag is false");
|
||||
}
|
||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
||||
peerConfig.getTableCFsMap());
|
||||
}
|
||||
checkConfiguredWALEntryFilters(peerConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a namespace in the peer config means that all tables in this namespace will be replicated
|
||||
* to the peer cluster.
|
||||
* <ol>
|
||||
* <li>If peer config already has a namespace, then not allow set any table of this namespace to
|
||||
* the peer config.</li>
|
||||
* <li>If peer config already has a table, then not allow set this table's namespace to the peer
|
||||
* config.</li>
|
||||
* </ol>
|
||||
* <p>
|
||||
* Set a exclude namespace in the peer config means that all tables in this namespace can't be
|
||||
* replicated to the peer cluster.
|
||||
* <ol>
|
||||
* <li>If peer config already has a exclude namespace, then not allow set any exclude table of
|
||||
* this namespace to the peer config.</li>
|
||||
* <li>If peer config already has a exclude table, then not allow set this table's namespace as a
|
||||
* exclude namespace.</li>
|
||||
* </ol>
|
||||
*/
|
||||
private static void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
|
||||
if (namespaces == null || namespaces.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
if (tableCfs == null || tableCfs.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
||||
TableName table = entry.getKey();
|
||||
if (namespaces.contains(table.getNamespaceAsString())) {
|
||||
throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " +
|
||||
table.getNamespaceAsString() + " in peer config");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
|
||||
throws DoNotRetryIOException {
|
||||
String filterCSV = peerConfig.getConfiguration()
|
||||
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||
if (filterCSV != null && !filterCSV.isEmpty()) {
|
||||
String[] filters = filterCSV.split(",");
|
||||
for (String filter : filters) {
|
||||
try {
|
||||
Class.forName(filter).newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
|
||||
" could not be created. Failing add/update " + "peer operation.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
|
||||
throws ReplicationException {
|
||||
ReplicationPeerStorage peerStorage =
|
||||
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
|
||||
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
|
||||
for (String peerId : peerStorage.listPeerIds()) {
|
||||
Optional<ReplicationPeerConfig> peerConfig = peerStorage.getPeerConfig(peerId);
|
||||
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get()));
|
||||
}
|
||||
return new ReplicationPeerManager(peerStorage,
|
||||
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
|||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -59,12 +60,12 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
|
|||
if (cpHost != null) {
|
||||
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
|
||||
}
|
||||
env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env)
|
||||
throws IllegalArgumentException, Exception {
|
||||
env.getReplicationManager().updatePeerConfig(peerId, peerConfig);
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||
env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client.replication;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -25,26 +32,24 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
|
||||
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -53,15 +58,6 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Unit testing of ReplicationAdmin
|
||||
|
@ -73,8 +69,6 @@ public class TestReplicationAdmin {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReplicationAdmin.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);
|
||||
|
||||
private final static HBaseTestingUtility TEST_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
|
||||
|
@ -109,16 +103,17 @@ public class TestReplicationAdmin {
|
|||
}
|
||||
|
||||
@After
|
||||
public void cleanupPeer() {
|
||||
try {
|
||||
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Replication peer " + ID_ONE + " may already be removed");
|
||||
public void tearDown() throws Exception {
|
||||
for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
|
||||
hbaseAdmin.removeReplicationPeer(desc.getPeerId());
|
||||
}
|
||||
try {
|
||||
hbaseAdmin.removeReplicationPeer(ID_SECOND);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
|
||||
ReplicationQueueStorage queueStorage = ReplicationStorageFactory
|
||||
.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
|
||||
for (ServerName serverName : queueStorage.getListOfReplicators()) {
|
||||
for (String queue : queueStorage.getAllQueues(serverName)) {
|
||||
queueStorage.removeQueue(serverName, queue);
|
||||
}
|
||||
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -208,32 +203,29 @@ public class TestReplicationAdmin {
|
|||
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
|
||||
rpc2.setClusterKey(KEY_SECOND);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
ZKWatcher zkw = new ZKWatcher(conf, "Test HBaseAdmin", null);
|
||||
ReplicationQueues repQueues =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw));
|
||||
repQueues.init("server1");
|
||||
ReplicationQueueStorage queueStorage =
|
||||
ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
|
||||
|
||||
ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
|
||||
// add queue for ID_ONE
|
||||
repQueues.addLog(ID_ONE, "file1");
|
||||
queueStorage.addWAL(serverName, ID_ONE, "file1");
|
||||
try {
|
||||
admin.addPeer(ID_ONE, rpc1, null);
|
||||
fail();
|
||||
} catch (Exception e) {
|
||||
// OK!
|
||||
}
|
||||
repQueues.removeQueue(ID_ONE);
|
||||
assertEquals(0, repQueues.getAllQueues().size());
|
||||
queueStorage.removeQueue(serverName, ID_ONE);
|
||||
assertEquals(0, queueStorage.getAllQueues(serverName).size());
|
||||
|
||||
// add recovered queue for ID_ONE
|
||||
repQueues.addLog(ID_ONE + "-server2", "file1");
|
||||
queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
|
||||
try {
|
||||
admin.addPeer(ID_ONE, rpc2, null);
|
||||
fail();
|
||||
} catch (Exception e) {
|
||||
// OK!
|
||||
}
|
||||
repQueues.removeAllQueues();
|
||||
zkw.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -429,7 +421,7 @@ public class TestReplicationAdmin {
|
|||
tableCFs.clear();
|
||||
tableCFs.put(tableName2, null);
|
||||
admin.removePeerTableCFs(ID_ONE, tableCFs);
|
||||
assertTrue(false);
|
||||
fail();
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
tableCFs.clear();
|
||||
|
|
|
@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.master;
|
|||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
|
@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
|
@ -55,9 +55,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
|||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
public class MockNoopMasterServices implements MasterServices {
|
||||
|
||||
private final Configuration conf;
|
||||
private final MetricsMaster metricsMaster;
|
||||
|
||||
|
@ -461,7 +460,7 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ProcedureEvent getInitializedEvent() {
|
||||
public ProcedureEvent<?> getInitializedEvent() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -476,7 +475,7 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ReplicationManager getReplicationManager() {
|
||||
public ReplicationPeerManager getReplicationPeerManager() {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
|
|||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
@ -274,7 +275,7 @@ public class TestMasterNoCluster {
|
|||
|
||||
@Override
|
||||
void initializeZKBasedSystemTrackers() throws IOException, InterruptedException,
|
||||
KeeperException {
|
||||
KeeperException, ReplicationException {
|
||||
super.initializeZKBasedSystemTrackers();
|
||||
// Record a newer server in server manager at first
|
||||
getServerManager().recordNewServerWithLock(newServer,
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -54,9 +53,6 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
|
|||
*/
|
||||
@Test
|
||||
public void testDisableInactivePeer() throws Exception {
|
||||
|
||||
// enabling and shutdown the peer
|
||||
admin.enablePeer("2");
|
||||
utility2.shutdownMiniHBaseCluster();
|
||||
|
||||
byte[] rowkey = Bytes.toBytes("disable inactive peer");
|
||||
|
|
Loading…
Reference in New Issue