HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface

This commit is contained in:
huzheng 2017-12-26 16:46:10 +08:00 committed by zhangduo
parent 4c6942df58
commit 1e36a84afc
24 changed files with 322 additions and 966 deletions

View File

@ -247,22 +247,22 @@ public final class ReplicationPeerConfigUtil {
public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
throws DeserializationException { throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) { if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic(); int pbLen = ProtobufUtil.lengthOfPBMagic();
ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.Builder builder =
ReplicationProtos.ReplicationPeer.newBuilder(); ReplicationProtos.ReplicationPeer.newBuilder();
ReplicationProtos.ReplicationPeer peer; ReplicationProtos.ReplicationPeer peer;
try { try {
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
peer = builder.build(); peer = builder.build();
} catch (IOException e) { } catch (IOException e) {
throw new DeserializationException(e); throw new DeserializationException(e);
} }
return convert(peer); return convert(peer);
} else { } else {
if (bytes.length > 0) { if (bytes == null || bytes.length <= 0) {
return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); throw new DeserializationException("Bytes to deserialize should not be empty.");
} }
return ReplicationPeerConfig.newBuilder().setClusterKey("").build(); return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
} }
} }

View File

@ -339,15 +339,10 @@ public class VerifyReplication extends Configured implements Tool {
@Override public boolean isAborted() {return false;} @Override public boolean isAborted() {return false;}
}); });
ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
rp.init(); rp.init();
Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId); return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId));
if (pair == null) {
throw new IOException("Couldn't get peer conf!");
}
return pair;
} catch (ReplicationException e) { } catch (ReplicationException e) {
throw new IOException( throw new IOException(
"An error occurred while trying to connect to the remove peer cluster", e); "An error occurred while trying to connect to the remove peer cluster", e);

View File

@ -29,14 +29,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationFactory { public class ReplicationFactory {
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf, public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
Abortable abortable) { return new ReplicationPeers(zk, conf);
return getReplicationPeers(zk, conf, null, abortable);
}
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
ReplicationQueueStorage queueStorage, Abortable abortable) {
return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable);
} }
public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,

View File

@ -18,28 +18,16 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer { public class ReplicationPeerImpl implements ReplicationPeer {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class);
private final ReplicationPeerStorage peerStorage;
private final Configuration conf; private final Configuration conf;
private final String id; private final String id;
@ -57,21 +45,21 @@ public class ReplicationPeerImpl implements ReplicationPeer {
* @param id string representation of this peer's identifier * @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer * @param peerConfig configuration for the replication peer
*/ */
public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id, public ReplicationPeerImpl(Configuration conf, String id, boolean peerState,
ReplicationPeerConfig peerConfig) { ReplicationPeerConfig peerConfig) {
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf);
this.conf = conf; this.conf = conf;
this.peerConfig = peerConfig;
this.id = id; this.id = id;
this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
this.peerConfig = peerConfig;
this.peerConfigListeners = new ArrayList<>(); this.peerConfigListeners = new ArrayList<>();
} }
public void refreshPeerState() throws ReplicationException { void setPeerState(boolean enabled) {
this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED; this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
} }
public void refreshPeerConfig() throws ReplicationException { void setPeerConfig(ReplicationPeerConfig peerConfig) {
this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig); this.peerConfig = peerConfig;
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig)); peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
} }
@ -134,36 +122,4 @@ public class ReplicationPeerImpl implements ReplicationPeer {
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
this.peerConfigListeners.add(listener); this.peerConfigListeners.add(listener);
} }
}
/**
* Parse the raw data from ZK to get a peer's state
* @param bytes raw ZK data
* @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
* @throws DeserializationException
*/
public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
return ReplicationProtos.ReplicationState.State.ENABLED == state;
}
/**
* @param bytes Content of a state znode.
* @return State parsed from the passed bytes.
* @throws DeserializationException
*/
private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
int pbLen = ProtobufUtil.lengthOfPBMagic();
ReplicationProtos.ReplicationState.Builder builder =
ReplicationProtos.ReplicationState.newBuilder();
ReplicationProtos.ReplicationState state;
try {
ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
state = builder.build();
return state.getState();
} catch (IOException e) {
throw new DeserializationException(e);
}
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.util.List; import java.util.List;
import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -70,5 +69,5 @@ public interface ReplicationPeerStorage {
* Get the peer config of a replication peer. * Get the peer config of a replication peer.
* @throws ReplicationException if there are errors accessing the storage service. * @throws ReplicationException if there are errors accessing the storage service.
*/ */
Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException; ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
} }

View File

@ -1,5 +1,4 @@
/* /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,58 +17,53 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.util.Collection; import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/** /**
* This provides an interface for maintaining a set of peer clusters. These peers are remote slave * This provides an class for maintaining a set of peer clusters. These peers are remote slave
* clusters that data is replicated to. A peer cluster can be in three different states: * clusters that data is replicated to.
*
* 1. Not-Registered - There is no notion of the peer cluster.
* 2. Registered - The peer has an id and is being tracked but there is no connection.
* 3. Connected - There is an active connection to the remote peer.
*
* In the registered or connected state, a peer cluster can either be enabled or disabled.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface ReplicationPeers { public class ReplicationPeers {
/** private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
* Initialize the ReplicationPeers interface.
*/
void init() throws ReplicationException;
/** private final Configuration conf;
* Add a new remote slave cluster for replication.
* @param peerId a short that identifies the cluster // Map of peer clusters keyed by their id
* @param peerConfig configuration for the replication slave cluster private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
*/ private final ReplicationPeerStorage peerStorage;
default void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException { protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
registerPeer(peerId, peerConfig, true); this.conf = conf;
this.peerCache = new ConcurrentHashMap<>();
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
} }
/** public void init() throws ReplicationException {
* Add a new remote slave cluster for replication. // Loading all existing peerIds into peer cache.
* @param peerId a short that identifies the cluster for (String peerId : this.peerStorage.listPeerIds()) {
* @param peerConfig configuration for the replication slave cluster addPeer(peerId);
* @param enabled peer state, true if ENABLED and false if DISABLED }
*/ }
void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException;
/** @VisibleForTesting
* Removes a remote slave cluster and stops the replication to it. public ReplicationPeerStorage getPeerStorage() {
* @param peerId a short that identifies the cluster return this.peerStorage;
*/ }
void unregisterPeer(String peerId) throws ReplicationException;
/** /**
* Method called after a peer has been connected. It will create a ReplicationPeer to track the * Method called after a peer has been connected. It will create a ReplicationPeer to track the
@ -78,111 +72,115 @@ public interface ReplicationPeers {
* @return whether a ReplicationPeer was successfully created * @return whether a ReplicationPeer was successfully created
* @throws ReplicationException * @throws ReplicationException
*/ */
boolean peerConnected(String peerId) throws ReplicationException; public boolean addPeer(String peerId) throws ReplicationException {
if (this.peerCache.containsKey(peerId)) {
return false;
}
peerCache.put(peerId, createPeer(peerId));
return true;
}
public void removePeer(String peerId) {
peerCache.remove(peerId);
}
/** /**
* Method called after a peer has been disconnected. It will remove the ReplicationPeer that * Get the peer state for the specified connected remote slave cluster. The value might be read
* tracked the disconnected cluster. * from cache, so it is recommended to use {@link #peerStorage } to read storage directly if
* reading the state after enabling or disabling it.
* @param peerId a short that identifies the cluster * @param peerId a short that identifies the cluster
* @return true if replication is enabled, false otherwise.
*/ */
void peerDisconnected(String peerId); public boolean isPeerEnabled(String peerId) {
ReplicationPeer replicationPeer = this.peerCache.get(peerId);
if (replicationPeer == null) {
throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
}
return replicationPeer.getPeerState() == PeerState.ENABLED;
}
/** /**
* Restart the replication to the specified remote slave cluster. * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
* @param peerId a short that identifies the cluster * continue to track changes to the Peer's state and config. This method returns null if no peer
*/ * has been cached with the given peerId.
void enablePeer(String peerId) throws ReplicationException;
/**
* Stop the replication to the specified remote slave cluster.
* @param peerId a short that identifies the cluster
*/
void disablePeer(String peerId) throws ReplicationException;
/**
* Get the table and column-family list string of the peer from the underlying storage.
* @param peerId a short that identifies the cluster
*/
public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
throws ReplicationException;
/**
* Set the table and column-family list string of the peer to the underlying storage.
* @param peerId a short that identifies the cluster
* @param tableCFs the table and column-family list which will be replicated for this peer
*/
public void setPeerTableCFsConfig(String peerId,
Map<TableName, ? extends Collection<String>> tableCFs)
throws ReplicationException;
/**
* Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will
* continue to track changes to the Peer's state and config. This method returns null if no
* peer has been connected with the given peerId.
* @param peerId id for the peer * @param peerId id for the peer
* @return ReplicationPeer object * @return ReplicationPeer object
*/ */
ReplicationPeerImpl getConnectedPeer(String peerId); public ReplicationPeerImpl getPeer(String peerId) {
return peerCache.get(peerId);
}
/** /**
* Returns the set of peerIds of the clusters that have been connected and have an underlying * Returns the set of peerIds of the clusters that have been connected and have an underlying
* ReplicationPeer. * ReplicationPeer.
* @return a Set of Strings for peerIds * @return a Set of Strings for peerIds
*/ */
public Set<String> getConnectedPeerIds(); public Set<String> getAllPeerIds() {
return peerCache.keySet();
/** }
* Get the replication status for the specified connected remote slave cluster.
* The value might be read from cache, so it is recommended to public ReplicationPeerConfig getPeerConfig(String peerId) {
* use {@link #getStatusOfPeerFromBackingStore(String)} ReplicationPeer replicationPeer = this.peerCache.get(peerId);
* if reading the state after enabling or disabling it. if (replicationPeer == null) {
* @param peerId a short that identifies the cluster throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
* @return true if replication is enabled, false otherwise. }
*/ return replicationPeer.getPeerConfig();
boolean getStatusOfPeer(String peerId); }
/** public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException {
* Get the replication status for the specified remote slave cluster, which doesn't ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
* have to be connected. The state is read directly from the backing store.
* @param peerId a short that identifies the cluster Configuration otherConf;
* @return true if replication is enabled, false otherwise. try {
* @throws ReplicationException thrown if there's an error contacting the store otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
*/ } catch (IOException e) {
boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException; throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e);
}
/**
* List the cluster replication configs of all remote slave clusters (whether they are if (!peerConfig.getConfiguration().isEmpty()) {
* enabled/disabled or connected/disconnected). CompoundConfiguration compound = new CompoundConfiguration();
* @return A map of peer ids to peer cluster keys compound.add(otherConf);
*/ compound.addStringMap(peerConfig.getConfiguration());
Map<String, ReplicationPeerConfig> getAllPeerConfigs(); return compound;
}
/**
* List the peer ids of all remote slave clusters (whether they are enabled/disabled or return otherConf;
* connected/disconnected). }
* @return A list of peer ids
*/ public PeerState refreshPeerState(String peerId) throws ReplicationException {
List<String> getAllPeerIds(); ReplicationPeerImpl peer = peerCache.get(peerId);
if (peer == null) {
/** throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
* Returns the configured ReplicationPeerConfig for this peerId }
* @param peerId a short name that identifies the cluster peer.setPeerState(peerStorage.isPeerEnabled(peerId));
* @return ReplicationPeerConfig for the peer return peer.getPeerState();
*/ }
ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
/** ReplicationPeerImpl peer = peerCache.get(peerId);
* Returns the configuration needed to talk to the remote slave cluster. if (peer == null) {
* @param peerId a short that identifies the cluster throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
* @return the configuration for the peer cluster, null if it was unable to get the configuration }
*/ peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException; return peer.getPeerConfig();
}
/** /**
<<<<<<< 2bb2fd611d4b88c724a2b561f10433b56c6fd3dd
* Update the peerConfig for the a given peer cluster * Update the peerConfig for the a given peer cluster
* @param id a short that identifies the cluster * @param id a short that identifies the cluster
* @param peerConfig new config for the peer cluster * @param peerConfig new config for the peer cluster
* @throws ReplicationException * @throws ReplicationException
=======
* Helper method to connect to a peer
* @param peerId peer's identifier
* @return object representing the peer
>>>>>>> HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
*/ */
void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException; private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf);
}
} }

View File

@ -1,552 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
* peers znode contains a list of all peer replication clusters and the current replication state of
* those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
* the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
* peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
* zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
* For example:
*
* /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
* /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
*
* Each of these peer znodes has a child znode that indicates whether or not replication is enabled
* on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
* boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
* ReplicationPeer.PeerStateTracker class. For example:
*
* /hbase/replication/peers/1/peer-state [Value: ENABLED]
*
* Each of these peer znodes has a child znode that indicates which data will be replicated
* to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
* table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
* class. For example:
*
* /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
*/
@InterfaceAudience.Private
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
// Map of peer clusters keyed by their id
private ConcurrentMap<String, ReplicationPeerImpl> peerClusters;
private final ReplicationQueueStorage queueStorage;
private Abortable abortable;
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);
public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf,
ReplicationQueueStorage queueStorage, Abortable abortable) {
super(zk, conf, abortable);
this.abortable = abortable;
this.peerClusters = new ConcurrentHashMap<>();
this.queueStorage = queueStorage;
}
@Override
public void init() throws ReplicationException {
try {
if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
}
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication peers", e);
}
addExistingPeers();
}
@Override
public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
try {
if (peerExists(id)) {
throw new IllegalArgumentException("Cannot add a peer with id=" + id
+ " because that id already exists.");
}
if(id.contains("-")){
throw new IllegalArgumentException("Found invalid peer name:" + id);
}
if (peerConfig.getClusterKey() != null) {
try {
ZKConfig.validateClusterKey(peerConfig.getClusterKey());
} catch (IOException ioe) {
throw new IllegalArgumentException(ioe.getMessage());
}
}
checkQueuesDeleted(id);
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
List<ZKUtilOp> listOfOps = new ArrayList<>(2);
ZKUtilOp op1 =
ZKUtilOp.createAndFailSilent(getPeerNode(id),
ReplicationPeerConfigUtil.toByteArray(peerConfig));
ZKUtilOp op2 =
ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES
: DISABLED_ZNODE_BYTES);
listOfOps.add(op1);
listOfOps.add(op2);
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
} catch (KeeperException e) {
throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>"
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
}
}
@Override
public void unregisterPeer(String id) throws ReplicationException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot remove peer with id=" + id
+ " because that id does not exist.");
}
ZKUtil.deleteNodeRecursively(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id));
} catch (KeeperException e) {
throw new ReplicationException("Could not remove peer with id=" + id, e);
}
}
@Override
public void enablePeer(String id) throws ReplicationException {
changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
LOG.info("peer " + id + " is enabled");
}
@Override
public void disablePeer(String id) throws ReplicationException {
changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
LOG.info("peer " + id + " is disabled");
}
@Override
public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("peer " + id + " doesn't exist");
}
try {
ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
if (rpc == null) {
throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
}
return rpc.getTableCFsMap();
} catch (Exception e) {
throw new ReplicationException(e);
}
} catch (KeeperException e) {
throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
}
}
@Override
public void setPeerTableCFsConfig(String id,
Map<TableName, ? extends Collection<String>> tableCFs)
throws ReplicationException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
+ " does not exist.");
}
ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
if (rpc == null) {
throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
}
rpc.setTableCFsMap(tableCFs);
ZKUtil.setData(this.zookeeper, getPeerNode(id),
ReplicationPeerConfigUtil.toByteArray(rpc));
LOG.info("Peer tableCFs with id= " + id + " is now " +
ReplicationPeerConfigUtil.convertToString(tableCFs));
} catch (KeeperException e) {
throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
}
}
@Override
public boolean getStatusOfPeer(String id) {
ReplicationPeer replicationPeer = this.peerClusters.get(id);
if (replicationPeer == null) {
throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
}
return replicationPeer.getPeerState() == PeerState.ENABLED;
}
@Override
public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("peer " + id + " doesn't exist");
}
String peerStateZNode = getPeerStateNode(id);
try {
return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
} catch (KeeperException e) {
throw new ReplicationException(e);
} catch (DeserializationException e) {
throw new ReplicationException(e);
}
} catch (KeeperException e) {
throw new ReplicationException("Unable to get status of the peer with id=" + id +
" from backing store", e);
} catch (InterruptedException e) {
throw new ReplicationException(e);
}
}
@Override
public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
List<String> ids = null;
try {
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
for (String id : ids) {
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
if (peerConfig == null) {
LOG.warn("Failed to get replication peer configuration of clusterid=" + id
+ " znode content, continuing.");
continue;
}
peers.put(id, peerConfig);
}
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
} catch (ReplicationException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return peers;
}
@Override
public ReplicationPeerImpl getConnectedPeer(String peerId) {
return peerClusters.get(peerId);
}
@Override
public Set<String> getConnectedPeerIds() {
return peerClusters.keySet(); // this is not thread-safe
}
/**
* Returns a ReplicationPeerConfig from the znode or null for the given peerId.
*/
@Override
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
throws ReplicationException {
String znode = getPeerNode(peerId);
byte[] data = null;
try {
data = ZKUtil.getData(this.zookeeper, znode);
} catch (InterruptedException e) {
LOG.warn("Could not get configuration for peer because the thread " +
"was interrupted. peerId=" + peerId);
Thread.currentThread().interrupt();
return null;
} catch (KeeperException e) {
throw new ReplicationException("Error getting configuration for peer with id="
+ peerId, e);
}
if (data == null) {
LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
return null;
}
try {
return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed to parse cluster key from peerId=" + peerId
+ ", specifically the content from the following znode: " + znode);
return null;
}
}
@Override
public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
throws ReplicationException {
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
if (peerConfig == null) {
return null;
}
Configuration otherConf;
try {
otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
} catch (IOException e) {
LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
return null;
}
if (!peerConfig.getConfiguration().isEmpty()) {
CompoundConfiguration compound = new CompoundConfiguration();
compound.add(otherConf);
compound.addStringMap(peerConfig.getConfiguration());
return new Pair<>(peerConfig, compound);
}
return new Pair<>(peerConfig, otherConf);
}
@Override
public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
throws ReplicationException {
ReplicationPeer peer = getConnectedPeer(id);
if (peer == null){
throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
}
ReplicationPeerConfig existingConfig = peer.getPeerConfig();
if (!isStringEquals(newConfig.getClusterKey(), existingConfig.getClusterKey())) {
throw new ReplicationException(
"Changing the cluster key on an existing peer is not allowed." + " Existing key '" +
existingConfig.getClusterKey() + "' does not match new key '" +
newConfig.getClusterKey() + "'");
}
if (!isStringEquals(newConfig.getReplicationEndpointImpl(),
existingConfig.getReplicationEndpointImpl())) {
throw new ReplicationException("Changing the replication endpoint implementation class " +
"on an existing peer is not allowed. Existing class '" +
existingConfig.getReplicationEndpointImpl() + "' does not match new class '" +
newConfig.getReplicationEndpointImpl() + "'");
}
// Update existingConfig's peer config and peer data with the new values, but don't touch config
// or data that weren't explicitly changed
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig);
builder.putAllConfiguration(newConfig.getConfiguration())
.putAllPeerData(newConfig.getPeerData())
.setReplicateAllUserTables(newConfig.replicateAllUserTables())
.setNamespaces(newConfig.getNamespaces()).setTableCFsMap(newConfig.getTableCFsMap())
.setExcludeNamespaces(newConfig.getExcludeNamespaces())
.setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap())
.setBandwidth(newConfig.getBandwidth());
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),
ReplicationPeerConfigUtil.toByteArray(builder.build()));
}
catch(KeeperException ke){
throw new ReplicationException("There was a problem trying to save changes to the " +
"replication peer " + id, ke);
}
}
/**
* List all registered peer clusters and set a watch on their znodes.
*/
@Override
public List<String> getAllPeerIds() {
List<String> ids = null;
try {
ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return ids;
}
/**
* A private method used during initialization. This method attempts to add all registered
* peer clusters. This method does not set a watch on the peer cluster znodes.
*/
private void addExistingPeers() throws ReplicationException {
List<String> znodes = null;
try {
znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
} catch (KeeperException e) {
throw new ReplicationException("Error getting the list of peer clusters.", e);
}
if (znodes != null) {
for (String z : znodes) {
createAndAddPeer(z);
}
}
}
@Override
public boolean peerConnected(String peerId) throws ReplicationException {
return createAndAddPeer(peerId);
}
@Override
public void peerDisconnected(String peerId) {
ReplicationPeer rp = this.peerClusters.get(peerId);
if (rp != null) {
peerClusters.remove(peerId, rp);
}
}
/**
* Attempt to connect to a new remote slave cluster.
* @param peerId a short that identifies the cluster
* @return true if a new connection was made, false if no new connection was made.
*/
public boolean createAndAddPeer(String peerId) throws ReplicationException {
if (peerClusters == null) {
return false;
}
if (this.peerClusters.containsKey(peerId)) {
return false;
}
ReplicationPeerImpl peer = null;
try {
peer = createPeer(peerId);
} catch (Exception e) {
throw new ReplicationException("Error adding peer with id=" + peerId, e);
}
if (peer == null) {
return false;
}
ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer);
if (previous == null) {
LOG.info("Added peer cluster=" + peer.getPeerConfig().getClusterKey());
} else {
LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
", new cluster=" + peer.getPeerConfig().getClusterKey());
}
return true;
}
/**
* Update the state znode of a peer cluster.
* @param id
* @param state
*/
private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
throws ReplicationException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
+ " does not exist.");
}
String peerStateZNode = getPeerStateNode(id);
byte[] stateBytes =
(state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
: DISABLED_ZNODE_BYTES;
if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
} else {
ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
}
LOG.info("Peer with id= " + id + " is now " + state.name());
} catch (KeeperException e) {
throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
}
}
/**
* Helper method to connect to a peer
* @param peerId peer's identifier
* @return object representing the peer
* @throws ReplicationException
*/
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
if (pair == null) {
return null;
}
Configuration peerConf = pair.getSecond();
ReplicationPeerImpl peer =
new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst());
// Load peer state and peer config by reading zookeeper directly.
peer.refreshPeerState();
peer.refreshPeerConfig();
return peer;
}
private void checkQueuesDeleted(String peerId) throws ReplicationException {
if (queueStorage == null) {
return;
}
try {
List<ServerName> replicators = queueStorage.getListOfReplicators();
if (replicators == null || replicators.isEmpty()) {
return;
}
for (ServerName replicator : replicators) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (queueInfo.getPeerId().equals(peerId)) {
throw new IllegalArgumentException("undeleted queue for peerId: " + peerId
+ ", replicator: " + replicator + ", queueId: " + queueId);
}
}
}
// Check for hfile-refs queue
if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
&& queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
+ ", found in hfile-refs node path " + hfileRefsZNode);
}
} catch (KeeperException e) {
throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
}
}
/**
* For replication peer cluster key or endpoint class, null and empty string is same. So here
* don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
*/
private boolean isStringEquals(String s1, String s2) {
if (StringUtils.isBlank(s1)) {
return StringUtils.isBlank(s2);
}
return s1.equals(s2);
}
}

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
@ -144,7 +143,7 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
} }
@Override @Override
public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException { public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
byte[] data; byte[] data;
try { try {
data = ZKUtil.getData(zookeeper, getPeerNode(peerId)); data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
@ -152,13 +151,14 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
} }
if (data == null || data.length == 0) { if (data == null || data.length == 0) {
return Optional.empty(); throw new ReplicationException(
"Replication peer config data shouldn't be empty, peerId=" + peerId);
} }
try { try {
return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data)); return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) { } catch (DeserializationException e) {
LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e); throw new ReplicationException(
return Optional.empty(); "Failed to parse replication peer config for peer with id=" + peerId, e);
} }
} }
} }

View File

@ -48,8 +48,7 @@ class ZKReplicationStorageBase {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
this.replicationZNode = this.replicationZNode =
ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
} }
/** /**

View File

@ -55,7 +55,6 @@ public abstract class TestReplicationStateBasic {
protected static String KEY_TWO; protected static String KEY_TWO;
// For testing when we try to replicate to ourself // For testing when we try to replicate to ourself
protected String OUR_ID = "3";
protected String OUR_KEY; protected String OUR_KEY;
protected static int zkTimeoutCount; protected static int zkTimeoutCount;
@ -151,37 +150,6 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rqs.getListOfReplicators().size()); assertEquals(0, rqs.getListOfReplicators().size());
} }
@Test
public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
rp.init();
try {
rp.registerPeer(ID_ONE,
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
fail("Should throw an IllegalArgumentException because " +
"zookeeper.znode.parent is missing leading '/'.");
} catch (IllegalArgumentException e) {
// Expected.
}
try {
rp.registerPeer(ID_ONE,
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
} catch (IllegalArgumentException e) {
// Expected.
}
try {
rp.registerPeer(ID_ONE,
new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
fail("Should throw an IllegalArgumentException because " +
"hbase.zookeeper.property.clientPort is missing.");
} catch (IllegalArgumentException e) {
// Expected.
}
}
@Test @Test
public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
rp.init(); rp.init();
@ -192,7 +160,8 @@ public abstract class TestReplicationStateBasic {
files1.add(new Pair<>(null, new Path("file_3"))); files1.add(new Pair<>(null, new Path("file_3")));
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rp.getPeerStorage().addPeer(ID_ONE,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
rqs.addPeerToHFileRefs(ID_ONE); rqs.addPeerToHFileRefs(ID_ONE);
rqs.addHFileRefs(ID_ONE, files1); rqs.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
@ -208,15 +177,17 @@ public abstract class TestReplicationStateBasic {
hfiles2.add(removedString); hfiles2.add(removedString);
rqs.removeHFileRefs(ID_ONE, hfiles2); rqs.removeHFileRefs(ID_ONE, hfiles2);
assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
rp.unregisterPeer(ID_ONE); rp.getPeerStorage().removePeer(ID_ONE);
} }
@Test @Test
public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
rp.init(); rp.init();
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rp.getPeerStorage().addPeer(ID_ONE,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
rqs.addPeerToHFileRefs(ID_ONE); rqs.addPeerToHFileRefs(ID_ONE);
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); rp.getPeerStorage().addPeer(ID_TWO,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
rqs.addPeerToHFileRefs(ID_TWO); rqs.addPeerToHFileRefs(ID_TWO);
List<Pair<Path, Path>> files1 = new ArrayList<>(3); List<Pair<Path, Path>> files1 = new ArrayList<>(3);
@ -229,13 +200,13 @@ public abstract class TestReplicationStateBasic {
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
rp.unregisterPeer(ID_ONE); rp.getPeerStorage().removePeer(ID_ONE);
rqs.removePeerFromHFileRefs(ID_ONE); rqs.removePeerFromHFileRefs(ID_ONE);
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
rp.unregisterPeer(ID_TWO); rp.getPeerStorage().removePeer(ID_TWO);
rqs.removePeerFromHFileRefs(ID_TWO); rqs.removePeerFromHFileRefs(ID_TWO);
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
@ -245,74 +216,77 @@ public abstract class TestReplicationStateBasic {
public void testReplicationPeers() throws Exception { public void testReplicationPeers() throws Exception {
rp.init(); rp.init();
// Test methods with non-existent peer ids
try { try {
rp.unregisterPeer("bogus"); rp.getPeerStorage().setPeerState("bogus", true);
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (ReplicationException e) {
}
try {
rp.getPeerStorage().setPeerState("bogus", false);
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (ReplicationException e) {
}
try {
rp.isPeerEnabled("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
} }
try {
rp.enablePeer("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
try {
rp.disablePeer("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
try {
rp.getStatusOfPeer("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
assertFalse(rp.peerConnected("bogus"));
rp.peerDisconnected("bogus");
assertNull(rp.getPeerConf("bogus")); try {
assertFalse(rp.addPeer("bogus"));
fail("Should have thrown an ReplicationException when passed a bogus peerId");
} catch (ReplicationException e) {
}
try {
assertNull(rp.getPeerClusterConfiguration("bogus"));
fail("Should have thrown an ReplicationException when passed a bogus peerId");
} catch (ReplicationException e) {
}
assertNumberOfPeers(0); assertNumberOfPeers(0);
// Add some peers // Add some peers
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
assertNumberOfPeers(1); assertNumberOfPeers(1);
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
assertNumberOfPeers(2); assertNumberOfPeers(2);
// Test methods with a peer that is added but not connected // Test methods with a peer that is added but not connected
try { try {
rp.getStatusOfPeer(ID_ONE); rp.isPeerEnabled(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException"); fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
} }
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE)));
rp.unregisterPeer(ID_ONE); rp.getPeerStorage().removePeer(ID_ONE);
rp.peerDisconnected(ID_ONE); rp.removePeer(ID_ONE);
assertNumberOfPeers(1); assertNumberOfPeers(1);
// Add one peer // Add one peer
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
rp.peerConnected(ID_ONE); rp.addPeer(ID_ONE);
assertNumberOfPeers(2); assertNumberOfPeers(2);
assertTrue(rp.getStatusOfPeer(ID_ONE)); assertTrue(rp.isPeerEnabled(ID_ONE));
rp.disablePeer(ID_ONE); rp.getPeerStorage().setPeerState(ID_ONE, false);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it // now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually... // manually...
ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE); ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
peer.refreshPeerState(); rp.refreshPeerState(peer.getId());
assertEquals(PeerState.DISABLED, peer.getPeerState()); assertEquals(PeerState.DISABLED, peer.getPeerState());
assertConnectedPeerStatus(false, ID_ONE); assertConnectedPeerStatus(false, ID_ONE);
rp.enablePeer(ID_ONE); rp.getPeerStorage().setPeerState(ID_ONE, true);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it // now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually... // manually...
peer.refreshPeerState(); rp.refreshPeerState(peer.getId());
assertEquals(PeerState.ENABLED, peer.getPeerState()); assertEquals(PeerState.ENABLED, peer.getPeerState());
assertConnectedPeerStatus(true, ID_ONE); assertConnectedPeerStatus(true, ID_ONE);
// Disconnect peer // Disconnect peer
rp.peerDisconnected(ID_ONE); rp.removePeer(ID_ONE);
assertNumberOfPeers(2); assertNumberOfPeers(2);
try { try {
rp.getStatusOfPeer(ID_ONE); rp.isPeerEnabled(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException"); fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
} }
@ -320,16 +294,16 @@ public abstract class TestReplicationStateBasic {
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
// we can first check if the value was changed in the store, if it wasn't then fail right away // we can first check if the value was changed in the store, if it wasn't then fail right away
if (status != rp.getStatusOfPeerFromBackingStore(peerId)) { if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
} }
while (true) { while (true) {
if (status == rp.getStatusOfPeer(peerId)) { if (status == rp.isPeerEnabled(peerId)) {
return; return;
} }
if (zkTimeoutCount < ZK_MAX_COUNT) { if (zkTimeoutCount < ZK_MAX_COUNT) {
LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status + LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
", sleeping and trying again."); + ", sleeping and trying again.");
Thread.sleep(ZK_SLEEP_INTERVAL); Thread.sleep(ZK_SLEEP_INTERVAL);
} else { } else {
fail("Timed out waiting for ConnectedPeerStatus to be " + status); fail("Timed out waiting for ConnectedPeerStatus to be " + status);
@ -337,10 +311,8 @@ public abstract class TestReplicationStateBasic {
} }
} }
protected void assertNumberOfPeers(int total) { protected void assertNumberOfPeers(int total) throws ReplicationException {
assertEquals(total, rp.getAllPeerConfigs().size()); assertEquals(total, rp.getPeerStorage().listPeerIds().size());
assertEquals(total, rp.getAllPeerIds().size());
assertEquals(total, rp.getAllPeerIds().size());
} }
/* /*
@ -359,8 +331,9 @@ public abstract class TestReplicationStateBasic {
rqs.addWAL(server3, "qId" + i, "filename" + j); rqs.addWAL(server3, "qId" + i, "filename" + j);
} }
// Add peers for the corresponding queues so they are not orphans // Add peers for the corresponding queues so they are not orphans
rp.registerPeer("qId" + i, rp.getPeerStorage().addPeer("qId" + i,
new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
true);
} }
} }
} }

View File

@ -84,7 +84,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
public void setUp() { public void setUp() {
zkTimeoutCount = 0; zkTimeoutCount = 0;
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable()); rp = ReplicationFactory.getReplicationPeers(zkw, conf);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
} }

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -143,14 +144,14 @@ public class TestZKReplicationPeerStorage {
assertEquals(peerCount, peerIds.size()); assertEquals(peerCount, peerIds.size());
for (String peerId : peerIds) { for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId); int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get()); assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
} }
for (int i = 0; i < peerCount; i++) { for (int i = 0; i < peerCount; i++) {
STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
} }
for (String peerId : peerIds) { for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId); int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get()); assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
} }
for (int i = 0; i < peerCount; i++) { for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
@ -166,6 +167,11 @@ public class TestZKReplicationPeerStorage {
peerIds = STORAGE.listPeerIds(); peerIds = STORAGE.listPeerIds();
assertEquals(peerCount - 1, peerIds.size()); assertEquals(peerCount - 1, peerIds.size());
assertFalse(peerIds.contains(toRemove)); assertFalse(peerIds.contains(toRemove));
assertFalse(STORAGE.getPeerConfig(toRemove).isPresent());
try {
STORAGE.getPeerConfig(toRemove);
fail("Should throw a ReplicationException when get peer config of a peerId");
} catch (ReplicationException e) {
}
} }
} }

View File

@ -30,8 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
@ -51,20 +50,14 @@ import org.slf4j.LoggerFactory;
public class ReplicationZKNodeCleaner { public class ReplicationZKNodeCleaner {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class); private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
private final ReplicationQueueStorage queueStorage; private final ReplicationQueueStorage queueStorage;
private final ReplicationPeers replicationPeers; private final ReplicationPeerStorage peerStorage;
private final ReplicationQueueDeletor queueDeletor; private final ReplicationQueueDeletor queueDeletor;
public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable) public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
throws IOException { throws IOException {
try { this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
this.replicationPeers = this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
ReplicationFactory.getReplicationPeers(zkw, conf, this.queueStorage, abortable);
this.replicationPeers.init();
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
} catch (ReplicationException e) {
throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
}
} }
/** /**
@ -73,8 +66,8 @@ public class ReplicationZKNodeCleaner {
*/ */
public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException { public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException {
Map<ServerName, List<String>> undeletedQueues = new HashMap<>(); Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
try { try {
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
List<ServerName> replicators = this.queueStorage.getListOfReplicators(); List<ServerName> replicators = this.queueStorage.getListOfReplicators();
if (replicators == null || replicators.isEmpty()) { if (replicators == null || replicators.isEmpty()) {
return undeletedQueues; return undeletedQueues;
@ -84,8 +77,7 @@ public class ReplicationZKNodeCleaner {
for (String queueId : queueIds) { for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (!peerIds.contains(queueInfo.getPeerId())) { if (!peerIds.contains(queueInfo.getPeerId())) {
undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add( undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(queueId);
queueId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Undeleted replication queue for removed peer found: " LOG.debug("Undeleted replication queue for removed peer found: "
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
@ -106,9 +98,9 @@ public class ReplicationZKNodeCleaner {
*/ */
public Set<String> getUnDeletedHFileRefsQueues() throws IOException { public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
Set<String> undeletedHFileRefsQueue = new HashSet<>(); Set<String> undeletedHFileRefsQueue = new HashSet<>();
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
String hfileRefsZNode = queueDeletor.getHfileRefsZNode(); String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
try { try {
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue(); List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue();
Set<String> peers = new HashSet<>(listOfPeers); Set<String> peers = new HashSet<>(listOfPeers);
peers.removeAll(peerIds); peers.removeAll(peerIds);
@ -116,15 +108,15 @@ public class ReplicationZKNodeCleaner {
undeletedHFileRefsQueue.addAll(peers); undeletedHFileRefsQueue.addAll(peers);
} }
} catch (ReplicationException e) { } catch (ReplicationException e) {
throw new IOException( throw new IOException("Failed to get list of all peers from hfile-refs znode "
"Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode, e); + hfileRefsZNode, e);
} }
return undeletedHFileRefsQueue; return undeletedHFileRefsQueue;
} }
private class ReplicationQueueDeletor extends ReplicationStateZKBase { private class ReplicationQueueDeletor extends ReplicationStateZKBase {
public ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) { ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
super(zk, conf, abortable); super(zk, conf, abortable);
} }
@ -132,19 +124,20 @@ public class ReplicationZKNodeCleaner {
* @param replicator The regionserver which has undeleted queue * @param replicator The regionserver which has undeleted queue
* @param queueId The undeleted queue id * @param queueId The undeleted queue id
*/ */
public void removeQueue(final ServerName replicator, final String queueId) throws IOException { void removeQueue(final ServerName replicator, final String queueId) throws IOException {
String queueZnodePath = ZNodePaths String queueZnodePath =
.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), queueId); ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()),
queueId);
try { try {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) { if (!peerStorage.listPeerIds().contains(queueInfo.getPeerId())) {
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath); ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
LOG.info("Successfully removed replication queue, replicator: " + replicator + LOG.info("Successfully removed replication queue, replicator: " + replicator
", queueId: " + queueId); + ", queueId: " + queueId);
} }
} catch (KeeperException e) { } catch (ReplicationException | KeeperException e) {
throw new IOException( throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
"Failed to delete queue, replicator: " + replicator + ", queueId: " + queueId); + queueId);
} }
} }
@ -152,17 +145,17 @@ public class ReplicationZKNodeCleaner {
* @param hfileRefsQueueId The undeleted hfile-refs queue id * @param hfileRefsQueueId The undeleted hfile-refs queue id
* @throws IOException * @throws IOException
*/ */
public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException { void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId); String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
try { try {
if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) { if (!peerStorage.listPeerIds().contains(hfileRefsQueueId)) {
ZKUtil.deleteNodeRecursively(this.zookeeper, node); ZKUtil.deleteNodeRecursively(this.zookeeper, node);
LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path " LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
+ hfileRefsZNode); + hfileRefsZNode);
} }
} catch (KeeperException e) { } catch (ReplicationException | KeeperException e) {
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
+ " from path " + hfileRefsZNode); + " from path " + hfileRefsZNode, e);
} }
} }

View File

@ -314,12 +314,12 @@ public class ReplicationPeerManager {
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException { throws ReplicationException {
ReplicationPeerStorage peerStorage = ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) { for (String peerId : peerStorage.listPeerIds()) {
Optional<ReplicationPeerConfig> peerConfig = peerStorage.getPeerConfig(peerId); ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get())); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
} }
return new ReplicationPeerManager(peerStorage, return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);

View File

@ -310,7 +310,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
replicationPeers = replicationPeers =
ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection); ReplicationFactory.getReplicationPeers(zkw, getConf());
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
new WarnOnlyAbortable(), new WarnOnlyStoppable()); new WarnOnlyAbortable(), new WarnOnlyStoppable());
Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());

View File

@ -19,9 +19,10 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -30,7 +31,8 @@ import org.slf4j.LoggerFactory;
public class PeerProcedureHandlerImpl implements PeerProcedureHandler { public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class); private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
private ReplicationSourceManager replicationSourceManager; private final ReplicationSourceManager replicationSourceManager;
private final ReentrantLock peersLock = new ReentrantLock();
public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) { public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
this.replicationSourceManager = replicationSourceManager; this.replicationSourceManager = replicationSourceManager;
@ -38,45 +40,40 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override @Override
public void addPeer(String peerId) throws ReplicationException, IOException { public void addPeer(String peerId) throws ReplicationException, IOException {
replicationSourceManager.addPeer(peerId); peersLock.lock();
try {
replicationSourceManager.addPeer(peerId);
} finally {
peersLock.unlock();
}
} }
@Override @Override
public void removePeer(String peerId) throws ReplicationException, IOException { public void removePeer(String peerId) throws ReplicationException, IOException {
replicationSourceManager.removePeer(peerId); peersLock.lock();
try {
if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
replicationSourceManager.removePeer(peerId);
}
} finally {
peersLock.unlock();
}
} }
@Override @Override
public void disablePeer(String peerId) throws ReplicationException, IOException { public void disablePeer(String peerId) throws ReplicationException, IOException {
ReplicationPeerImpl peer = PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); LOG.info("disable replication peer, id: " + peerId + ", new state: " + newState);
if (peer != null) {
peer.refreshPeerState();
LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
} }
@Override @Override
public void enablePeer(String peerId) throws ReplicationException, IOException { public void enablePeer(String peerId) throws ReplicationException, IOException {
ReplicationPeerImpl peer = PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); LOG.info("enable replication peer, id: " + peerId + ", new state: " + newState);
if (peer != null) {
peer.refreshPeerState();
LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
} }
@Override @Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException { public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer == null) {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
peer.refreshPeerConfig();
} }
} }

View File

@ -107,7 +107,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
this.queueStorage = this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
this.replicationPeers = this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
this.replicationPeers.init(); this.replicationPeers.init();
this.replicationTracker = this.replicationTracker =
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,

View File

@ -222,8 +222,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// A peerId will not have "-" in its name, see HBASE-11394 // A peerId will not have "-" in its name, see HBASE-11394
peerId = peerClusterZnode.split("-")[0]; peerId = peerClusterZnode.split("-")[0];
} }
Map<TableName, List<String>> tableCFMap = Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
replicationPeers.getConnectedPeer(peerId).getTableCFs();
if (tableCFMap != null) { if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName); List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName) if (tableCFMap.containsKey(tableName)
@ -371,7 +370,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} }
private long getCurrentBandwidth() { private long getCurrentBandwidth() {
ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId); ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId);
long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0; long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
// user can set peer bandwidth to 0 to use default bandwidth // user can set peer bandwidth to 0 to use default bandwidth
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
@ -416,7 +415,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
*/ */
@Override @Override
public boolean isPeerEnabled() { public boolean isPeerEnabled() {
return this.replicationPeers.getStatusOfPeer(this.peerId); return this.replicationPeers.isPeerEnabled(this.peerId);
} }
@Override @Override

View File

@ -160,7 +160,6 @@ public class ReplicationSourceManager implements ReplicationListener {
this.clusterId = clusterId; this.clusterId = clusterId;
this.walFileLengthProvider = walFileLengthProvider; this.walFileLengthProvider = walFileLengthProvider;
this.replicationTracker.registerListener(this); this.replicationTracker.registerListener(this);
this.replicationPeers.getAllPeerIds();
// It's preferable to failover 1 RS at a time, but with good zk servers // It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time. // more could be processed at the same time.
int nbWorkers = conf.getInt("replication.executor.workers", 1); int nbWorkers = conf.getInt("replication.executor.workers", 1);
@ -260,8 +259,8 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream() List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
.map(ServerName::valueOf).collect(Collectors.toList()); .map(ServerName::valueOf).collect(Collectors.toList());
LOG.info( LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
"Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); + otherRegionServers);
// Look if there's anything to process after a restart // Look if there's anything to process after a restart
for (ServerName rs : currentReplicators) { for (ServerName rs : currentReplicators) {
@ -278,7 +277,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* The returned future is for adoptAbandonedQueues task. * The returned future is for adoptAbandonedQueues task.
*/ */
Future<?> init() throws IOException, ReplicationException { Future<?> init() throws IOException, ReplicationException {
for (String id : this.replicationPeers.getConnectedPeerIds()) { for (String id : this.replicationPeers.getAllPeerIds()) {
addSource(id); addSource(id);
if (replicationForBulkLoadDataEnabled) { if (replicationForBulkLoadDataEnabled) {
// Check if peer exists in hfile-refs queue, if not add it. This can happen in the case // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
@ -297,8 +296,8 @@ public class ReplicationSourceManager implements ReplicationListener {
*/ */
@VisibleForTesting @VisibleForTesting
ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
ReplicationPeer peer = replicationPeers.getConnectedPeer(id); ReplicationPeer peer = replicationPeers.getPeer(id);
ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer); ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
synchronized (this.walsById) { synchronized (this.walsById) {
this.sources.add(src); this.sources.add(src);
@ -344,7 +343,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void deleteSource(String peerId, boolean closeConnection) { public void deleteSource(String peerId, boolean closeConnection) {
abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId)); abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
if (closeConnection) { if (closeConnection) {
this.replicationPeers.peerDisconnected(peerId); this.replicationPeers.removePeer(peerId);
} }
} }
@ -437,12 +436,12 @@ public class ReplicationSourceManager implements ReplicationListener {
// update replication queues on ZK // update replication queues on ZK
// synchronize on replicationPeers to avoid adding source for the to-be-removed peer // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
synchronized (replicationPeers) { synchronized (replicationPeers) {
for (String id : replicationPeers.getConnectedPeerIds()) { for (String id : replicationPeers.getAllPeerIds()) {
try { try {
this.queueStorage.addWAL(server.getServerName(), id, logName); this.queueStorage.addWAL(server.getServerName(), id, logName);
} catch (ReplicationException e) { } catch (ReplicationException e) {
throw new IOException("Cannot add log to replication queue" + throw new IOException("Cannot add log to replication queue"
" when creating a new source, queueId=" + id + ", filename=" + logName, e); + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
} }
} }
} }
@ -587,7 +586,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void addPeer(String id) throws ReplicationException, IOException { public void addPeer(String id) throws ReplicationException, IOException {
LOG.info("Trying to add peer, peerId: " + id); LOG.info("Trying to add peer, peerId: " + id);
boolean added = this.replicationPeers.peerConnected(id); boolean added = this.replicationPeers.addPeer(id);
if (added) { if (added) {
LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
addSource(id); addSource(id);
@ -723,16 +722,21 @@ public class ReplicationSourceManager implements ReplicationListener {
// there is not an actual peer defined corresponding to peerId for the failover. // there is not an actual peer defined corresponding to peerId for the failover.
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
String actualPeerId = replicationQueueInfo.getPeerId(); String actualPeerId = replicationQueueInfo.getPeerId();
ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
if (peer == null) {
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
+ ", peer is null");
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
continue;
}
ReplicationPeerConfig peerConfig = null; ReplicationPeerConfig peerConfig = null;
try { try {
peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId); peerConfig = replicationPeers.getPeerConfig(actualPeerId);
} catch (ReplicationException ex) { } catch (Exception e) {
LOG.warn("Received exception while getting replication peer config, skipping replay" LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
+ ex); + ", failed to read peer config", e);
}
if (peer == null || peerConfig == null) {
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
continue; continue;
} }
@ -761,7 +765,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer // see removePeer
synchronized (oldsources) { synchronized (oldsources) {
if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) { if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
src.terminate("Recovered queue doesn't belong to any current peer"); src.terminate("Recovered queue doesn't belong to any current peer");
closeRecoveredQueue(src); closeRecoveredQueue(src);
continue; continue;

View File

@ -94,7 +94,7 @@ public class TestReplicationHFileCleaner {
server = new DummyServer(); server = new DummyServer();
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
HMaster.decorateMasterConfiguration(conf); HMaster.decorateMasterConfiguration(conf);
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
rp.init(); rp.init();
rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
fs = FileSystem.get(conf); fs = FileSystem.get(conf);
@ -108,7 +108,8 @@ public class TestReplicationHFileCleaner {
@Before @Before
public void setup() throws ReplicationException, IOException { public void setup() throws ReplicationException, IOException {
root = TEST_UTIL.getDataTestDirOnTestFS(); root = TEST_UTIL.getDataTestDirOnTestFS();
rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey())); rp.getPeerStorage().addPeer(peerId,
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
rq.addPeerToHFileRefs(peerId); rq.addPeerToHFileRefs(peerId);
} }
@ -119,7 +120,7 @@ public class TestReplicationHFileCleaner {
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to delete files recursively from path " + root); LOG.warn("Failed to delete files recursively from path " + root);
} }
rp.unregisterPeer(peerId); rp.getPeerStorage().removePeer(peerId);
} }
@Test @Test

View File

@ -19,9 +19,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -91,7 +92,7 @@ public class TestReplicationTrackerZKImpl {
String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234"); String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234");
try { try {
ZKClusterId.setClusterId(zkw, new ClusterId()); ZKClusterId.setClusterId(zkw, new ClusterId());
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); rp = ReplicationFactory.getReplicationPeers(zkw, conf);
rp.init(); rp.init();
rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1)); rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
} catch (Exception e) { } catch (Exception e) {
@ -151,25 +152,22 @@ public class TestReplicationTrackerZKImpl {
@Test @Test
public void testPeerNameControl() throws Exception { public void testPeerNameControl() throws Exception {
int exists = 0; int exists = 0;
int hyphen = 0; rp.getPeerStorage().addPeer("6",
rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
try{ try {
rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); rp.getPeerStorage().addPeer("6",
}catch(IllegalArgumentException e){ ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
exists++; } catch (ReplicationException e) {
if (e.getCause() instanceof KeeperException.NodeExistsException) {
exists++;
}
} }
try{
rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
}catch(IllegalArgumentException e){
hyphen++;
}
assertEquals(1, exists); assertEquals(1, exists);
assertEquals(1, hyphen);
// clean up // clean up
rp.unregisterPeer("6"); rp.getPeerStorage().removePeer("6");
} }
private class DummyReplicationListener implements ReplicationListener { private class DummyReplicationListener implements ReplicationListener {

View File

@ -389,7 +389,7 @@ public abstract class TestReplicationSourceManager {
} }
Server s1 = new DummyServer("dummyserver1.example.org"); Server s1 = new DummyServer("dummyserver1.example.org");
ReplicationPeers rp1 = ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
rp1.init(); rp1.init();
NodeFailoverWorker w1 = NodeFailoverWorker w1 =
manager.new NodeFailoverWorker(server.getServerName()); manager.new NodeFailoverWorker(server.getServerName());
@ -585,7 +585,7 @@ public abstract class TestReplicationSourceManager {
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception { final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers(); final ReplicationPeers rp = manager.getReplicationPeers();
rp.registerPeer(peerId, peerConfig); rp.getPeerStorage().addPeer(peerId, peerConfig, true);
try { try {
manager.addPeer(peerId); manager.addPeer(peerId);
} catch (Exception e) { } catch (Exception e) {
@ -612,7 +612,7 @@ public abstract class TestReplicationSourceManager {
} }
return true; return true;
} else { } else {
return (rp.getConnectedPeer(peerId) != null); return (rp.getPeer(peerId) != null);
} }
}); });
} }
@ -624,8 +624,8 @@ public abstract class TestReplicationSourceManager {
*/ */
private void removePeerAndWait(final String peerId) throws Exception { private void removePeerAndWait(final String peerId) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers(); final ReplicationPeers rp = manager.getReplicationPeers();
if (rp.getAllPeerIds().contains(peerId)) { if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
rp.unregisterPeer(peerId); rp.getPeerStorage().removePeer(peerId);
try { try {
manager.removePeer(peerId); manager.removePeer(peerId);
} catch (Exception e) { } catch (Exception e) {
@ -635,10 +635,9 @@ public abstract class TestReplicationSourceManager {
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds(); Collection<String> peers = rp.getPeerStorage().listPeerIds();
return (!manager.getAllQueues().contains(peerId)) && return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
(rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) && && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
manager.getSource(peerId) == null;
} }
}); });
} }

View File

@ -182,8 +182,7 @@ public class HBaseZKTestingUtility extends HBaseCommonTestingUtility {
/** /**
* Gets a ZKWatcher. * Gets a ZKWatcher.
*/ */
public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) throws IOException {
throws ZooKeeperConnectionException, IOException {
ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() { ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() {
boolean aborted = false; boolean aborted = false;