HBASE-19525 RS side changes for moving peer modification from zk watcher to procedure

This commit is contained in:
huzheng 2017-12-20 10:47:18 +08:00 committed by zhangduo
parent da07870995
commit 3fd417600e
20 changed files with 505 additions and 455 deletions

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.protobuf; package org.apache.hadoop.hbase.protobuf;
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
@ -191,7 +193,7 @@ public final class ProtobufUtil {
* byte array that is <code>bytes.length</code> plus {@link ProtobufMagic#PB_MAGIC}.length. * byte array that is <code>bytes.length</code> plus {@link ProtobufMagic#PB_MAGIC}.length.
*/ */
public static byte [] prependPBMagic(final byte [] bytes) { public static byte [] prependPBMagic(final byte [] bytes) {
return Bytes.add(ProtobufMagic.PB_MAGIC, bytes); return Bytes.add(PB_MAGIC, bytes);
} }
/** /**
@ -216,10 +218,11 @@ public final class ProtobufUtil {
* @param bytes bytes to check * @param bytes bytes to check
* @throws DeserializationException if we are missing the pb magic prefix * @throws DeserializationException if we are missing the pb magic prefix
*/ */
public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException { public static void expectPBMagicPrefix(final byte[] bytes) throws DeserializationException {
if (!isPBMagicPrefix(bytes)) { if (!isPBMagicPrefix(bytes)) {
throw new DeserializationException("Missing pb magic " + String bytesPrefix = bytes == null ? "null" : Bytes.toStringBinary(bytes, 0, PB_MAGIC.length);
Bytes.toString(ProtobufMagic.PB_MAGIC) + " prefix"); throw new DeserializationException(
"Missing pb magic " + Bytes.toString(PB_MAGIC) + " prefix, bytes: " + bytesPrefix);
} }
} }

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.shaded.protobuf; package org.apache.hadoop.hbase.shaded.protobuf;
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -277,7 +279,7 @@ public final class ProtobufUtil {
* byte array that is <code>bytes.length</code> plus {@link ProtobufMagic#PB_MAGIC}.length. * byte array that is <code>bytes.length</code> plus {@link ProtobufMagic#PB_MAGIC}.length.
*/ */
public static byte [] prependPBMagic(final byte [] bytes) { public static byte [] prependPBMagic(final byte [] bytes) {
return Bytes.add(ProtobufMagic.PB_MAGIC, bytes); return Bytes.add(PB_MAGIC, bytes);
} }
/** /**
@ -302,10 +304,11 @@ public final class ProtobufUtil {
* @param bytes bytes to check * @param bytes bytes to check
* @throws DeserializationException if we are missing the pb magic prefix * @throws DeserializationException if we are missing the pb magic prefix
*/ */
public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException { public static void expectPBMagicPrefix(final byte[] bytes) throws DeserializationException {
if (!isPBMagicPrefix(bytes)) { if (!isPBMagicPrefix(bytes)) {
throw new DeserializationException("Missing pb magic " + String bytesPrefix = bytes == null ? "null" : Bytes.toStringBinary(bytes, 0, PB_MAGIC.length);
Bytes.toString(ProtobufMagic.PB_MAGIC) + " prefix"); throw new DeserializationException(
"Missing pb magic " + Bytes.toString(PB_MAGIC) + " prefix" + ", bytes: " + bytesPrefix);
} }
} }
@ -1941,7 +1944,7 @@ public final class ProtobufUtil {
public static byte [] toDelimitedByteArray(final Message m) throws IOException { public static byte [] toDelimitedByteArray(final Message m) throws IOException {
// Allocate arbitrary big size so we avoid resizing. // Allocate arbitrary big size so we avoid resizing.
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
baos.write(ProtobufMagic.PB_MAGIC); baos.write(PB_MAGIC);
m.writeDelimitedTo(baos); m.writeDelimitedTo(baos);
return baos.toByteArray(); return baos.toByteArray();
} }

View File

@ -18,8 +18,6 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -36,16 +34,4 @@ public interface ReplicationListener {
* @param regionServer the removed region server * @param regionServer the removed region server
*/ */
public void regionServerRemoved(String regionServer); public void regionServerRemoved(String regionServer);
/**
* A peer cluster has been removed (i.e. unregistered) from replication.
* @param peerId The peer id of the cluster that has been removed
*/
public void peerRemoved(String peerId);
/**
* The list of registered peer clusters has changed.
* @param peerIds A list of all currently registered peer clusters
*/
public void peerListChanged(List<String> peerIds);
} }

View File

@ -55,11 +55,26 @@ public interface ReplicationPeer {
public ReplicationPeerConfig getPeerConfig(); public ReplicationPeerConfig getPeerConfig();
/** /**
* Returns the state of the peer * Get the peer config object. if loadFromBackingStore is true, it will load from backing store
* directly and update its load peer config. otherwise, just return the local cached peer config.
* @return the ReplicationPeerConfig for this peer
*/
public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
throws ReplicationException;
/**
* Returns the state of the peer by reading local cache.
* @return the enabled state * @return the enabled state
*/ */
PeerState getPeerState(); PeerState getPeerState();
/**
* Returns the state of peer, if loadFromBackingStore is true, it will load from backing store
* directly and update its local peer state. otherwise, just return the local cached peer state.
* @return the enabled state
*/
PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException;
/** /**
* Get the configuration object required to communicate with this peer * Get the configuration object required to communicate with this peer
* @return configuration object * @return configuration object
@ -84,6 +99,15 @@ public interface ReplicationPeer {
*/ */
public long getPeerBandwidth(); public long getPeerBandwidth();
void trackPeerConfigChanges(ReplicationPeerConfigListener listener); /**
* Register a peer config listener to catch the peer config change event.
* @param listener listener to catch the peer config change event.
*/
public void registerPeerConfigListener(ReplicationPeerConfigListener listener);
/**
* Notify all the registered ReplicationPeerConfigListener to update their peer config.
* @param newPeerConfig the new peer config.
*/
public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig);
} }

View File

@ -20,41 +20,39 @@ package org.apache.hadoop.hbase.replication;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationPeerZKImpl extends ReplicationStateZKBase public class ReplicationPeerZKImpl extends ReplicationStateZKBase
implements ReplicationPeer, Abortable, Closeable { implements ReplicationPeer, Abortable, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class); private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class);
private ReplicationPeerConfig peerConfig; private volatile ReplicationPeerConfig peerConfig;
private final String id; private final String id;
private volatile PeerState peerState; private volatile PeerState peerState;
private volatile Map<TableName, List<String>> tableCFs = new HashMap<>(); private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
private final Configuration conf; private final Configuration conf;
private PeerStateTracker peerStateTracker;
private PeerConfigTracker peerConfigTracker;
private final List<ReplicationPeerConfigListener> peerConfigListeners;
/** /**
* Constructor that takes all the objects required to communicate with the specified peer, except * Constructor that takes all the objects required to communicate with the specified peer, except
@ -63,62 +61,35 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
* @param id string representation of this peer's identifier * @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer * @param peerConfig configuration for the replication peer
*/ */
public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, String id,
String id, ReplicationPeerConfig peerConfig, ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException {
Abortable abortable)
throws ReplicationException {
super(zkWatcher, conf, abortable); super(zkWatcher, conf, abortable);
this.conf = conf; this.conf = conf;
this.peerConfig = peerConfig; this.peerConfig = peerConfig;
this.id = id; this.id = id;
this.peerConfigListeners = new ArrayList<>();
} }
/** private PeerState readPeerState() throws ReplicationException {
* start a state tracker to check whether this peer is enabled or not
*
* @param peerStateNode path to zk node which stores peer state
* @throws KeeperException
*/
public void startStateTracker(String peerStateNode)
throws KeeperException {
ensurePeerEnabled(peerStateNode);
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
this.peerStateTracker.start();
try { try {
this.readPeerStateZnode(); byte[] data = ZKUtil.getData(zookeeper, this.getPeerStateNode(id));
} catch (DeserializationException e) { this.peerState = isStateEnabled(data) ? PeerState.ENABLED : PeerState.DISABLED;
throw ZKUtil.convert(e); } catch (DeserializationException | KeeperException | InterruptedException e) {
throw new ReplicationException("Get and deserialize peer state data from zookeeper failed: ",
e);
} }
return this.peerState;
} }
private void readPeerStateZnode() throws DeserializationException { private ReplicationPeerConfig readPeerConfig() throws ReplicationException {
this.peerState =
isStateEnabled(this.peerStateTracker.getData(false))
? PeerState.ENABLED
: PeerState.DISABLED;
}
/**
* start a table-cfs tracker to listen the (table, cf-list) map change
* @param peerConfigNode path to zk node which stores table-cfs
* @throws KeeperException
*/
public void startPeerConfigTracker(String peerConfigNode)
throws KeeperException {
this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
this);
this.peerConfigTracker.start();
this.readPeerConfig();
}
private ReplicationPeerConfig readPeerConfig() {
try { try {
byte[] data = peerConfigTracker.getData(false); byte[] data = ZKUtil.getData(zookeeper, this.getPeerNode(id));
if (data != null) { if (data != null) {
this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data); this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
} }
} catch (DeserializationException e) { } catch (DeserializationException | KeeperException | InterruptedException e) {
LOG.error("", e); throw new ReplicationException("Get and deserialize peer config date from zookeeper failed: ",
e);
} }
return this.peerConfig; return this.peerConfig;
} }
@ -128,6 +99,15 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return peerState; return peerState;
} }
@Override
public PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException {
if (loadFromBackingStore) {
return readPeerState();
} else {
return peerState;
}
}
/** /**
* Get the identifier of this peer * Get the identifier of this peer
* @return string representation of the id (short) * @return string representation of the id (short)
@ -146,6 +126,16 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return peerConfig; return peerConfig;
} }
@Override
public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
throws ReplicationException {
if (loadFromBackingStore) {
return readPeerConfig();
} else {
return peerConfig;
}
}
/** /**
* Get the configuration object required to communicate with this peer * Get the configuration object required to communicate with this peer
* @return configuration object * @return configuration object
@ -180,9 +170,14 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
} }
@Override @Override
public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
if (this.peerConfigTracker != null){ this.peerConfigListeners.add(listener);
this.peerConfigTracker.setListener(listener); }
@Override
public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig) {
for (ReplicationPeerConfigListener listener : this.peerConfigListeners) {
listener.peerConfigUpdated(newPeerConfig);
} }
} }
@ -223,97 +218,16 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes) private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException { throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes); ProtobufUtil.expectPBMagicPrefix(bytes);
int pblen = ProtobufUtil.lengthOfPBMagic(); int pbLen = ProtobufUtil.lengthOfPBMagic();
ReplicationProtos.ReplicationState.Builder builder = ReplicationProtos.ReplicationState.Builder builder =
ReplicationProtos.ReplicationState.newBuilder(); ReplicationProtos.ReplicationState.newBuilder();
ReplicationProtos.ReplicationState state; ReplicationProtos.ReplicationState state;
try { try {
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
state = builder.build(); state = builder.build();
return state.getState(); return state.getState();
} catch (IOException e) { } catch (IOException e) {
throw new DeserializationException(e); throw new DeserializationException(e);
} }
} }
/**
* Utility method to ensure an ENABLED znode is in place; if not present, we create it.
* @param path Path to znode to check
* @return True if we created the znode.
* @throws NodeExistsException
* @throws KeeperException
*/
private boolean ensurePeerEnabled(final String path)
throws NodeExistsException, KeeperException {
if (ZKUtil.checkExists(zookeeper, path) == -1) {
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer.
// The peer state data is set as "ENABLED" by default.
ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
return true;
}
return false;
}
/**
* Tracker for state of this peer
*/
public class PeerStateTracker extends ZKNodeTracker {
public PeerStateTracker(String peerStateZNode, ZKWatcher watcher,
Abortable abortable) {
super(watcher, peerStateZNode, abortable);
}
@Override
public synchronized void nodeDataChanged(String path) {
if (path.equals(node)) {
super.nodeDataChanged(path);
try {
readPeerStateZnode();
} catch (DeserializationException e) {
LOG.warn("Failed deserializing the content of " + path, e);
}
}
}
}
/**
* Tracker for PeerConfigNode of this peer
*/
public class PeerConfigTracker extends ZKNodeTracker {
ReplicationPeerConfigListener listener;
public PeerConfigTracker(String peerConfigNode, ZKWatcher watcher,
Abortable abortable) {
super(watcher, peerConfigNode, abortable);
}
public synchronized void setListener(ReplicationPeerConfigListener listener){
this.listener = listener;
}
@Override
public synchronized void nodeCreated(String path) {
if (path.equals(node)) {
super.nodeCreated(path);
ReplicationPeerConfig config = readPeerConfig();
if (listener != null){
listener.peerConfigUpdated(config);
}
}
}
@Override
public synchronized void nodeDataChanged(String path) {
//superclass calls nodeCreated
if (path.equals(node)) {
super.nodeDataChanged(path);
}
}
}
} }

View File

@ -499,21 +499,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
} }
Configuration peerConf = pair.getSecond(); Configuration peerConf = pair.getSecond();
ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper, ReplicationPeerZKImpl peer =
peerConf, peerId, pair.getFirst(), abortable); new ReplicationPeerZKImpl(zookeeper, peerConf, peerId, pair.getFirst(), abortable);
try {
peer.startStateTracker(this.getPeerStateNode(peerId));
} catch (KeeperException e) {
throw new ReplicationException("Error starting the peer state tracker for peerId=" +
peerId, e);
}
try { // Load peer state and peer config by reading zookeeper directly.
peer.startPeerConfigTracker(this.getPeerNode(peerId)); peer.getPeerState(true);
} catch (KeeperException e) { peer.getPeerConfig(true);
throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
peerId, e);
}
return peer; return peer;
} }

View File

@ -48,16 +48,12 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>(); private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
// List of all the other region servers in this cluster // List of all the other region servers in this cluster
private final ArrayList<String> otherRegionServers = new ArrayList<>(); private final ArrayList<String> otherRegionServers = new ArrayList<>();
private final ReplicationPeers replicationPeers;
public ReplicationTrackerZKImpl(ZKWatcher zookeeper, public ReplicationTrackerZKImpl(ZKWatcher zookeeper, final ReplicationPeers replicationPeers,
final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, Configuration conf, Abortable abortable, Stoppable stopper) {
Stoppable stopper) {
super(zookeeper, conf, abortable); super(zookeeper, conf, abortable);
this.replicationPeers = replicationPeers;
this.stopper = stopper; this.stopper = stopper;
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
} }
@Override @Override
@ -145,71 +141,6 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
} }
} }
/**
* Watcher used to follow the creation and deletion of peer clusters.
*/
public class PeersWatcher extends ZKListener {
/**
* Construct a ZooKeeper event listener.
*/
public PeersWatcher(ZKWatcher watcher) {
super(watcher);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
@Override
public void nodeDeleted(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
if (isPeerPath(path)) {
String id = getZNodeName(path);
LOG.info(path + " znode expired, triggering peerRemoved event");
for (ReplicationListener rl : listeners) {
rl.peerRemoved(id);
}
}
}
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
@Override
public void nodeChildrenChanged(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
LOG.info(path + " znode expired, triggering peerListChanged event");
for (ReplicationListener rl : listeners) {
rl.peerListChanged(peers);
}
}
}
/**
* Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also
* reset the watches.
* @param path path to check against
* @return A list of peers' identifiers if the event concerns this watcher, else null.
*/
private List<String> refreshPeersList(String path) {
if (!path.startsWith(getPeersZNode())) {
return null;
}
return this.replicationPeers.getAllPeerIds();
}
private String getPeersZNode() {
return this.peersZNode;
}
/** /**
* Extracts the znode name of a peer cluster from a ZK path * Extracts the znode name of a peer cluster from a ZK path
* @param fullPath Path to extract the id from * @param fullPath Path to extract the id from

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -27,4 +28,9 @@ import org.apache.yetus.audience.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface ReplicationSourceService extends ReplicationService { public interface ReplicationSourceService extends ReplicationService {
/**
* Returns a Handler to handle peer procedures.
*/
PeerProcedureHandler getPeerProcedureHandler();
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler;
import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.log4j.Logger;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -28,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
public class RSProcedureHandler extends EventHandler { public class RSProcedureHandler extends EventHandler {
private static final Logger LOG = Logger.getLogger(RSProcedureHandler.class);
private final long procId; private final long procId;
private final RSProcedureCallable callable; private final RSProcedureCallable callable;
@ -44,6 +46,7 @@ public class RSProcedureHandler extends EventHandler {
try { try {
callable.call(); callable.call();
} catch (Exception e) { } catch (Exception e) {
LOG.error("Catch exception when call RSProcedureCallable: ", e);
error = e; error = e;
} }
((HRegionServer) server).reportProcedureDone(procId, error); ((HRegionServer) server).reportProcedureDone(procId, error);

View File

@ -49,7 +49,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService
if (this.ctx != null){ if (this.ctx != null){
ReplicationPeer peer = this.ctx.getReplicationPeer(); ReplicationPeer peer = this.ctx.getReplicationPeer();
if (peer != null){ if (peer != null){
peer.trackPeerConfigChanges(this); peer.registerPeerConfigListener(this);
} else { } else {
LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() + LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() +
" because there's no such peer"); " because there's no such peer");

View File

@ -15,34 +15,24 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.master.replication;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; package org.apache.hadoop.hbase.replication.regionserver;
public class DummyModifyPeerProcedure extends ModifyPeerProcedure { import java.io.IOException;
public DummyModifyPeerProcedure() { import org.apache.hadoop.hbase.replication.ReplicationException;
} import org.apache.yetus.audience.InterfaceAudience;
public DummyModifyPeerProcedure(String peerId) { @InterfaceAudience.Private
super(peerId); public interface PeerProcedureHandler {
}
@Override public void addPeer(String peerId) throws ReplicationException, IOException;
public PeerOperationType getPeerOperationType() {
return PeerOperationType.ADD;
}
@Override public void removePeer(String peerId) throws ReplicationException, IOException;
protected void prePeerModification(MasterProcedureEnv env) {
}
@Override public void disablePeer(String peerId) throws ReplicationException, IOException;
protected void updatePeerStorage(MasterProcedureEnv env) {
}
@Override public void enablePeer(String peerId) throws ReplicationException, IOException;
protected void postPeerModification(MasterProcedureEnv env) {
}
public void updatePeerConfig(String peerId) throws ReplicationException, IOException;
} }

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.log4j.Logger;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
private static final Logger LOG = Logger.getLogger(PeerProcedureHandlerImpl.class);
private ReplicationSourceManager replicationSourceManager;
public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
this.replicationSourceManager = replicationSourceManager;
}
@Override
public void addPeer(String peerId) throws ReplicationException, IOException {
replicationSourceManager.addPeer(peerId);
}
@Override
public void removePeer(String peerId) throws ReplicationException, IOException {
replicationSourceManager.removePeer(peerId);
}
@Override
public void disablePeer(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer != null) {
PeerState peerState = peer.getPeerState(true);
LOG.info("disablePeer state, peer id: " + peerId + ", state: " + peerState);
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
}
@Override
public void enablePeer(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer != null) {
PeerState peerState = peer.getPeerState(true);
LOG.info("enablePeer state, peer id: " + peerId + ", state: " + peerState);
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
}
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer == null) {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
ReplicationPeerConfig rpc = peer.getPeerConfig(true);
peer.triggerPeerConfigChange(rpc);
}
}

View File

@ -17,27 +17,29 @@
*/ */
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
import org.apache.log4j.Logger;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
/** /**
* The callable executed at RS side to refresh the peer config/state. * The callable executed at RS side to refresh the peer config/state. <br/>
* <p>
* TODO: only a dummy implementation for verifying the framework, will add implementation later.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RefreshPeerCallable implements RSProcedureCallable { public class RefreshPeerCallable implements RSProcedureCallable {
private static final Logger LOG = Logger.getLogger(RefreshPeerCallable.class);
private HRegionServer rs; private HRegionServer rs;
private String peerId; private String peerId;
private PeerModificationType type;
private Exception initError; private Exception initError;
@Override @Override
@ -45,9 +47,27 @@ public class RefreshPeerCallable implements RSProcedureCallable {
if (initError != null) { if (initError != null) {
throw initError; throw initError;
} }
Path dir = new Path("/" + peerId);
if (rs.getFileSystem().exists(dir)) { LOG.info("Received a peer change event, peerId=" + peerId + ", type=" + type);
rs.getFileSystem().create(new Path(dir, rs.getServerName().toString())).close(); PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler();
switch (type) {
case ADD_PEER:
handler.addPeer(this.peerId);
break;
case REMOVE_PEER:
handler.removePeer(this.peerId);
break;
case ENABLE_PEER:
handler.enablePeer(this.peerId);
break;
case DISABLE_PEER:
handler.disablePeer(this.peerId);
break;
case UPDATE_PEER_CONFIG:
handler.updatePeerConfig(this.peerId);
break;
default:
throw new IllegalArgumentException("Unknown peer modification type: " + type);
} }
return null; return null;
} }
@ -56,10 +76,11 @@ public class RefreshPeerCallable implements RSProcedureCallable {
public void init(byte[] parameter, HRegionServer rs) { public void init(byte[] parameter, HRegionServer rs) {
this.rs = rs; this.rs = rs;
try { try {
this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId(); RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
this.type = param.getType();
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
initError = e; initError = e;
return;
} }
} }

View File

@ -76,6 +76,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
// ReplicationLoad to access replication metrics // ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad; private ReplicationLoad replicationLoad;
private PeerProcedureHandler peerProcedureHandler;
/** /**
* Empty constructor * Empty constructor
*/ */
@ -134,6 +136,13 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod); LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
this.replicationLoad = new ReplicationLoad(); this.replicationLoad = new ReplicationLoad();
this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
}
@Override
public PeerProcedureHandler getPeerProcedureHandler() {
return peerProcedureHandler;
} }
/** /**

View File

@ -446,12 +446,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public void terminate(String reason, Exception cause, boolean join) { public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) { if (cause == null) {
LOG.info("Closing source " LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
+ this.peerClusterZnode + " because: " + reason);
} else { } else {
LOG.error("Closing source " + this.peerClusterZnode LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason,
+ " because an error occurred: " + reason, cause); cause);
} }
this.sourceRunning = false; this.sourceRunning = false;
Collection<ReplicationSourceShipper> workers = workerThreads.values(); Collection<ReplicationSourceShipper> workers = workerThreads.values();

View File

@ -558,6 +558,18 @@ public class ReplicationSourceManager implements ReplicationListener {
this.walsById.remove(src.getPeerClusterZnode()); this.walsById.remove(src.getPeerClusterZnode());
} }
public void addPeer(String id) throws ReplicationException, IOException {
LOG.info("Trying to add peer, peerId: " + id);
boolean added = this.replicationPeers.peerConnected(id);
if (added) {
LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
addSource(id);
if (replicationForBulkLoadDataEnabled) {
this.replicationQueues.addPeerToHFileRefs(id);
}
}
}
/** /**
* Thie method first deletes all the recovered sources for the specified * Thie method first deletes all the recovered sources for the specified
* id, then deletes the normal source (deleting all related data in ZK). * id, then deletes the normal source (deleting all related data in ZK).
@ -605,6 +617,8 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
deleteSource(id, true); deleteSource(id, true);
} }
// Remove HFile Refs znode from zookeeper
this.replicationQueues.removePeerFromHFileRefs(id);
} }
@Override @Override
@ -612,29 +626,6 @@ public class ReplicationSourceManager implements ReplicationListener {
transferQueues(regionserver); transferQueues(regionserver);
} }
@Override
public void peerRemoved(String peerId) {
removePeer(peerId);
this.replicationQueues.removePeerFromHFileRefs(peerId);
}
@Override
public void peerListChanged(List<String> peerIds) {
for (String id : peerIds) {
try {
boolean added = this.replicationPeers.peerConnected(id);
if (added) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
this.replicationQueues.addPeerToHFileRefs(id);
}
}
} catch (Exception e) {
LOG.error("Error while adding a new peer", e);
}
}
}
/** /**
* Class responsible to setup new ReplicationSources to take care of the * Class responsible to setup new ReplicationSources to take care of the
* queues from dead region servers. * queues from dead region servers.

View File

@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestReplicationAdminUsingProcedure extends TestReplicationBase {
private static final String PEER_ID = "2";
private static final Logger LOG = Logger.getLogger(TestReplicationAdminUsingProcedure.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.setInt("hbase.multihconnection.threads.max", 10);
// Start the master & slave mini cluster.
TestReplicationBase.setUpBeforeClass();
// Remove the replication peer
hbaseAdmin.removeReplicationPeer(PEER_ID);
}
private void loadData(int startRowKey, int endRowKey) throws IOException {
for (int i = startRowKey; i < endRowKey; i++) {
byte[] rowKey = Bytes.add(row, Bytes.toBytes(i));
Put put = new Put(rowKey);
put.addColumn(famName, null, Bytes.toBytes(i));
htable1.put(put);
}
}
private void waitForReplication(int expectedRows, int retries)
throws IOException, InterruptedException {
Scan scan;
for (int i = 0; i < retries; i++) {
scan = new Scan();
if (i == retries - 1) {
throw new IOException("Waited too much time for normal batch replication");
}
try (ResultScanner scanner = htable2.getScanner(scan)) {
int count = 0;
for (Result res : scanner) {
count++;
}
if (count != expectedRows) {
LOG.info("Only got " + count + " rows, expected rows: " + expectedRows);
Thread.sleep(SLEEP_TIME);
} else {
return;
}
}
}
}
@Before
public void setUp() throws IOException {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
utility1.waitUntilAllRegionsAssigned(tableName);
utility2.waitUntilAllRegionsAssigned(tableName);
}
@After
public void tearDown() throws IOException {
hbaseAdmin.removeReplicationPeer(PEER_ID);
truncateBoth();
}
private void truncateBoth() throws IOException {
utility1.deleteTableData(tableName);
utility2.deleteTableData(tableName);
}
@Test
public void testAddPeer() throws Exception {
// Load data
loadData(0, NB_ROWS_IN_BATCH);
// Wait the replication finished
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
@Test
public void testRemovePeer() throws Exception {
// prev-check
waitForReplication(0, NB_RETRIES);
// Load data
loadData(0, NB_ROWS_IN_BATCH);
// Wait the replication finished
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
// Remove the peer id
hbaseAdmin.removeReplicationPeer(PEER_ID);
// Load data again
loadData(NB_ROWS_IN_BATCH, 2 * NB_ROWS_IN_BATCH);
// Wait the replication again
boolean foundException = false;
try {
waitForReplication(NB_ROWS_IN_BATCH * 2, NB_RETRIES);
} catch (IOException e) {
foundException = true;
}
Assert.assertTrue(foundException);
// Truncate the table in source cluster
truncateBoth();
// Add peer again
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
// Load data again
loadData(0, NB_ROWS_IN_BATCH);
// Wait the replication finished
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
@Test
public void testDisableAndEnablePeer() throws Exception {
// disable peer
hbaseAdmin.disableReplicationPeer(PEER_ID);
// Load data
loadData(0, NB_ROWS_IN_BATCH);
// Will failed to wait the replication.
boolean foundException = false;
try {
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
} catch (IOException e) {
foundException = true;
}
Assert.assertTrue(foundException);
// Enable the peer
hbaseAdmin.enableReplicationPeer(PEER_ID);
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
// Load more data
loadData(NB_ROWS_IN_BATCH, NB_ROWS_IN_BATCH * 2);
// Wait replication again.
waitForReplication(NB_ROWS_IN_BATCH * 2, NB_RETRIES);
}
@Test
public void testUpdatePeerConfig() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
rpc.setExcludeTableCFsMap(
ImmutableMap.of(tableName, ImmutableList.of(Bytes.toString(famName))));
// Update the peer config to exclude the test table name.
hbaseAdmin.updateReplicationPeerConfig(PEER_ID, rpc);
// Load data
loadData(0, NB_ROWS_IN_BATCH);
// Will failed to wait the replication
boolean foundException = false;
try {
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
} catch (IOException e) {
foundException = true;
}
Assert.assertTrue(foundException);
// Truncate the table in source cluster
truncateBoth();
// Update the peer config to include the test table name.
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(utility2.getClusterKey());
hbaseAdmin.updateReplicationPeerConfig(PEER_ID, rpc2);
// Load data again
loadData(0, NB_ROWS_IN_BATCH);
// Wait the replication finished
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
}

View File

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, LargeTests.class })
public class TestDummyModifyPeerProcedure {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID;
private static Path DIR;
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(3);
PEER_ID = "testPeer";
DIR = new Path("/" + PEER_ID);
UTIL.getTestFileSystem().mkdirs(DIR);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void test() throws Exception {
ProcedureExecutor<?> executor =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
long procId = executor.submitProcedure(new DummyModifyPeerProcedure(PEER_ID));
UTIL.waitFor(30000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return executor.isFinished(procId);
}
});
Set<String> serverNames = UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName().toString())
.collect(Collectors.toCollection(HashSet::new));
for (FileStatus s : UTIL.getTestFileSystem().listStatus(DIR)) {
assertTrue(serverNames.remove(s.getPath().getName()));
}
assertTrue(serverNames.isEmpty());
}
}

View File

@ -18,12 +18,9 @@
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -78,10 +75,6 @@ public class TestReplicationTrackerZKImpl {
private ReplicationTracker rt; private ReplicationTracker rt;
private AtomicInteger rsRemovedCount; private AtomicInteger rsRemovedCount;
private String rsRemovedData; private String rsRemovedData;
private AtomicInteger plChangedCount;
private List<String> plChangedData;
private AtomicInteger peerRemovedCount;
private String peerRemovedData;
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
@ -106,10 +99,6 @@ public class TestReplicationTrackerZKImpl {
} }
rsRemovedCount = new AtomicInteger(0); rsRemovedCount = new AtomicInteger(0);
rsRemovedData = ""; rsRemovedData = "";
plChangedCount = new AtomicInteger(0);
plChangedData = new ArrayList<>();
peerRemovedCount = new AtomicInteger(0);
peerRemovedData = "";
} }
@AfterClass @AfterClass
@ -159,41 +148,6 @@ public class TestReplicationTrackerZKImpl {
assertEquals("hostname2.example.org:1234", rsRemovedData); assertEquals("hostname2.example.org:1234", rsRemovedData);
} }
@Test
public void testPeerRemovedEvent() throws Exception {
rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
rt.registerListener(new DummyReplicationListener());
rp.unregisterPeer("5");
// wait for event
while (peerRemovedCount.get() < 1) {
Thread.sleep(5);
}
assertEquals("5", peerRemovedData);
}
@Test
public void testPeerListChangedEvent() throws Exception {
// add a peer
rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
rt.registerListener(new DummyReplicationListener());
rp.disablePeer("5");
int tmp = plChangedCount.get();
LOG.info("Peer count=" + tmp);
ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
// wait for event
while (plChangedCount.get() <= tmp) {
Thread.sleep(100);
LOG.info("Peer count=" + tmp);
}
assertEquals(1, plChangedData.size());
assertTrue(plChangedData.contains("5"));
// clean up
//ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
rp.unregisterPeer("5");
}
@Test @Test
public void testPeerNameControl() throws Exception { public void testPeerNameControl() throws Exception {
int exists = 0; int exists = 0;
@ -226,21 +180,6 @@ public class TestReplicationTrackerZKImpl {
rsRemovedCount.getAndIncrement(); rsRemovedCount.getAndIncrement();
LOG.debug("Received regionServerRemoved event: " + regionServer); LOG.debug("Received regionServerRemoved event: " + regionServer);
} }
@Override
public void peerRemoved(String peerId) {
peerRemovedData = peerId;
peerRemovedCount.getAndIncrement();
LOG.debug("Received peerDisconnected event: " + peerId);
}
@Override
public void peerListChanged(List<String> peerIds) {
plChangedData.clear();
plChangedData.addAll(peerIds);
int count = plChangedCount.getAndIncrement();
LOG.debug("Received peerListChanged event " + count);
}
} }
private class DummyServer implements Server { private class DummyServer implements Server {

View File

@ -185,6 +185,12 @@ public abstract class TestReplicationSourceManager {
replication = new Replication(); replication = new Replication();
replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
managerOfCluster = getManagerFromCluster(); managerOfCluster = getManagerFromCluster();
if (managerOfCluster != null) {
// After replication procedure, we need to add peer by hand (other than by receiving
// notification from zk)
managerOfCluster.addPeer(slaveId);
}
manager = replication.getReplicationManager(); manager = replication.getReplicationManager();
manager.addSource(slaveId); manager.addSource(slaveId);
if (managerOfCluster != null) { if (managerOfCluster != null) {
@ -560,18 +566,16 @@ public abstract class TestReplicationSourceManager {
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final long sizeOfLatestPath = getSizeOfLatestPath(); final long sizeOfLatestPath = getSizeOfLatestPath();
addPeerAndWait(peerId, peerConfig, true); addPeerAndWait(peerId, peerConfig, true);
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId); ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check // Sanity check
assertNotNull(source); assertNotNull(source);
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated // Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc")); source.enqueueLog(new Path("abc"));
assertEquals(1 + sizeOfSingleLogQueue, assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
source.getSourceMetrics().getSizeOfLogQueue()); assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() globalSource.getSizeOfLogQueue());
+ globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Removing the peer should reset the global metrics // Removing the peer should reset the global metrics
removePeerAndWait(peerId); removePeerAndWait(peerId);
@ -581,9 +585,8 @@ public abstract class TestReplicationSourceManager {
addPeerAndWait(peerId, peerConfig, true); addPeerAndWait(peerId, peerConfig, true);
source = manager.getSource(peerId); source = manager.getSource(peerId);
assertNotNull(source); assertNotNull(source);
assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue()); assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() globalSource.getSizeOfLogQueue());
+ globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
} finally { } finally {
removePeerAndWait(peerId); removePeerAndWait(peerId);
} }
@ -600,8 +603,14 @@ public abstract class TestReplicationSourceManager {
final boolean waitForSource) throws Exception { final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers(); final ReplicationPeers rp = manager.getReplicationPeers();
rp.registerPeer(peerId, peerConfig); rp.registerPeer(peerId, peerConfig);
try {
manager.addPeer(peerId);
} catch (Exception e) {
// ignore the failed exception, because we'll test both success & failed case.
}
waitPeer(peerId, manager, waitForSource); waitPeer(peerId, manager, waitForSource);
if (managerOfCluster != null) { if (managerOfCluster != null) {
managerOfCluster.addPeer(peerId);
waitPeer(peerId, managerOfCluster, waitForSource); waitPeer(peerId, managerOfCluster, waitForSource);
} }
} }
@ -634,6 +643,11 @@ public abstract class TestReplicationSourceManager {
final ReplicationPeers rp = manager.getReplicationPeers(); final ReplicationPeers rp = manager.getReplicationPeers();
if (rp.getAllPeerIds().contains(peerId)) { if (rp.getAllPeerIds().contains(peerId)) {
rp.unregisterPeer(peerId); rp.unregisterPeer(peerId);
try {
manager.removePeer(peerId);
} catch (Exception e) {
// ignore the failed exception and continue.
}
} }
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception { @Override public boolean evaluate() throws Exception {