HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
This commit is contained in:
parent
62a4f5bb46
commit
f89920a60f
|
@ -247,22 +247,22 @@ public final class ReplicationPeerConfigUtil {
|
|||
public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
|
||||
throws DeserializationException {
|
||||
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
|
||||
int pblen = ProtobufUtil.lengthOfPBMagic();
|
||||
int pbLen = ProtobufUtil.lengthOfPBMagic();
|
||||
ReplicationProtos.ReplicationPeer.Builder builder =
|
||||
ReplicationProtos.ReplicationPeer.newBuilder();
|
||||
ReplicationProtos.ReplicationPeer peer;
|
||||
try {
|
||||
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
|
||||
ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
|
||||
peer = builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
return convert(peer);
|
||||
} else {
|
||||
if (bytes.length > 0) {
|
||||
return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
|
||||
if (bytes == null || bytes.length <= 0) {
|
||||
throw new DeserializationException("Bytes to deserialize should not be empty.");
|
||||
}
|
||||
return ReplicationPeerConfig.newBuilder().setClusterKey("").build();
|
||||
return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -339,15 +339,10 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
@Override public boolean isAborted() {return false;}
|
||||
});
|
||||
|
||||
ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
|
||||
ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
|
||||
rp.init();
|
||||
|
||||
Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
|
||||
if (pair == null) {
|
||||
throw new IOException("Couldn't get peer conf!");
|
||||
}
|
||||
|
||||
return pair;
|
||||
return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId));
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException(
|
||||
"An error occurred while trying to connect to the remove peer cluster", e);
|
||||
|
|
|
@ -31,14 +31,8 @@ public final class ReplicationFactory {
|
|||
private ReplicationFactory() {
|
||||
}
|
||||
|
||||
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
|
||||
Abortable abortable) {
|
||||
return getReplicationPeers(zk, conf, null, abortable);
|
||||
}
|
||||
|
||||
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
|
||||
ReplicationQueueStorage queueStorage, Abortable abortable) {
|
||||
return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable);
|
||||
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
|
||||
return new ReplicationPeers(zk, conf);
|
||||
}
|
||||
|
||||
public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,
|
||||
|
|
|
@ -18,29 +18,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationPeerImpl implements ReplicationPeer {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class);
|
||||
|
||||
private final ReplicationPeerStorage peerStorage;
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
private final String id;
|
||||
|
@ -58,21 +45,21 @@ public class ReplicationPeerImpl implements ReplicationPeer {
|
|||
* @param id string representation of this peer's identifier
|
||||
* @param peerConfig configuration for the replication peer
|
||||
*/
|
||||
public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id,
|
||||
public ReplicationPeerImpl(Configuration conf, String id, boolean peerState,
|
||||
ReplicationPeerConfig peerConfig) {
|
||||
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf);
|
||||
this.conf = conf;
|
||||
this.peerConfig = peerConfig;
|
||||
this.id = id;
|
||||
this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
|
||||
this.peerConfig = peerConfig;
|
||||
this.peerConfigListeners = new ArrayList<>();
|
||||
}
|
||||
|
||||
public void refreshPeerState() throws ReplicationException {
|
||||
this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED;
|
||||
void setPeerState(boolean enabled) {
|
||||
this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
|
||||
}
|
||||
|
||||
public void refreshPeerConfig() throws ReplicationException {
|
||||
this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig);
|
||||
void setPeerConfig(ReplicationPeerConfig peerConfig) {
|
||||
this.peerConfig = peerConfig;
|
||||
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
|
||||
}
|
||||
|
||||
|
@ -135,36 +122,4 @@ public class ReplicationPeerImpl implements ReplicationPeer {
|
|||
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
|
||||
this.peerConfigListeners.add(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the raw data from ZK to get a peer's state
|
||||
* @param bytes raw ZK data
|
||||
* @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
|
||||
* @throws DeserializationException if parsing the state fails
|
||||
*/
|
||||
public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
|
||||
ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
|
||||
return ReplicationProtos.ReplicationState.State.ENABLED == state;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bytes Content of a state znode.
|
||||
* @return State parsed from the passed bytes.
|
||||
* @throws DeserializationException if a ProtoBuf operation fails
|
||||
*/
|
||||
private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
|
||||
throws DeserializationException {
|
||||
ProtobufUtil.expectPBMagicPrefix(bytes);
|
||||
int pbLen = ProtobufUtil.lengthOfPBMagic();
|
||||
ReplicationProtos.ReplicationState.Builder builder =
|
||||
ReplicationProtos.ReplicationState.newBuilder();
|
||||
ReplicationProtos.ReplicationState state;
|
||||
try {
|
||||
ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
|
||||
state = builder.build();
|
||||
return state.getState();
|
||||
} catch (IOException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
@ -70,5 +69,5 @@ public interface ReplicationPeerStorage {
|
|||
* Get the peer config of a replication peer.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException;
|
||||
ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,58 +17,53 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.CompoundConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This provides an interface for maintaining a set of peer clusters. These peers are remote slave
|
||||
* clusters that data is replicated to. A peer cluster can be in three different states:
|
||||
*
|
||||
* 1. Not-Registered - There is no notion of the peer cluster.
|
||||
* 2. Registered - The peer has an id and is being tracked but there is no connection.
|
||||
* 3. Connected - There is an active connection to the remote peer.
|
||||
*
|
||||
* In the registered or connected state, a peer cluster can either be enabled or disabled.
|
||||
* This provides an class for maintaining a set of peer clusters. These peers are remote slave
|
||||
* clusters that data is replicated to.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface ReplicationPeers {
|
||||
public class ReplicationPeers {
|
||||
|
||||
/**
|
||||
* Initialize the ReplicationPeers interface.
|
||||
*/
|
||||
void init() throws ReplicationException;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
|
||||
|
||||
/**
|
||||
* Add a new remote slave cluster for replication.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
*/
|
||||
default void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException {
|
||||
registerPeer(peerId, peerConfig, true);
|
||||
private final Configuration conf;
|
||||
|
||||
// Map of peer clusters keyed by their id
|
||||
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
|
||||
private final ReplicationPeerStorage peerStorage;
|
||||
|
||||
protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.peerCache = new ConcurrentHashMap<>();
|
||||
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new remote slave cluster for replication.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @param enabled peer state, true if ENABLED and false if DISABLED
|
||||
*/
|
||||
void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException;
|
||||
public void init() throws ReplicationException {
|
||||
// Loading all existing peerIds into peer cache.
|
||||
for (String peerId : this.peerStorage.listPeerIds()) {
|
||||
addPeer(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a remote slave cluster and stops the replication to it.
|
||||
* @param peerId a short that identifies the cluster
|
||||
*/
|
||||
void unregisterPeer(String peerId) throws ReplicationException;
|
||||
@VisibleForTesting
|
||||
public ReplicationPeerStorage getPeerStorage() {
|
||||
return this.peerStorage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method called after a peer has been connected. It will create a ReplicationPeer to track the
|
||||
|
@ -78,111 +72,108 @@ public interface ReplicationPeers {
|
|||
* @return whether a ReplicationPeer was successfully created
|
||||
* @throws ReplicationException if connecting to the peer fails
|
||||
*/
|
||||
boolean peerConnected(String peerId) throws ReplicationException;
|
||||
public boolean addPeer(String peerId) throws ReplicationException {
|
||||
if (this.peerCache.containsKey(peerId)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
peerCache.put(peerId, createPeer(peerId));
|
||||
return true;
|
||||
}
|
||||
|
||||
public void removePeer(String peerId) {
|
||||
peerCache.remove(peerId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method called after a peer has been disconnected. It will remove the ReplicationPeer that
|
||||
* tracked the disconnected cluster.
|
||||
* Get the peer state for the specified connected remote slave cluster. The value might be read
|
||||
* from cache, so it is recommended to use {@link #peerStorage } to read storage directly if
|
||||
* reading the state after enabling or disabling it.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @return true if replication is enabled, false otherwise.
|
||||
*/
|
||||
void peerDisconnected(String peerId);
|
||||
public boolean isPeerEnabled(String peerId) {
|
||||
ReplicationPeer replicationPeer = this.peerCache.get(peerId);
|
||||
if (replicationPeer == null) {
|
||||
throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
|
||||
}
|
||||
return replicationPeer.getPeerState() == PeerState.ENABLED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Restart the replication to the specified remote slave cluster.
|
||||
* @param peerId a short that identifies the cluster
|
||||
*/
|
||||
void enablePeer(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Stop the replication to the specified remote slave cluster.
|
||||
* @param peerId a short that identifies the cluster
|
||||
*/
|
||||
void disablePeer(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get the table and column-family list string of the peer from the underlying storage.
|
||||
* @param peerId a short that identifies the cluster
|
||||
*/
|
||||
public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
|
||||
throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Set the table and column-family list string of the peer to the underlying storage.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @param tableCFs the table and column-family list which will be replicated for this peer
|
||||
*/
|
||||
public void setPeerTableCFsConfig(String peerId,
|
||||
Map<TableName, ? extends Collection<String>> tableCFs)
|
||||
throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will
|
||||
* continue to track changes to the Peer's state and config. This method returns null if no
|
||||
* peer has been connected with the given peerId.
|
||||
* Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
|
||||
* continue to track changes to the Peer's state and config. This method returns null if no peer
|
||||
* has been cached with the given peerId.
|
||||
* @param peerId id for the peer
|
||||
* @return ReplicationPeer object
|
||||
*/
|
||||
ReplicationPeerImpl getConnectedPeer(String peerId);
|
||||
public ReplicationPeerImpl getPeer(String peerId) {
|
||||
return peerCache.get(peerId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the set of peerIds of the clusters that have been connected and have an underlying
|
||||
* ReplicationPeer.
|
||||
* @return a Set of Strings for peerIds
|
||||
*/
|
||||
public Set<String> getConnectedPeerIds();
|
||||
public Set<String> getAllPeerIds() {
|
||||
return peerCache.keySet();
|
||||
}
|
||||
|
||||
public ReplicationPeerConfig getPeerConfig(String peerId) {
|
||||
ReplicationPeer replicationPeer = this.peerCache.get(peerId);
|
||||
if (replicationPeer == null) {
|
||||
throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
|
||||
}
|
||||
return replicationPeer.getPeerConfig();
|
||||
}
|
||||
|
||||
public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException {
|
||||
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
|
||||
|
||||
Configuration otherConf;
|
||||
try {
|
||||
otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e);
|
||||
}
|
||||
|
||||
if (!peerConfig.getConfiguration().isEmpty()) {
|
||||
CompoundConfiguration compound = new CompoundConfiguration();
|
||||
compound.add(otherConf);
|
||||
compound.addStringMap(peerConfig.getConfiguration());
|
||||
return compound;
|
||||
}
|
||||
|
||||
return otherConf;
|
||||
}
|
||||
|
||||
public PeerState refreshPeerState(String peerId) throws ReplicationException {
|
||||
ReplicationPeerImpl peer = peerCache.get(peerId);
|
||||
if (peer == null) {
|
||||
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
|
||||
}
|
||||
peer.setPeerState(peerStorage.isPeerEnabled(peerId));
|
||||
return peer.getPeerState();
|
||||
}
|
||||
|
||||
public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
|
||||
ReplicationPeerImpl peer = peerCache.get(peerId);
|
||||
if (peer == null) {
|
||||
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
|
||||
}
|
||||
peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
|
||||
return peer.getPeerConfig();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the replication status for the specified connected remote slave cluster.
|
||||
* The value might be read from cache, so it is recommended to
|
||||
* use {@link #getStatusOfPeerFromBackingStore(String)}
|
||||
* if reading the state after enabling or disabling it.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @return true if replication is enabled, false otherwise.
|
||||
* Helper method to connect to a peer
|
||||
* @param peerId peer's identifier
|
||||
* @return object representing the peer
|
||||
*/
|
||||
boolean getStatusOfPeer(String peerId);
|
||||
|
||||
/**
|
||||
* Get the replication status for the specified remote slave cluster, which doesn't
|
||||
* have to be connected. The state is read directly from the backing store.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @return true if replication is enabled, false otherwise.
|
||||
* @throws ReplicationException thrown if there's an error contacting the store
|
||||
*/
|
||||
boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* List the cluster replication configs of all remote slave clusters (whether they are
|
||||
* enabled/disabled or connected/disconnected).
|
||||
* @return A map of peer ids to peer cluster keys
|
||||
*/
|
||||
Map<String, ReplicationPeerConfig> getAllPeerConfigs();
|
||||
|
||||
/**
|
||||
* List the peer ids of all remote slave clusters (whether they are enabled/disabled or
|
||||
* connected/disconnected).
|
||||
* @return A list of peer ids
|
||||
*/
|
||||
List<String> getAllPeerIds();
|
||||
|
||||
/**
|
||||
* Returns the configured ReplicationPeerConfig for this peerId
|
||||
* @param peerId a short name that identifies the cluster
|
||||
* @return ReplicationPeerConfig for the peer
|
||||
*/
|
||||
ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Returns the configuration needed to talk to the remote slave cluster.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @return the configuration for the peer cluster, null if it was unable to get the configuration
|
||||
*/
|
||||
Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Update the peerConfig for the a given peer cluster
|
||||
* @param id a short that identifies the cluster
|
||||
* @param peerConfig new config for the peer cluster
|
||||
* @throws ReplicationException if updating the peer configuration fails
|
||||
*/
|
||||
void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
|
||||
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
|
||||
ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId);
|
||||
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
||||
return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,543 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.CompoundConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
|
||||
/**
|
||||
* This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
|
||||
* peers znode contains a list of all peer replication clusters and the current replication state of
|
||||
* those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
|
||||
* the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
|
||||
* peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
|
||||
* zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
|
||||
* For example:
|
||||
*
|
||||
* /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
|
||||
* /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
|
||||
*
|
||||
* Each of these peer znodes has a child znode that indicates whether or not replication is enabled
|
||||
* on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
|
||||
* boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
|
||||
* ReplicationPeer.PeerStateTracker class. For example:
|
||||
*
|
||||
* /hbase/replication/peers/1/peer-state [Value: ENABLED]
|
||||
*
|
||||
* Each of these peer znodes has a child znode that indicates which data will be replicated
|
||||
* to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
|
||||
* table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
|
||||
* class. For example:
|
||||
*
|
||||
* /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
|
||||
|
||||
// Map of peer clusters keyed by their id
|
||||
private ConcurrentMap<String, ReplicationPeerImpl> peerClusters;
|
||||
private final ReplicationQueueStorage queueStorage;
|
||||
private Abortable abortable;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);
|
||||
|
||||
public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf,
|
||||
ReplicationQueueStorage queueStorage, Abortable abortable) {
|
||||
super(zk, conf, abortable);
|
||||
this.abortable = abortable;
|
||||
this.peerClusters = new ConcurrentHashMap<>();
|
||||
this.queueStorage = queueStorage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() throws ReplicationException {
|
||||
try {
|
||||
if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
|
||||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not initialize replication peers", e);
|
||||
}
|
||||
addExistingPeers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
try {
|
||||
if (peerExists(id)) {
|
||||
throw new IllegalArgumentException("Cannot add a peer with id=" + id
|
||||
+ " because that id already exists.");
|
||||
}
|
||||
|
||||
if(id.contains("-")){
|
||||
throw new IllegalArgumentException("Found invalid peer name:" + id);
|
||||
}
|
||||
|
||||
if (peerConfig.getClusterKey() != null) {
|
||||
try {
|
||||
ZKConfig.validateClusterKey(peerConfig.getClusterKey());
|
||||
} catch (IOException ioe) {
|
||||
throw new IllegalArgumentException(ioe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
checkQueuesDeleted(id);
|
||||
|
||||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<>(2);
|
||||
ZKUtilOp op1 =
|
||||
ZKUtilOp.createAndFailSilent(getPeerNode(id),
|
||||
ReplicationPeerConfigUtil.toByteArray(peerConfig));
|
||||
ZKUtilOp op2 =
|
||||
ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES
|
||||
: DISABLED_ZNODE_BYTES);
|
||||
listOfOps.add(op1);
|
||||
listOfOps.add(op2);
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>"
|
||||
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterPeer(String id) throws ReplicationException {
|
||||
try {
|
||||
if (!peerExists(id)) {
|
||||
throw new IllegalArgumentException("Cannot remove peer with id=" + id
|
||||
+ " because that id does not exist.");
|
||||
}
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not remove peer with id=" + id, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enablePeer(String id) throws ReplicationException {
|
||||
changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
|
||||
LOG.info("peer " + id + " is enabled");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disablePeer(String id) throws ReplicationException {
|
||||
changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
|
||||
LOG.info("peer " + id + " is disabled");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
|
||||
try {
|
||||
if (!peerExists(id)) {
|
||||
throw new IllegalArgumentException("peer " + id + " doesn't exist");
|
||||
}
|
||||
try {
|
||||
ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
|
||||
if (rpc == null) {
|
||||
throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
|
||||
}
|
||||
return rpc.getTableCFsMap();
|
||||
} catch (Exception e) {
|
||||
throw new ReplicationException(e);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPeerTableCFsConfig(String id,
|
||||
Map<TableName, ? extends Collection<String>> tableCFs)
|
||||
throws ReplicationException {
|
||||
try {
|
||||
if (!peerExists(id)) {
|
||||
throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
|
||||
+ " does not exist.");
|
||||
}
|
||||
ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
|
||||
if (rpc == null) {
|
||||
throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
|
||||
}
|
||||
rpc.setTableCFsMap(tableCFs);
|
||||
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
||||
ReplicationPeerConfigUtil.toByteArray(rpc));
|
||||
LOG.info("Peer tableCFs with id= " + id + " is now " +
|
||||
ReplicationPeerConfigUtil.convertToString(tableCFs));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getStatusOfPeer(String id) {
|
||||
ReplicationPeer replicationPeer = this.peerClusters.get(id);
|
||||
if (replicationPeer == null) {
|
||||
throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
|
||||
}
|
||||
return replicationPeer.getPeerState() == PeerState.ENABLED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
|
||||
try {
|
||||
if (!peerExists(id)) {
|
||||
throw new IllegalArgumentException("peer " + id + " doesn't exist");
|
||||
}
|
||||
String peerStateZNode = getPeerStateNode(id);
|
||||
try {
|
||||
return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException(e);
|
||||
} catch (DeserializationException e) {
|
||||
throw new ReplicationException(e);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Unable to get status of the peer with id=" + id +
|
||||
" from backing store", e);
|
||||
} catch (InterruptedException e) {
|
||||
throw new ReplicationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
|
||||
Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
|
||||
List<String> ids = null;
|
||||
try {
|
||||
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
|
||||
for (String id : ids) {
|
||||
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
|
||||
if (peerConfig == null) {
|
||||
LOG.warn("Failed to get replication peer configuration of clusterid=" + id
|
||||
+ " znode content, continuing.");
|
||||
continue;
|
||||
}
|
||||
peers.put(id, peerConfig);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Cannot get the list of peers ", e);
|
||||
} catch (ReplicationException e) {
|
||||
this.abortable.abort("Cannot get the list of peers ", e);
|
||||
}
|
||||
return peers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerImpl getConnectedPeer(String peerId) {
|
||||
return peerClusters.get(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getConnectedPeerIds() {
|
||||
return peerClusters.keySet(); // this is not thread-safe
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a ReplicationPeerConfig from the znode or null for the given peerId.
|
||||
*/
|
||||
@Override
|
||||
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
|
||||
throws ReplicationException {
|
||||
String znode = getPeerNode(peerId);
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = ZKUtil.getData(this.zookeeper, znode);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Could not get configuration for peer because the thread " +
|
||||
"was interrupted. peerId=" + peerId);
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Error getting configuration for peer with id="
|
||||
+ peerId, e);
|
||||
}
|
||||
if (data == null) {
|
||||
LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return ReplicationPeerConfigUtil.parsePeerFrom(data);
|
||||
} catch (DeserializationException e) {
|
||||
LOG.warn("Failed to parse cluster key from peerId=" + peerId
|
||||
+ ", specifically the content from the following znode: " + znode);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
|
||||
throws ReplicationException {
|
||||
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
|
||||
|
||||
if (peerConfig == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Configuration otherConf;
|
||||
try {
|
||||
otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!peerConfig.getConfiguration().isEmpty()) {
|
||||
CompoundConfiguration compound = new CompoundConfiguration();
|
||||
compound.add(otherConf);
|
||||
compound.addStringMap(peerConfig.getConfiguration());
|
||||
return new Pair<>(peerConfig, compound);
|
||||
}
|
||||
|
||||
return new Pair<>(peerConfig, otherConf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
|
||||
throws ReplicationException {
|
||||
ReplicationPeer peer = getConnectedPeer(id);
|
||||
if (peer == null){
|
||||
throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
|
||||
}
|
||||
ReplicationPeerConfig existingConfig = peer.getPeerConfig();
|
||||
if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() &&
|
||||
!newConfig.getClusterKey().equals(existingConfig.getClusterKey())){
|
||||
throw new ReplicationException("Changing the cluster key on an existing peer is not allowed."
|
||||
+ " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '"
|
||||
+ newConfig.getClusterKey() +
|
||||
"'");
|
||||
}
|
||||
String existingEndpointImpl = existingConfig.getReplicationEndpointImpl();
|
||||
if (newConfig.getReplicationEndpointImpl() != null &&
|
||||
!newConfig.getReplicationEndpointImpl().isEmpty() &&
|
||||
!newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){
|
||||
throw new ReplicationException("Changing the replication endpoint implementation class " +
|
||||
"on an existing peer is not allowed. Existing class '"
|
||||
+ existingConfig.getReplicationEndpointImpl()
|
||||
+ "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
|
||||
}
|
||||
// Update existingConfig's peer config and peer data with the new values, but don't touch config
|
||||
// or data that weren't explicitly changed
|
||||
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig);
|
||||
builder.putAllConfiguration(newConfig.getConfiguration())
|
||||
.putAllPeerData(newConfig.getPeerData())
|
||||
.setReplicateAllUserTables(newConfig.replicateAllUserTables())
|
||||
.setNamespaces(newConfig.getNamespaces()).setTableCFsMap(newConfig.getTableCFsMap())
|
||||
.setExcludeNamespaces(newConfig.getExcludeNamespaces())
|
||||
.setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap())
|
||||
.setBandwidth(newConfig.getBandwidth());
|
||||
|
||||
try {
|
||||
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
||||
ReplicationPeerConfigUtil.toByteArray(builder.build()));
|
||||
}
|
||||
catch(KeeperException ke){
|
||||
throw new ReplicationException("There was a problem trying to save changes to the " +
|
||||
"replication peer " + id, ke);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List all registered peer clusters and set a watch on their znodes.
|
||||
*/
|
||||
@Override
|
||||
public List<String> getAllPeerIds() {
|
||||
List<String> ids = null;
|
||||
try {
|
||||
ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Cannot get the list of peers ", e);
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
/**
|
||||
* A private method used during initialization. This method attempts to add all registered
|
||||
* peer clusters. This method does not set a watch on the peer cluster znodes.
|
||||
*/
|
||||
private void addExistingPeers() throws ReplicationException {
|
||||
List<String> znodes = null;
|
||||
try {
|
||||
znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Error getting the list of peer clusters.", e);
|
||||
}
|
||||
if (znodes != null) {
|
||||
for (String z : znodes) {
|
||||
createAndAddPeer(z);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean peerConnected(String peerId) throws ReplicationException {
|
||||
return createAndAddPeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void peerDisconnected(String peerId) {
|
||||
ReplicationPeer rp = this.peerClusters.get(peerId);
|
||||
if (rp != null) {
|
||||
peerClusters.remove(peerId, rp);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to connect to a new remote slave cluster.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @return true if a new connection was made, false if no new connection was made.
|
||||
*/
|
||||
public boolean createAndAddPeer(String peerId) throws ReplicationException {
|
||||
if (peerClusters == null) {
|
||||
return false;
|
||||
}
|
||||
if (this.peerClusters.containsKey(peerId)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ReplicationPeerImpl peer = null;
|
||||
try {
|
||||
peer = createPeer(peerId);
|
||||
} catch (Exception e) {
|
||||
throw new ReplicationException("Error adding peer with id=" + peerId, e);
|
||||
}
|
||||
if (peer == null) {
|
||||
return false;
|
||||
}
|
||||
ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer);
|
||||
if (previous == null) {
|
||||
LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
|
||||
} else {
|
||||
LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
|
||||
", new cluster=" + peer.getPeerConfig().getClusterKey());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the state znode of a peer cluster.
|
||||
* @param id the id of the peer
|
||||
* @param state the state to update to
|
||||
*/
|
||||
private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
|
||||
throws ReplicationException {
|
||||
try {
|
||||
if (!peerExists(id)) {
|
||||
throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
|
||||
+ " does not exist.");
|
||||
}
|
||||
String peerStateZNode = getPeerStateNode(id);
|
||||
byte[] stateBytes =
|
||||
(state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
|
||||
: DISABLED_ZNODE_BYTES;
|
||||
if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
|
||||
ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
|
||||
} else {
|
||||
ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
|
||||
}
|
||||
LOG.info("Peer with id= " + id + " is now " + state.name());
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to connect to a peer
|
||||
* @param peerId peer's identifier
|
||||
* @return object representing the peer
|
||||
* @throws ReplicationException if creating the peer fails
|
||||
*/
|
||||
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
|
||||
Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
|
||||
if (pair == null) {
|
||||
return null;
|
||||
}
|
||||
Configuration peerConf = pair.getSecond();
|
||||
|
||||
ReplicationPeerImpl peer =
|
||||
new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst());
|
||||
|
||||
// Load peer state and peer config by reading zookeeper directly.
|
||||
peer.refreshPeerState();
|
||||
peer.refreshPeerConfig();
|
||||
|
||||
return peer;
|
||||
}
|
||||
|
||||
private void checkQueuesDeleted(String peerId) throws ReplicationException {
|
||||
if (queueStorage == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
List<ServerName> replicators = queueStorage.getListOfReplicators();
|
||||
if (replicators == null || replicators.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (ServerName replicator : replicators) {
|
||||
List<String> queueIds = queueStorage.getAllQueues(replicator);
|
||||
for (String queueId : queueIds) {
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
if (queueInfo.getPeerId().equals(peerId)) {
|
||||
throw new IllegalArgumentException("undeleted queue for peerId: " + peerId
|
||||
+ ", replicator: " + replicator + ", queueId: " + queueId);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Check for hfile-refs queue
|
||||
if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
|
||||
&& queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
|
||||
throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
|
||||
+ ", found in hfile-refs node path " + hfileRefsZNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||
|
@ -144,7 +143,7 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException {
|
||||
public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
|
||||
byte[] data;
|
||||
try {
|
||||
data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
|
||||
|
@ -152,13 +151,14 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
|
|||
throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
|
||||
}
|
||||
if (data == null || data.length == 0) {
|
||||
return Optional.empty();
|
||||
throw new ReplicationException(
|
||||
"Replication peer config data shouldn't be empty, peerId=" + peerId);
|
||||
}
|
||||
try {
|
||||
return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data));
|
||||
return ReplicationPeerConfigUtil.parsePeerFrom(data);
|
||||
} catch (DeserializationException e) {
|
||||
LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e);
|
||||
return Optional.empty();
|
||||
throw new ReplicationException(
|
||||
"Failed to parse replication peer config for peer with id=" + peerId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,8 +48,7 @@ class ZKReplicationStorageBase {
|
|||
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
|
||||
|
||||
this.replicationZNode =
|
||||
ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
|
||||
|
||||
ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -55,7 +55,6 @@ public abstract class TestReplicationStateBasic {
|
|||
protected static String KEY_TWO;
|
||||
|
||||
// For testing when we try to replicate to ourself
|
||||
protected String OUR_ID = "3";
|
||||
protected String OUR_KEY;
|
||||
|
||||
protected static int zkTimeoutCount;
|
||||
|
@ -151,37 +150,6 @@ public abstract class TestReplicationStateBasic {
|
|||
assertEquals(0, rqs.getListOfReplicators().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
|
||||
rp.init();
|
||||
|
||||
try {
|
||||
rp.registerPeer(ID_ONE,
|
||||
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
|
||||
fail("Should throw an IllegalArgumentException because " +
|
||||
"zookeeper.znode.parent is missing leading '/'.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Expected.
|
||||
}
|
||||
|
||||
try {
|
||||
rp.registerPeer(ID_ONE,
|
||||
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
|
||||
fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Expected.
|
||||
}
|
||||
|
||||
try {
|
||||
rp.registerPeer(ID_ONE,
|
||||
new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
|
||||
fail("Should throw an IllegalArgumentException because " +
|
||||
"hbase.zookeeper.property.clientPort is missing.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Expected.
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
|
||||
rp.init();
|
||||
|
@ -192,7 +160,8 @@ public abstract class TestReplicationStateBasic {
|
|||
files1.add(new Pair<>(null, new Path("file_3")));
|
||||
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
|
||||
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
|
||||
rp.getPeerStorage().addPeer(ID_ONE,
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
|
||||
rqs.addPeerToHFileRefs(ID_ONE);
|
||||
rqs.addHFileRefs(ID_ONE, files1);
|
||||
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
|
@ -208,15 +177,17 @@ public abstract class TestReplicationStateBasic {
|
|||
hfiles2.add(removedString);
|
||||
rqs.removeHFileRefs(ID_ONE, hfiles2);
|
||||
assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
|
||||
rp.unregisterPeer(ID_ONE);
|
||||
rp.getPeerStorage().removePeer(ID_ONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
|
||||
rp.init();
|
||||
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
|
||||
rp.getPeerStorage().addPeer(ID_ONE,
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
|
||||
rqs.addPeerToHFileRefs(ID_ONE);
|
||||
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
|
||||
rp.getPeerStorage().addPeer(ID_TWO,
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
|
||||
rqs.addPeerToHFileRefs(ID_TWO);
|
||||
|
||||
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
|
||||
|
@ -229,13 +200,13 @@ public abstract class TestReplicationStateBasic {
|
|||
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
|
||||
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
|
||||
|
||||
rp.unregisterPeer(ID_ONE);
|
||||
rp.getPeerStorage().removePeer(ID_ONE);
|
||||
rqs.removePeerFromHFileRefs(ID_ONE);
|
||||
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
|
||||
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
|
||||
|
||||
rp.unregisterPeer(ID_TWO);
|
||||
rp.getPeerStorage().removePeer(ID_TWO);
|
||||
rqs.removePeerFromHFileRefs(ID_TWO);
|
||||
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
|
||||
|
@ -245,74 +216,77 @@ public abstract class TestReplicationStateBasic {
|
|||
public void testReplicationPeers() throws Exception {
|
||||
rp.init();
|
||||
|
||||
// Test methods with non-existent peer ids
|
||||
try {
|
||||
rp.unregisterPeer("bogus");
|
||||
rp.getPeerStorage().setPeerState("bogus", true);
|
||||
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
try {
|
||||
rp.getPeerStorage().setPeerState("bogus", false);
|
||||
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
try {
|
||||
rp.isPeerEnabled("bogus");
|
||||
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
try {
|
||||
rp.enablePeer("bogus");
|
||||
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
try {
|
||||
rp.disablePeer("bogus");
|
||||
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
try {
|
||||
rp.getStatusOfPeer("bogus");
|
||||
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
assertFalse(rp.peerConnected("bogus"));
|
||||
rp.peerDisconnected("bogus");
|
||||
|
||||
assertNull(rp.getPeerConf("bogus"));
|
||||
try {
|
||||
assertFalse(rp.addPeer("bogus"));
|
||||
fail("Should have thrown an ReplicationException when passed a bogus peerId");
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
assertNull(rp.getPeerClusterConfiguration("bogus"));
|
||||
fail("Should have thrown an ReplicationException when passed a bogus peerId");
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
|
||||
assertNumberOfPeers(0);
|
||||
|
||||
// Add some peers
|
||||
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
|
||||
rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
|
||||
assertNumberOfPeers(1);
|
||||
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
|
||||
rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
|
||||
assertNumberOfPeers(2);
|
||||
|
||||
// Test methods with a peer that is added but not connected
|
||||
try {
|
||||
rp.getStatusOfPeer(ID_ONE);
|
||||
rp.isPeerEnabled(ID_ONE);
|
||||
fail("There are no connected peers, should have thrown an IllegalArgumentException");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
|
||||
rp.unregisterPeer(ID_ONE);
|
||||
rp.peerDisconnected(ID_ONE);
|
||||
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE)));
|
||||
rp.getPeerStorage().removePeer(ID_ONE);
|
||||
rp.removePeer(ID_ONE);
|
||||
assertNumberOfPeers(1);
|
||||
|
||||
// Add one peer
|
||||
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
|
||||
rp.peerConnected(ID_ONE);
|
||||
rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
|
||||
rp.addPeer(ID_ONE);
|
||||
assertNumberOfPeers(2);
|
||||
assertTrue(rp.getStatusOfPeer(ID_ONE));
|
||||
rp.disablePeer(ID_ONE);
|
||||
assertTrue(rp.isPeerEnabled(ID_ONE));
|
||||
rp.getPeerStorage().setPeerState(ID_ONE, false);
|
||||
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
|
||||
// manually...
|
||||
ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE);
|
||||
peer.refreshPeerState();
|
||||
ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
|
||||
rp.refreshPeerState(peer.getId());
|
||||
assertEquals(PeerState.DISABLED, peer.getPeerState());
|
||||
assertConnectedPeerStatus(false, ID_ONE);
|
||||
rp.enablePeer(ID_ONE);
|
||||
rp.getPeerStorage().setPeerState(ID_ONE, true);
|
||||
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
|
||||
// manually...
|
||||
peer.refreshPeerState();
|
||||
rp.refreshPeerState(peer.getId());
|
||||
assertEquals(PeerState.ENABLED, peer.getPeerState());
|
||||
assertConnectedPeerStatus(true, ID_ONE);
|
||||
|
||||
// Disconnect peer
|
||||
rp.peerDisconnected(ID_ONE);
|
||||
rp.removePeer(ID_ONE);
|
||||
assertNumberOfPeers(2);
|
||||
try {
|
||||
rp.getStatusOfPeer(ID_ONE);
|
||||
rp.isPeerEnabled(ID_ONE);
|
||||
fail("There are no connected peers, should have thrown an IllegalArgumentException");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
@ -320,16 +294,16 @@ public abstract class TestReplicationStateBasic {
|
|||
|
||||
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
|
||||
// we can first check if the value was changed in the store, if it wasn't then fail right away
|
||||
if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
|
||||
if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
|
||||
fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
|
||||
}
|
||||
while (true) {
|
||||
if (status == rp.getStatusOfPeer(peerId)) {
|
||||
if (status == rp.isPeerEnabled(peerId)) {
|
||||
return;
|
||||
}
|
||||
if (zkTimeoutCount < ZK_MAX_COUNT) {
|
||||
LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status +
|
||||
", sleeping and trying again.");
|
||||
LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
|
||||
+ ", sleeping and trying again.");
|
||||
Thread.sleep(ZK_SLEEP_INTERVAL);
|
||||
} else {
|
||||
fail("Timed out waiting for ConnectedPeerStatus to be " + status);
|
||||
|
@ -337,10 +311,8 @@ public abstract class TestReplicationStateBasic {
|
|||
}
|
||||
}
|
||||
|
||||
protected void assertNumberOfPeers(int total) {
|
||||
assertEquals(total, rp.getAllPeerConfigs().size());
|
||||
assertEquals(total, rp.getAllPeerIds().size());
|
||||
assertEquals(total, rp.getAllPeerIds().size());
|
||||
protected void assertNumberOfPeers(int total) throws ReplicationException {
|
||||
assertEquals(total, rp.getPeerStorage().listPeerIds().size());
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -359,8 +331,9 @@ public abstract class TestReplicationStateBasic {
|
|||
rqs.addWAL(server3, "qId" + i, "filename" + j);
|
||||
}
|
||||
// Add peers for the corresponding queues so they are not orphans
|
||||
rp.registerPeer("qId" + i,
|
||||
new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
|
||||
rp.getPeerStorage().addPeer("qId" + i,
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
|
||||
true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
|
|||
public void setUp() {
|
||||
zkTimeoutCount = 0;
|
||||
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
|
||||
rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable());
|
||||
rp = ReplicationFactory.getReplicationPeers(zkw, conf);
|
||||
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -143,14 +144,14 @@ public class TestZKReplicationPeerStorage {
|
|||
assertEquals(peerCount, peerIds.size());
|
||||
for (String peerId : peerIds) {
|
||||
int seed = Integer.parseInt(peerId);
|
||||
assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get());
|
||||
assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
|
||||
}
|
||||
for (int i = 0; i < peerCount; i++) {
|
||||
STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
|
||||
}
|
||||
for (String peerId : peerIds) {
|
||||
int seed = Integer.parseInt(peerId);
|
||||
assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get());
|
||||
assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
|
||||
}
|
||||
for (int i = 0; i < peerCount; i++) {
|
||||
assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
|
||||
|
@ -166,6 +167,11 @@ public class TestZKReplicationPeerStorage {
|
|||
peerIds = STORAGE.listPeerIds();
|
||||
assertEquals(peerCount - 1, peerIds.size());
|
||||
assertFalse(peerIds.contains(toRemove));
|
||||
assertFalse(STORAGE.getPeerConfig(toRemove).isPresent());
|
||||
|
||||
try {
|
||||
STORAGE.getPeerConfig(toRemove);
|
||||
fail("Should throw a ReplicationException when get peer config of a peerId");
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,8 +30,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
||||
|
@ -51,20 +50,14 @@ import org.slf4j.LoggerFactory;
|
|||
public class ReplicationZKNodeCleaner {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
|
||||
private final ReplicationQueueStorage queueStorage;
|
||||
private final ReplicationPeers replicationPeers;
|
||||
private final ReplicationPeerStorage peerStorage;
|
||||
private final ReplicationQueueDeletor queueDeletor;
|
||||
|
||||
public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
|
||||
throws IOException {
|
||||
try {
|
||||
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
|
||||
this.replicationPeers =
|
||||
ReplicationFactory.getReplicationPeers(zkw, conf, this.queueStorage, abortable);
|
||||
this.replicationPeers.init();
|
||||
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
|
||||
}
|
||||
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
|
||||
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
|
||||
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -73,8 +66,8 @@ public class ReplicationZKNodeCleaner {
|
|||
*/
|
||||
public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException {
|
||||
Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
|
||||
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
|
||||
try {
|
||||
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
|
||||
List<ServerName> replicators = this.queueStorage.getListOfReplicators();
|
||||
if (replicators == null || replicators.isEmpty()) {
|
||||
return undeletedQueues;
|
||||
|
@ -84,8 +77,7 @@ public class ReplicationZKNodeCleaner {
|
|||
for (String queueId : queueIds) {
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
if (!peerIds.contains(queueInfo.getPeerId())) {
|
||||
undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(
|
||||
queueId);
|
||||
undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(queueId);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Undeleted replication queue for removed peer found: "
|
||||
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
|
||||
|
@ -106,9 +98,9 @@ public class ReplicationZKNodeCleaner {
|
|||
*/
|
||||
public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
|
||||
Set<String> undeletedHFileRefsQueue = new HashSet<>();
|
||||
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
|
||||
String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
|
||||
try {
|
||||
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
|
||||
List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue();
|
||||
Set<String> peers = new HashSet<>(listOfPeers);
|
||||
peers.removeAll(peerIds);
|
||||
|
@ -116,15 +108,15 @@ public class ReplicationZKNodeCleaner {
|
|||
undeletedHFileRefsQueue.addAll(peers);
|
||||
}
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException(
|
||||
"Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode, e);
|
||||
throw new IOException("Failed to get list of all peers from hfile-refs znode "
|
||||
+ hfileRefsZNode, e);
|
||||
}
|
||||
return undeletedHFileRefsQueue;
|
||||
}
|
||||
|
||||
private class ReplicationQueueDeletor extends ReplicationStateZKBase {
|
||||
|
||||
public ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
|
||||
ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
|
||||
super(zk, conf, abortable);
|
||||
}
|
||||
|
||||
|
@ -132,19 +124,20 @@ public class ReplicationZKNodeCleaner {
|
|||
* @param replicator The regionserver which has undeleted queue
|
||||
* @param queueId The undeleted queue id
|
||||
*/
|
||||
public void removeQueue(final ServerName replicator, final String queueId) throws IOException {
|
||||
String queueZnodePath = ZNodePaths
|
||||
.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), queueId);
|
||||
void removeQueue(final ServerName replicator, final String queueId) throws IOException {
|
||||
String queueZnodePath =
|
||||
ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()),
|
||||
queueId);
|
||||
try {
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
|
||||
if (!peerStorage.listPeerIds().contains(queueInfo.getPeerId())) {
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
|
||||
LOG.info("Successfully removed replication queue, replicator: " + replicator +
|
||||
", queueId: " + queueId);
|
||||
LOG.info("Successfully removed replication queue, replicator: " + replicator
|
||||
+ ", queueId: " + queueId);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(
|
||||
"Failed to delete queue, replicator: " + replicator + ", queueId: " + queueId);
|
||||
} catch (ReplicationException | KeeperException e) {
|
||||
throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
|
||||
+ queueId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -152,17 +145,17 @@ public class ReplicationZKNodeCleaner {
|
|||
* @param hfileRefsQueueId The undeleted hfile-refs queue id
|
||||
* @throws IOException
|
||||
*/
|
||||
public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
|
||||
void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
|
||||
String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
|
||||
try {
|
||||
if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
|
||||
if (!peerStorage.listPeerIds().contains(hfileRefsQueueId)) {
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper, node);
|
||||
LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
|
||||
+ hfileRefsZNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
} catch (ReplicationException | KeeperException e) {
|
||||
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
|
||||
+ " from path " + hfileRefsZNode);
|
||||
+ " from path " + hfileRefsZNode, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -314,12 +314,12 @@ public class ReplicationPeerManager {
|
|||
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
|
||||
throws ReplicationException {
|
||||
ReplicationPeerStorage peerStorage =
|
||||
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
|
||||
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
|
||||
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
|
||||
for (String peerId : peerStorage.listPeerIds()) {
|
||||
Optional<ReplicationPeerConfig> peerConfig = peerStorage.getPeerConfig(peerId);
|
||||
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
|
||||
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get()));
|
||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
|
||||
}
|
||||
return new ReplicationPeerManager(peerStorage,
|
||||
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
|
||||
|
|
|
@ -310,7 +310,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||
|
||||
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
|
||||
replicationPeers =
|
||||
ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection);
|
||||
ReplicationFactory.getReplicationPeers(zkw, getConf());
|
||||
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
|
||||
new WarnOnlyAbortable(), new WarnOnlyStoppable());
|
||||
Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
|
||||
|
|
|
@ -19,9 +19,10 @@
|
|||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -30,7 +31,8 @@ import org.slf4j.LoggerFactory;
|
|||
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
|
||||
|
||||
private ReplicationSourceManager replicationSourceManager;
|
||||
private final ReplicationSourceManager replicationSourceManager;
|
||||
private final ReentrantLock peersLock = new ReentrantLock();
|
||||
|
||||
public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
|
||||
this.replicationSourceManager = replicationSourceManager;
|
||||
|
@ -38,45 +40,40 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
|
|||
|
||||
@Override
|
||||
public void addPeer(String peerId) throws ReplicationException, IOException {
|
||||
replicationSourceManager.addPeer(peerId);
|
||||
peersLock.lock();
|
||||
try {
|
||||
replicationSourceManager.addPeer(peerId);
|
||||
} finally {
|
||||
peersLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removePeer(String peerId) throws ReplicationException, IOException {
|
||||
replicationSourceManager.removePeer(peerId);
|
||||
peersLock.lock();
|
||||
try {
|
||||
if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
|
||||
replicationSourceManager.removePeer(peerId);
|
||||
}
|
||||
} finally {
|
||||
peersLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disablePeer(String peerId) throws ReplicationException, IOException {
|
||||
ReplicationPeerImpl peer =
|
||||
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
|
||||
if (peer != null) {
|
||||
peer.refreshPeerState();
|
||||
LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
|
||||
} else {
|
||||
throw new ReplicationException("No connected peer found, peerId=" + peerId);
|
||||
}
|
||||
PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
|
||||
LOG.info("disable replication peer, id: " + peerId + ", new state: " + newState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enablePeer(String peerId) throws ReplicationException, IOException {
|
||||
ReplicationPeerImpl peer =
|
||||
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
|
||||
if (peer != null) {
|
||||
peer.refreshPeerState();
|
||||
LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
|
||||
} else {
|
||||
throw new ReplicationException("No connected peer found, peerId=" + peerId);
|
||||
}
|
||||
PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
|
||||
LOG.info("enable replication peer, id: " + peerId + ", new state: " + newState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
|
||||
ReplicationPeerImpl peer =
|
||||
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
|
||||
if (peer == null) {
|
||||
throw new ReplicationException("No connected peer found, peerId=" + peerId);
|
||||
}
|
||||
peer.refreshPeerConfig();
|
||||
replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -129,7 +129,7 @@ public class Replication implements
|
|||
this.queueStorage =
|
||||
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
|
||||
this.replicationPeers =
|
||||
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
|
||||
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
|
||||
this.replicationPeers.init();
|
||||
this.replicationTracker =
|
||||
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
|
||||
|
|
|
@ -223,7 +223,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
// A peerId will not have "-" in its name, see HBASE-11394
|
||||
peerId = peerClusterZnode.split("-")[0];
|
||||
}
|
||||
Map<TableName, List<String>> tableCFMap = replicationPeers.getConnectedPeer(peerId).getTableCFs();
|
||||
Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
|
||||
if (tableCFMap != null) {
|
||||
List<String> tableCfs = tableCFMap.get(tableName);
|
||||
if (tableCFMap.containsKey(tableName)
|
||||
|
@ -371,7 +371,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
}
|
||||
|
||||
private long getCurrentBandwidth() {
|
||||
ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
|
||||
ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId);
|
||||
long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
|
||||
// user can set peer bandwidth to 0 to use default bandwidth
|
||||
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
|
||||
|
@ -416,7 +416,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
*/
|
||||
@Override
|
||||
public boolean isPeerEnabled() {
|
||||
return this.replicationPeers.getStatusOfPeer(this.peerId);
|
||||
return this.replicationPeers.isPeerEnabled(this.peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -166,7 +166,6 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
this.clusterId = clusterId;
|
||||
this.walFileLengthProvider = walFileLengthProvider;
|
||||
this.replicationTracker.registerListener(this);
|
||||
this.replicationPeers.getAllPeerIds();
|
||||
// It's preferable to failover 1 RS at a time, but with good zk servers
|
||||
// more could be processed at the same time.
|
||||
int nbWorkers = conf.getInt("replication.executor.workers", 1);
|
||||
|
@ -270,8 +269,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
|
||||
.map(ServerName::valueOf).collect(Collectors.toList());
|
||||
LOG.info(
|
||||
"Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
|
||||
LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
|
||||
+ otherRegionServers);
|
||||
|
||||
// Look if there's anything to process after a restart
|
||||
for (ServerName rs : currentReplicators) {
|
||||
|
@ -288,7 +287,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
* The returned future is for adoptAbandonedQueues task.
|
||||
*/
|
||||
Future<?> init() throws IOException, ReplicationException {
|
||||
for (String id : this.replicationPeers.getConnectedPeerIds()) {
|
||||
for (String id : this.replicationPeers.getAllPeerIds()) {
|
||||
addSource(id);
|
||||
if (replicationForBulkLoadDataEnabled) {
|
||||
// Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
|
||||
|
@ -307,8 +306,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
*/
|
||||
@VisibleForTesting
|
||||
ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
|
||||
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
|
||||
ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
|
||||
ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
|
||||
ReplicationPeer peer = replicationPeers.getPeer(id);
|
||||
ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
|
||||
synchronized (this.walsById) {
|
||||
this.sources.add(src);
|
||||
|
@ -354,7 +353,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
public void deleteSource(String peerId, boolean closeConnection) {
|
||||
abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
|
||||
if (closeConnection) {
|
||||
this.replicationPeers.peerDisconnected(peerId);
|
||||
this.replicationPeers.removePeer(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -445,12 +444,12 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
// update replication queues on ZK
|
||||
// synchronize on replicationPeers to avoid adding source for the to-be-removed peer
|
||||
synchronized (replicationPeers) {
|
||||
for (String id : replicationPeers.getConnectedPeerIds()) {
|
||||
for (String id : replicationPeers.getAllPeerIds()) {
|
||||
try {
|
||||
this.queueStorage.addWAL(server.getServerName(), id, logName);
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException("Cannot add log to replication queue" +
|
||||
" when creating a new source, queueId=" + id + ", filename=" + logName, e);
|
||||
throw new IOException("Cannot add log to replication queue"
|
||||
+ " when creating a new source, queueId=" + id + ", filename=" + logName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -593,7 +592,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
|
||||
public void addPeer(String id) throws ReplicationException, IOException {
|
||||
LOG.info("Trying to add peer, peerId: " + id);
|
||||
boolean added = this.replicationPeers.peerConnected(id);
|
||||
boolean added = this.replicationPeers.addPeer(id);
|
||||
if (added) {
|
||||
LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
|
||||
addSource(id);
|
||||
|
@ -729,19 +728,25 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
// there is not an actual peer defined corresponding to peerId for the failover.
|
||||
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
|
||||
String actualPeerId = replicationQueueInfo.getPeerId();
|
||||
ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
|
||||
ReplicationPeerConfig peerConfig = null;
|
||||
try {
|
||||
peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
|
||||
} catch (ReplicationException ex) {
|
||||
LOG.warn("Received exception while getting replication peer config, skipping replay"
|
||||
+ ex);
|
||||
}
|
||||
if (peer == null || peerConfig == null) {
|
||||
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
|
||||
|
||||
ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
|
||||
if (peer == null) {
|
||||
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
|
||||
+ ", peer is null");
|
||||
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
|
||||
continue;
|
||||
}
|
||||
|
||||
ReplicationPeerConfig peerConfig = null;
|
||||
try {
|
||||
peerConfig = replicationPeers.getPeerConfig(actualPeerId);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
|
||||
+ ", failed to read peer config", e);
|
||||
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
|
||||
continue;
|
||||
}
|
||||
|
||||
// track sources in walsByIdRecoveredQueues
|
||||
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
|
||||
walsByIdRecoveredQueues.put(peerId, walsByGroup);
|
||||
|
@ -760,7 +765,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
|
||||
// see removePeer
|
||||
synchronized (oldsources) {
|
||||
if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
|
||||
if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
|
||||
src.terminate("Recovered queue doesn't belong to any current peer");
|
||||
closeRecoveredQueue(src);
|
||||
continue;
|
||||
|
|
|
@ -87,7 +87,7 @@ public class TestReplicationHFileCleaner {
|
|||
server = new DummyServer();
|
||||
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
|
||||
Replication.decorateMasterConfiguration(conf);
|
||||
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
|
||||
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
|
||||
rp.init();
|
||||
rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
|
||||
fs = FileSystem.get(conf);
|
||||
|
@ -101,7 +101,8 @@ public class TestReplicationHFileCleaner {
|
|||
@Before
|
||||
public void setup() throws ReplicationException, IOException {
|
||||
root = TEST_UTIL.getDataTestDirOnTestFS();
|
||||
rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
|
||||
rp.getPeerStorage().addPeer(peerId,
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
|
||||
rq.addPeerToHFileRefs(peerId);
|
||||
}
|
||||
|
||||
|
@ -112,7 +113,7 @@ public class TestReplicationHFileCleaner {
|
|||
} catch (IOException e) {
|
||||
LOG.warn("Failed to delete files recursively from path " + root);
|
||||
}
|
||||
rp.unregisterPeer(peerId);
|
||||
rp.getPeerStorage().removePeer(peerId);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -21,9 +21,7 @@ package org.apache.hadoop.hbase.replication;
|
|||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
|
@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
|||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -73,10 +72,6 @@ public class TestReplicationTrackerZKImpl {
|
|||
private ReplicationTracker rt;
|
||||
private AtomicInteger rsRemovedCount;
|
||||
private String rsRemovedData;
|
||||
private AtomicInteger plChangedCount;
|
||||
private List<String> plChangedData;
|
||||
private AtomicInteger peerRemovedCount;
|
||||
private String peerRemovedData;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
|
@ -93,7 +88,7 @@ public class TestReplicationTrackerZKImpl {
|
|||
String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234");
|
||||
try {
|
||||
ZKClusterId.setClusterId(zkw, new ClusterId());
|
||||
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
|
||||
rp = ReplicationFactory.getReplicationPeers(zkw, conf);
|
||||
rp.init();
|
||||
rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
|
||||
} catch (Exception e) {
|
||||
|
@ -101,10 +96,6 @@ public class TestReplicationTrackerZKImpl {
|
|||
}
|
||||
rsRemovedCount = new AtomicInteger(0);
|
||||
rsRemovedData = "";
|
||||
plChangedCount = new AtomicInteger(0);
|
||||
plChangedData = new ArrayList<>();
|
||||
peerRemovedCount = new AtomicInteger(0);
|
||||
peerRemovedData = "";
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -157,25 +148,22 @@ public class TestReplicationTrackerZKImpl {
|
|||
@Test(timeout = 30000)
|
||||
public void testPeerNameControl() throws Exception {
|
||||
int exists = 0;
|
||||
int hyphen = 0;
|
||||
rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
|
||||
rp.getPeerStorage().addPeer("6",
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
|
||||
|
||||
try{
|
||||
rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
|
||||
}catch(IllegalArgumentException e){
|
||||
exists++;
|
||||
try {
|
||||
rp.getPeerStorage().addPeer("6",
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
|
||||
} catch (ReplicationException e) {
|
||||
if (e.getCause() instanceof KeeperException.NodeExistsException) {
|
||||
exists++;
|
||||
}
|
||||
}
|
||||
|
||||
try{
|
||||
rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
|
||||
}catch(IllegalArgumentException e){
|
||||
hyphen++;
|
||||
}
|
||||
assertEquals(1, exists);
|
||||
assertEquals(1, hyphen);
|
||||
|
||||
// clean up
|
||||
rp.unregisterPeer("6");
|
||||
rp.getPeerStorage().removePeer("6");
|
||||
}
|
||||
|
||||
private class DummyReplicationListener implements ReplicationListener {
|
||||
|
|
|
@ -380,7 +380,7 @@ public abstract class TestReplicationSourceManager {
|
|||
}
|
||||
Server s1 = new DummyServer("dummyserver1.example.org");
|
||||
ReplicationPeers rp1 =
|
||||
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
|
||||
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
|
||||
rp1.init();
|
||||
NodeFailoverWorker w1 =
|
||||
manager.new NodeFailoverWorker(server.getServerName());
|
||||
|
@ -561,7 +561,7 @@ public abstract class TestReplicationSourceManager {
|
|||
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
|
||||
final boolean waitForSource) throws Exception {
|
||||
final ReplicationPeers rp = manager.getReplicationPeers();
|
||||
rp.registerPeer(peerId, peerConfig);
|
||||
rp.getPeerStorage().addPeer(peerId, peerConfig, true);
|
||||
try {
|
||||
manager.addPeer(peerId);
|
||||
} catch (Exception e) {
|
||||
|
@ -588,7 +588,7 @@ public abstract class TestReplicationSourceManager {
|
|||
}
|
||||
return true;
|
||||
} else {
|
||||
return (rp.getConnectedPeer(peerId) != null);
|
||||
return (rp.getPeer(peerId) != null);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -600,8 +600,8 @@ public abstract class TestReplicationSourceManager {
|
|||
*/
|
||||
private void removePeerAndWait(final String peerId) throws Exception {
|
||||
final ReplicationPeers rp = manager.getReplicationPeers();
|
||||
if (rp.getAllPeerIds().contains(peerId)) {
|
||||
rp.unregisterPeer(peerId);
|
||||
if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
|
||||
rp.getPeerStorage().removePeer(peerId);
|
||||
try {
|
||||
manager.removePeer(peerId);
|
||||
} catch (Exception e) {
|
||||
|
@ -611,10 +611,9 @@ public abstract class TestReplicationSourceManager {
|
|||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
List<String> peers = rp.getAllPeerIds();
|
||||
return (!manager.getAllQueues().contains(peerId)) &&
|
||||
(rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
|
||||
manager.getSource(peerId) == null;
|
||||
Collection<String> peers = rp.getPeerStorage().listPeerIds();
|
||||
return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
|
||||
&& (!peers.contains(peerId)) && manager.getSource(peerId) == null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -182,8 +182,7 @@ public class HBaseZKTestingUtility extends HBaseCommonTestingUtility {
|
|||
/**
|
||||
* Gets a ZKWatcher.
|
||||
*/
|
||||
public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil)
|
||||
throws ZooKeeperConnectionException, IOException {
|
||||
public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) throws IOException {
|
||||
ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() {
|
||||
boolean aborted = false;
|
||||
|
||||
|
|
Loading…
Reference in New Issue