HBASE-19573 Rewrite ReplicationPeer with the new replication storage interface

This commit is contained in:
Guanghao Zhang 2017-12-26 11:39:34 +08:00 committed by zhangduo
parent 59cad95b58
commit eae251d203
7 changed files with 71 additions and 151 deletions

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
@ -333,7 +332,6 @@ public class VerifyReplication extends Configured implements Tool {
private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
final Configuration conf, String peerId) throws IOException {
ZKWatcher localZKW = null;
ReplicationPeerZKImpl peer = null;
try {
localZKW = new ZKWatcher(conf, "VerifyReplication",
new Abortable() {
@ -354,9 +352,6 @@ public class VerifyReplication extends Configured implements Tool {
throw new IOException(
"An error occurred while trying to connect to the remove peer cluster", e);
} finally {
if (peer != null) {
peer.close();
}
if (localZKW != null) {
localZKW.close();
}

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* ReplicationPeer manages enabled / disabled state for the peer.
*/
@ -48,20 +47,6 @@ public interface ReplicationPeer {
*/
String getId();
/**
* Get the peer config object
* @return the ReplicationPeerConfig for this peer
*/
public ReplicationPeerConfig getPeerConfig();
/**
* Get the peer config object. if loadFromBackingStore is true, it will load from backing store
* directly and update its load peer config. otherwise, just return the local cached peer config.
* @return the ReplicationPeerConfig for this peer
*/
public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
throws ReplicationException;
/**
* Returns the state of the peer by reading local cache.
* @return the enabled state
@ -69,45 +54,46 @@ public interface ReplicationPeer {
PeerState getPeerState();
/**
* Returns the state of peer, if loadFromBackingStore is true, it will load from backing store
* directly and update its local peer state. otherwise, just return the local cached peer state.
* @return the enabled state
* Get the peer config object
* @return the ReplicationPeerConfig for this peer
*/
PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException;
ReplicationPeerConfig getPeerConfig();
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
*/
public Configuration getConfiguration();
Configuration getConfiguration();
/**
* Get replicable (table, cf-list) map of this peer
* @return the replicable (table, cf-list) map
*/
public Map<TableName, List<String>> getTableCFs();
Map<TableName, List<String>> getTableCFs();
/**
* Get replicable namespace set of this peer
* @return the replicable namespaces set
*/
public Set<String> getNamespaces();
Set<String> getNamespaces();
/**
* Get the per node bandwidth upper limit for this peer
* @return the bandwidth upper limit
*/
public long getPeerBandwidth();
long getPeerBandwidth();
/**
* Register a peer config listener to catch the peer config change event.
* @param listener listener to catch the peer config change event.
*/
public void registerPeerConfigListener(ReplicationPeerConfigListener listener);
void registerPeerConfigListener(ReplicationPeerConfigListener listener);
/**
* Notify all the registered ReplicationPeerConfigListener to update their peer config.
* @param newPeerConfig the new peer config.
* @deprecated Use {@link #registerPeerConfigListener(ReplicationPeerConfigListener)} instead.
*/
public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig);
}
@Deprecated
default void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
registerPeerConfigListener(listener);
}
}

View File

@ -18,23 +18,16 @@
*/
package org.apache.hadoop.hbase.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,15 +35,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@InterfaceAudience.Private
public class ReplicationPeerZKImpl extends ReplicationStateZKBase
implements ReplicationPeer, Abortable, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class);
public class ReplicationPeerImpl implements ReplicationPeer {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class);
private final ReplicationPeerStorage peerStorage;
private final Configuration conf;
private final String id;
private volatile ReplicationPeerConfig peerConfig;
private final String id;
private volatile PeerState peerState;
private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
private final Configuration conf;
private final List<ReplicationPeerConfigListener> peerConfigListeners;
@ -61,51 +57,22 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
* @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer
*/
public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, String id,
ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException {
super(zkWatcher, conf, abortable);
public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id,
ReplicationPeerConfig peerConfig) {
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf);
this.conf = conf;
this.peerConfig = peerConfig;
this.id = id;
this.peerConfigListeners = new ArrayList<>();
}
private PeerState readPeerState() throws ReplicationException {
try {
byte[] data = ZKUtil.getData(zookeeper, this.getPeerStateNode(id));
this.peerState = isStateEnabled(data) ? PeerState.ENABLED : PeerState.DISABLED;
} catch (DeserializationException | KeeperException | InterruptedException e) {
throw new ReplicationException("Get and deserialize peer state data from zookeeper failed: ",
e);
}
return this.peerState;
public void refreshPeerState() throws ReplicationException {
this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED;
}
private ReplicationPeerConfig readPeerConfig() throws ReplicationException {
try {
byte[] data = ZKUtil.getData(zookeeper, this.getPeerNode(id));
if (data != null) {
this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
}
} catch (DeserializationException | KeeperException | InterruptedException e) {
throw new ReplicationException("Get and deserialize peer config date from zookeeper failed: ",
e);
}
return this.peerConfig;
}
@Override
public PeerState getPeerState() {
return peerState;
}
@Override
public PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException {
if (loadFromBackingStore) {
return readPeerState();
} else {
return peerState;
}
public void refreshPeerConfig() throws ReplicationException {
this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig);
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
}
/**
@ -117,6 +84,11 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return id;
}
@Override
public PeerState getPeerState() {
return peerState;
}
/**
* Get the peer config object
* @return the ReplicationPeerConfig for this peer
@ -126,16 +98,6 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return peerConfig;
}
@Override
public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
throws ReplicationException {
if (loadFromBackingStore) {
return readPeerConfig();
} else {
return peerConfig;
}
}
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
@ -151,8 +113,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
*/
@Override
public Map<TableName, List<String>> getTableCFs() {
this.tableCFs = peerConfig.getTableCFsMap();
return this.tableCFs;
return this.peerConfig.getTableCFsMap();
}
/**
@ -174,31 +135,6 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
this.peerConfigListeners.add(listener);
}
@Override
public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig) {
for (ReplicationPeerConfigListener listener : this.peerConfigListeners) {
listener.peerConfigUpdated(newPeerConfig);
}
}
@Override
public void abort(String why, Throwable e) {
LOG.error(HBaseMarkers.FATAL, "The ReplicationPeer corresponding to peer " +
peerConfig + " was aborted for the following reason(s):" + why, e);
}
@Override
public boolean isAborted() {
// Currently the replication peer is never "Aborted", we just log when the
// abort method is called.
return false;
}
@Override
public void close() throws IOException {
// TODO: stop zkw?
}
/**
* Parse the raw data from ZK to get a peer's state
* @param bytes raw ZK data
@ -230,4 +166,4 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
throw new DeserializationException(e);
}
}
}
}

View File

@ -116,13 +116,13 @@ public interface ReplicationPeers {
throws ReplicationException;
/**
* Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
* Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will
* continue to track changes to the Peer's state and config. This method returns null if no
* peer has been connected with the given peerId.
* @param peerId id for the peer
* @return ReplicationPeer object
*/
ReplicationPeer getConnectedPeer(String peerId);
ReplicationPeerImpl getConnectedPeer(String peerId);
/**
* Returns the set of peerIds of the clusters that have been connected and have an underlying

View File

@ -80,7 +80,7 @@ import org.slf4j.LoggerFactory;
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
// Map of peer clusters keyed by their id
private Map<String, ReplicationPeerZKImpl> peerClusters;
private ConcurrentMap<String, ReplicationPeerImpl> peerClusters;
private final ReplicationQueueStorage queueStorage;
private Abortable abortable;
@ -232,7 +232,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
String peerStateZNode = getPeerStateNode(id);
try {
return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
} catch (KeeperException e) {
throw new ReplicationException(e);
} catch (DeserializationException e) {
@ -270,7 +270,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
public ReplicationPeer getConnectedPeer(String peerId) {
public ReplicationPeerImpl getConnectedPeer(String peerId) {
return peerClusters.get(peerId);
}
@ -423,7 +423,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
public void peerDisconnected(String peerId) {
ReplicationPeer rp = this.peerClusters.get(peerId);
if (rp != null) {
((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
peerClusters.remove(peerId, rp);
}
}
@ -440,7 +440,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return false;
}
ReplicationPeerZKImpl peer = null;
ReplicationPeerImpl peer = null;
try {
peer = createPeer(peerId);
} catch (Exception e) {
@ -449,8 +449,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
if (peer == null) {
return false;
}
ReplicationPeerZKImpl previous =
((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer);
if (previous == null) {
LOG.info("Added peer cluster=" + peer.getPeerConfig().getClusterKey());
} else {
@ -493,19 +492,19 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
* @return object representing the peer
* @throws ReplicationException
*/
private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
if (pair == null) {
return null;
}
Configuration peerConf = pair.getSecond();
ReplicationPeerZKImpl peer =
new ReplicationPeerZKImpl(zookeeper, peerConf, peerId, pair.getFirst(), abortable);
ReplicationPeerImpl peer =
new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst());
// Load peer state and peer config by reading zookeeper directly.
peer.getPeerState(true);
peer.getPeerConfig(true);
peer.refreshPeerState();
peer.refreshPeerConfig();
return peer;
}

View File

@ -312,12 +312,15 @@ public abstract class TestReplicationStateBasic {
rp.disablePeer(ID_ONE);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually...
assertEquals(PeerState.DISABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true));
ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE);
peer.refreshPeerState();
assertEquals(PeerState.DISABLED, peer.getPeerState());
assertConnectedPeerStatus(false, ID_ONE);
rp.enablePeer(ID_ONE);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually...
assertEquals(PeerState.ENABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true));
peer.refreshPeerState();
assertEquals(PeerState.ENABLED, peer.getPeerState());
assertConnectedPeerStatus(true, ID_ONE);
// Disconnect peer

View File

@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.log4j.Logger;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
private static final Logger LOG = Logger.getLogger(PeerProcedureHandlerImpl.class);
private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
private ReplicationSourceManager replicationSourceManager;
@ -49,10 +48,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void disablePeer(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
ReplicationPeerImpl peer =
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer != null) {
PeerState peerState = peer.getPeerState(true);
LOG.info("disablePeer state, peer id: " + peerId + ", state: " + peerState);
peer.refreshPeerState();
LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
@ -60,10 +60,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void enablePeer(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
ReplicationPeerImpl peer =
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer != null) {
PeerState peerState = peer.getPeerState(true);
LOG.info("enablePeer state, peer id: " + peerId + ", state: " + peerState);
peer.refreshPeerState();
LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
@ -71,11 +72,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
ReplicationPeerImpl peer =
replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer == null) {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
ReplicationPeerConfig rpc = peer.getPeerConfig(true);
peer.triggerPeerConfigChange(rpc);
peer.refreshPeerConfig();
}
}