HBASE-19525 RS side changes for moving peer modification from zk watcher to procedure

This commit is contained in:
huzheng 2017-12-20 10:47:18 +08:00 committed by zhangduo
parent 7afae59323
commit 8f5e54a456
20 changed files with 505 additions and 438 deletions

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.protobuf;
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
@ -199,7 +201,7 @@ public final class ProtobufUtil {
* byte array that is <code>bytes.length</code> plus {@link ProtobufMagic#PB_MAGIC}.length.
*/
public static byte [] prependPBMagic(final byte [] bytes) {
return Bytes.add(ProtobufMagic.PB_MAGIC, bytes);
return Bytes.add(PB_MAGIC, bytes);
}
/**
@ -224,10 +226,11 @@ public final class ProtobufUtil {
* @param bytes bytes to check
* @throws DeserializationException if we are missing the pb magic prefix
*/
public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException {
public static void expectPBMagicPrefix(final byte[] bytes) throws DeserializationException {
if (!isPBMagicPrefix(bytes)) {
throw new DeserializationException("Missing pb magic " +
Bytes.toString(ProtobufMagic.PB_MAGIC) + " prefix");
String bytesPrefix = bytes == null ? "null" : Bytes.toStringBinary(bytes, 0, PB_MAGIC.length);
throw new DeserializationException(
"Missing pb magic " + Bytes.toString(PB_MAGIC) + " prefix, bytes: " + bytesPrefix);
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.shaded.protobuf;
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -279,7 +281,7 @@ public final class ProtobufUtil {
* byte array that is <code>bytes.length</code> plus {@link ProtobufMagic#PB_MAGIC}.length.
*/
public static byte [] prependPBMagic(final byte [] bytes) {
return Bytes.add(ProtobufMagic.PB_MAGIC, bytes);
return Bytes.add(PB_MAGIC, bytes);
}
/**
@ -304,10 +306,11 @@ public final class ProtobufUtil {
* @param bytes bytes to check
* @throws DeserializationException if we are missing the pb magic prefix
*/
public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException {
public static void expectPBMagicPrefix(final byte[] bytes) throws DeserializationException {
if (!isPBMagicPrefix(bytes)) {
throw new DeserializationException("Missing pb magic " +
Bytes.toString(ProtobufMagic.PB_MAGIC) + " prefix");
String bytesPrefix = bytes == null ? "null" : Bytes.toStringBinary(bytes, 0, PB_MAGIC.length);
throw new DeserializationException(
"Missing pb magic " + Bytes.toString(PB_MAGIC) + " prefix" + ", bytes: " + bytesPrefix);
}
}
@ -1945,7 +1948,7 @@ public final class ProtobufUtil {
public static byte [] toDelimitedByteArray(final Message m) throws IOException {
// Allocate arbitrary big size so we avoid resizing.
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
baos.write(ProtobufMagic.PB_MAGIC);
baos.write(PB_MAGIC);
m.writeDelimitedTo(baos);
return baos.toByteArray();
}

View File

@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -36,16 +34,4 @@ public interface ReplicationListener {
* @param regionServer the removed region server
*/
public void regionServerRemoved(String regionServer);
/**
* A peer cluster has been removed (i.e. unregistered) from replication.
* @param peerId The peer id of the cluster that has been removed
*/
public void peerRemoved(String peerId);
/**
* The list of registered peer clusters has changed.
* @param peerIds A list of all currently registered peer clusters
*/
public void peerListChanged(List<String> peerIds);
}

View File

@ -55,11 +55,26 @@ public interface ReplicationPeer {
public ReplicationPeerConfig getPeerConfig();
/**
* Returns the state of the peer
* Get the peer config object. if loadFromBackingStore is true, it will load from backing store
* directly and update its load peer config. otherwise, just return the local cached peer config.
* @return the ReplicationPeerConfig for this peer
*/
public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
throws ReplicationException;
/**
* Returns the state of the peer by reading local cache.
* @return the enabled state
*/
PeerState getPeerState();
/**
* Returns the state of peer, if loadFromBackingStore is true, it will load from backing store
* directly and update its local peer state. otherwise, just return the local cached peer state.
* @return the enabled state
*/
PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException;
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
@ -84,6 +99,15 @@ public interface ReplicationPeer {
*/
public long getPeerBandwidth();
void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
/**
* Register a peer config listener to catch the peer config change event.
* @param listener listener to catch the peer config change event.
*/
public void registerPeerConfigListener(ReplicationPeerConfigListener listener);
/**
* Notify all the registered ReplicationPeerConfigListener to update their peer config.
* @param newPeerConfig the new peer config.
*/
public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig);
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -31,7 +32,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@ -47,14 +47,13 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
implements ReplicationPeer, Abortable, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class);
private ReplicationPeerConfig peerConfig;
private volatile ReplicationPeerConfig peerConfig;
private final String id;
private volatile PeerState peerState;
private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
private final Configuration conf;
private PeerStateTracker peerStateTracker;
private PeerConfigTracker peerConfigTracker;
private final List<ReplicationPeerConfigListener> peerConfigListeners;
/**
* Constructor that takes all the objects required to communicate with the specified peer, except
@ -63,59 +62,35 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
* @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer
*/
public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf,
String id, ReplicationPeerConfig peerConfig,
Abortable abortable)
throws ReplicationException {
public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, String id,
ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException {
super(zkWatcher, conf, abortable);
this.conf = conf;
this.peerConfig = peerConfig;
this.id = id;
this.peerConfigListeners = new ArrayList<>();
}
/**
* start a state tracker to check whether this peer is enabled or not
*
* @param peerStateNode path to zk node which stores peer state
* @throws KeeperException if creating the znode fails
*/
public void startStateTracker(String peerStateNode) throws KeeperException {
ensurePeerEnabled(peerStateNode);
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
this.peerStateTracker.start();
private PeerState readPeerState() throws ReplicationException {
try {
this.readPeerStateZnode();
} catch (DeserializationException e) {
throw ZKUtil.convert(e);
byte[] data = ZKUtil.getData(zookeeper, this.getPeerStateNode(id));
this.peerState = isStateEnabled(data) ? PeerState.ENABLED : PeerState.DISABLED;
} catch (DeserializationException | KeeperException | InterruptedException e) {
throw new ReplicationException("Get and deserialize peer state data from zookeeper failed: ",
e);
}
return this.peerState;
}
private void readPeerStateZnode() throws DeserializationException {
this.peerState =
isStateEnabled(this.peerStateTracker.getData(false))
? PeerState.ENABLED
: PeerState.DISABLED;
}
/**
* start a table-cfs tracker to listen the (table, cf-list) map change
* @param peerConfigNode path to zk node which stores table-cfs
*/
public void startPeerConfigTracker(String peerConfigNode) throws KeeperException {
this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
this);
this.peerConfigTracker.start();
this.readPeerConfig();
}
private ReplicationPeerConfig readPeerConfig() {
private ReplicationPeerConfig readPeerConfig() throws ReplicationException {
try {
byte[] data = peerConfigTracker.getData(false);
byte[] data = ZKUtil.getData(zookeeper, this.getPeerNode(id));
if (data != null) {
this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
}
} catch (DeserializationException e) {
LOG.error("", e);
} catch (DeserializationException | KeeperException | InterruptedException e) {
throw new ReplicationException("Get and deserialize peer config date from zookeeper failed: ",
e);
}
return this.peerConfig;
}
@ -125,6 +100,15 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return peerState;
}
@Override
public PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException {
if (loadFromBackingStore) {
return readPeerState();
} else {
return peerState;
}
}
/**
* Get the identifier of this peer
* @return string representation of the id (short)
@ -143,6 +127,16 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return peerConfig;
}
@Override
public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
throws ReplicationException {
if (loadFromBackingStore) {
return readPeerConfig();
} else {
return peerConfig;
}
}
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
@ -177,9 +171,14 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
}
@Override
public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
if (this.peerConfigTracker != null){
this.peerConfigTracker.setListener(listener);
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
this.peerConfigListeners.add(listener);
}
@Override
public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig) {
for (ReplicationPeerConfigListener listener : this.peerConfigListeners) {
listener.peerConfigUpdated(newPeerConfig);
}
}
@ -220,95 +219,16 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
int pblen = ProtobufUtil.lengthOfPBMagic();
int pbLen = ProtobufUtil.lengthOfPBMagic();
ReplicationProtos.ReplicationState.Builder builder =
ReplicationProtos.ReplicationState.newBuilder();
ReplicationProtos.ReplicationState state;
try {
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
state = builder.build();
return state.getState();
} catch (IOException e) {
throw new DeserializationException(e);
}
}
/**
* Utility method to ensure an ENABLED znode is in place; if not present, we create it.
* @param path Path to znode to check
* @return True if we created the znode.
* @throws KeeperException if creating the znode fails
*/
private boolean ensurePeerEnabled(final String path) throws KeeperException {
if (ZKUtil.checkExists(zookeeper, path) == -1) {
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer.
// The peer state data is set as "ENABLED" by default.
ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
return true;
}
return false;
}
/**
* Tracker for state of this peer
*/
public class PeerStateTracker extends ZKNodeTracker {
public PeerStateTracker(String peerStateZNode, ZKWatcher watcher,
Abortable abortable) {
super(watcher, peerStateZNode, abortable);
}
@Override
public synchronized void nodeDataChanged(String path) {
if (path.equals(node)) {
super.nodeDataChanged(path);
try {
readPeerStateZnode();
} catch (DeserializationException e) {
LOG.warn("Failed deserializing the content of " + path, e);
}
}
}
}
/**
* Tracker for PeerConfigNode of this peer
*/
public class PeerConfigTracker extends ZKNodeTracker {
ReplicationPeerConfigListener listener;
public PeerConfigTracker(String peerConfigNode, ZKWatcher watcher,
Abortable abortable) {
super(watcher, peerConfigNode, abortable);
}
public synchronized void setListener(ReplicationPeerConfigListener listener){
this.listener = listener;
}
@Override
public synchronized void nodeCreated(String path) {
if (path.equals(node)) {
super.nodeCreated(path);
ReplicationPeerConfig config = readPeerConfig();
if (listener != null){
listener.peerConfigUpdated(config);
}
}
}
@Override
public synchronized void nodeDataChanged(String path) {
//superclass calls nodeCreated
if (path.equals(node)) {
super.nodeDataChanged(path);
}
}
}
}

View File

@ -501,21 +501,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
Configuration peerConf = pair.getSecond();
ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
peerConf, peerId, pair.getFirst(), abortable);
try {
peer.startStateTracker(this.getPeerStateNode(peerId));
} catch (KeeperException e) {
throw new ReplicationException("Error starting the peer state tracker for peerId=" +
peerId, e);
}
ReplicationPeerZKImpl peer =
new ReplicationPeerZKImpl(zookeeper, peerConf, peerId, pair.getFirst(), abortable);
try {
peer.startPeerConfigTracker(this.getPeerNode(peerId));
} catch (KeeperException e) {
throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
peerId, e);
}
// Load peer state and peer config by reading zookeeper directly.
peer.getPeerState(true);
peer.getPeerConfig(true);
return peer;
}

View File

@ -48,16 +48,12 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
// List of all the other region servers in this cluster
private final ArrayList<String> otherRegionServers = new ArrayList<>();
private final ReplicationPeers replicationPeers;
public ReplicationTrackerZKImpl(ZKWatcher zookeeper,
final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
Stoppable stopper) {
public ReplicationTrackerZKImpl(ZKWatcher zookeeper, final ReplicationPeers replicationPeers,
Configuration conf, Abortable abortable, Stoppable stopper) {
super(zookeeper, conf, abortable);
this.replicationPeers = replicationPeers;
this.stopper = stopper;
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
}
@Override
@ -145,71 +141,6 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
}
}
/**
* Watcher used to follow the creation and deletion of peer clusters.
*/
public class PeersWatcher extends ZKListener {
/**
* Construct a ZooKeeper event listener.
*/
public PeersWatcher(ZKWatcher watcher) {
super(watcher);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
@Override
public void nodeDeleted(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
if (isPeerPath(path)) {
String id = getZNodeName(path);
LOG.info(path + " znode expired, triggering peerRemoved event");
for (ReplicationListener rl : listeners) {
rl.peerRemoved(id);
}
}
}
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
@Override
public void nodeChildrenChanged(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
LOG.info(path + " znode expired, triggering peerListChanged event");
for (ReplicationListener rl : listeners) {
rl.peerListChanged(peers);
}
}
}
/**
* Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also
* reset the watches.
* @param path path to check against
* @return A list of peers' identifiers if the event concerns this watcher, else null.
*/
private List<String> refreshPeersList(String path) {
if (!path.startsWith(getPeersZNode())) {
return null;
}
return this.replicationPeers.getAllPeerIds();
}
private String getPeersZNode() {
return this.peersZNode;
}
/**
* Extracts the znode name of a peer cluster from a ZK path
* @param fullPath Path to extract the id from

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@ -29,8 +30,14 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@InterfaceAudience.Private
public interface ReplicationSourceService extends ReplicationService {
/**
* Returns a WALObserver for the service. This is needed to
* Returns a WALObserver for the service. This is needed to
* observe log rolls and log archival events.
*/
WALActionsListener getWALActionsListener();
/**
* Returns a Handler to handle peer procedures.
*/
PeerProcedureHandler getPeerProcedureHandler();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.log4j.Logger;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -28,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class RSProcedureHandler extends EventHandler {
private static final Logger LOG = Logger.getLogger(RSProcedureHandler.class);
private final long procId;
private final RSProcedureCallable callable;
@ -44,6 +46,7 @@ public class RSProcedureHandler extends EventHandler {
try {
callable.call();
} catch (Exception e) {
LOG.error("Catch exception when call RSProcedureCallable: ", e);
error = e;
}
((HRegionServer) server).reportProcedureDone(procId, error);

View File

@ -49,7 +49,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService
if (this.ctx != null){
ReplicationPeer peer = this.ctx.getReplicationPeer();
if (peer != null){
peer.trackPeerConfigChanges(this);
peer.registerPeerConfigListener(this);
} else {
LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() +
" because there's no such peer");

View File

@ -15,34 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
package org.apache.hadoop.hbase.replication.regionserver;
public class DummyModifyPeerProcedure extends ModifyPeerProcedure {
import java.io.IOException;
public DummyModifyPeerProcedure() {
}
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
public DummyModifyPeerProcedure(String peerId) {
super(peerId);
}
@InterfaceAudience.Private
public interface PeerProcedureHandler {
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.ADD;
}
public void addPeer(String peerId) throws ReplicationException, IOException;
@Override
protected void prePeerModification(MasterProcedureEnv env) {
}
public void removePeer(String peerId) throws ReplicationException, IOException;
@Override
protected void updatePeerStorage(MasterProcedureEnv env) {
}
public void disablePeer(String peerId) throws ReplicationException, IOException;
@Override
protected void postPeerModification(MasterProcedureEnv env) {
}
public void enablePeer(String peerId) throws ReplicationException, IOException;
public void updatePeerConfig(String peerId) throws ReplicationException, IOException;
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.log4j.Logger;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
private static final Logger LOG = Logger.getLogger(PeerProcedureHandlerImpl.class);
private ReplicationSourceManager replicationSourceManager;
public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
this.replicationSourceManager = replicationSourceManager;
}
@Override
public void addPeer(String peerId) throws ReplicationException, IOException {
replicationSourceManager.addPeer(peerId);
}
@Override
public void removePeer(String peerId) throws ReplicationException, IOException {
replicationSourceManager.removePeer(peerId);
}
@Override
public void disablePeer(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer != null) {
PeerState peerState = peer.getPeerState(true);
LOG.info("disablePeer state, peer id: " + peerId + ", state: " + peerState);
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
}
@Override
public void enablePeer(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer != null) {
PeerState peerState = peer.getPeerState(true);
LOG.info("enablePeer state, peer id: " + peerId + ", state: " + peerState);
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
}
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer == null) {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
ReplicationPeerConfig rpc = peer.getPeerConfig(true);
peer.triggerPeerConfigChange(rpc);
}
}

View File

@ -17,27 +17,29 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
import org.apache.log4j.Logger;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
/**
* The callable executed at RS side to refresh the peer config/state.
* <p>
* TODO: only a dummy implementation for verifying the framework, will add implementation later.
* The callable executed at RS side to refresh the peer config/state. <br/>
*/
@InterfaceAudience.Private
public class RefreshPeerCallable implements RSProcedureCallable {
private static final Logger LOG = Logger.getLogger(RefreshPeerCallable.class);
private HRegionServer rs;
private String peerId;
private PeerModificationType type;
private Exception initError;
@Override
@ -45,9 +47,27 @@ public class RefreshPeerCallable implements RSProcedureCallable {
if (initError != null) {
throw initError;
}
Path dir = new Path("/" + peerId);
if (rs.getFileSystem().exists(dir)) {
rs.getFileSystem().create(new Path(dir, rs.getServerName().toString())).close();
LOG.info("Received a peer change event, peerId=" + peerId + ", type=" + type);
PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler();
switch (type) {
case ADD_PEER:
handler.addPeer(this.peerId);
break;
case REMOVE_PEER:
handler.removePeer(this.peerId);
break;
case ENABLE_PEER:
handler.enablePeer(this.peerId);
break;
case DISABLE_PEER:
handler.disablePeer(this.peerId);
break;
case UPDATE_PEER_CONFIG:
handler.updatePeerConfig(this.peerId);
break;
default:
throw new IllegalArgumentException("Unknown peer modification type: " + type);
}
return null;
}
@ -56,10 +76,11 @@ public class RefreshPeerCallable implements RSProcedureCallable {
public void init(byte[] parameter, HRegionServer rs) {
this.rs = rs;
try {
this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId();
RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
this.type = param.getType();
} catch (InvalidProtocolBufferException e) {
initError = e;
return;
}
}

View File

@ -87,6 +87,8 @@ public class Replication implements
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
private PeerProcedureHandler peerProcedureHandler;
/**
* Instantiate the replication management (if rep is enabled).
* @param server Hosting server
@ -151,6 +153,8 @@ public class Replication implements
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
this.replicationLoad = new ReplicationLoad();
this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
}
/**
@ -168,6 +172,12 @@ public class Replication implements
public WALActionsListener getWALActionsListener() {
return this;
}
@Override
public PeerProcedureHandler getPeerProcedureHandler() {
return peerProcedureHandler;
}
/**
* Stops replication service.
*/

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -446,12 +445,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
LOG.info("Closing source "
+ this.peerClusterZnode + " because: " + reason);
LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
} else {
LOG.error("Closing source " + this.peerClusterZnode
+ " because an error occurred: " + reason, cause);
LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason,
cause);
}
this.sourceRunning = false;
Collection<ReplicationSourceShipper> workers = workerThreads.values();

View File

@ -564,6 +564,18 @@ public class ReplicationSourceManager implements ReplicationListener {
this.walsById.remove(src.getPeerClusterZnode());
}
public void addPeer(String id) throws ReplicationException, IOException {
LOG.info("Trying to add peer, peerId: " + id);
boolean added = this.replicationPeers.peerConnected(id);
if (added) {
LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
addSource(id);
if (replicationForBulkLoadDataEnabled) {
this.replicationQueues.addPeerToHFileRefs(id);
}
}
}
/**
* Thie method first deletes all the recovered sources for the specified
* id, then deletes the normal source (deleting all related data in ZK).
@ -611,6 +623,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
deleteSource(id, true);
}
// Remove HFile Refs znode from zookeeper
this.replicationQueues.removePeerFromHFileRefs(id);
}
@Override
@ -618,29 +632,6 @@ public class ReplicationSourceManager implements ReplicationListener {
transferQueues(regionserver);
}
@Override
public void peerRemoved(String peerId) {
removePeer(peerId);
this.replicationQueues.removePeerFromHFileRefs(peerId);
}
@Override
public void peerListChanged(List<String> peerIds) {
for (String id : peerIds) {
try {
boolean added = this.replicationPeers.peerConnected(id);
if (added) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
this.replicationQueues.addPeerToHFileRefs(id);
}
}
} catch (Exception e) {
LOG.error("Error while adding a new peer", e);
}
}
}
/**
* Class responsible to setup new ReplicationSources to take care of the
* queues from dead region servers.

View File

@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestReplicationAdminUsingProcedure extends TestReplicationBase {
private static final String PEER_ID = "2";
private static final Logger LOG = Logger.getLogger(TestReplicationAdminUsingProcedure.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.setInt("hbase.multihconnection.threads.max", 10);
// Start the master & slave mini cluster.
TestReplicationBase.setUpBeforeClass();
// Remove the replication peer
hbaseAdmin.removeReplicationPeer(PEER_ID);
}
private void loadData(int startRowKey, int endRowKey) throws IOException {
for (int i = startRowKey; i < endRowKey; i++) {
byte[] rowKey = Bytes.add(row, Bytes.toBytes(i));
Put put = new Put(rowKey);
put.addColumn(famName, null, Bytes.toBytes(i));
htable1.put(put);
}
}
private void waitForReplication(int expectedRows, int retries)
throws IOException, InterruptedException {
Scan scan;
for (int i = 0; i < retries; i++) {
scan = new Scan();
if (i == retries - 1) {
throw new IOException("Waited too much time for normal batch replication");
}
try (ResultScanner scanner = htable2.getScanner(scan)) {
int count = 0;
for (Result res : scanner) {
count++;
}
if (count != expectedRows) {
LOG.info("Only got " + count + " rows, expected rows: " + expectedRows);
Thread.sleep(SLEEP_TIME);
} else {
return;
}
}
}
}
@Before
public void setUp() throws IOException {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
utility1.waitUntilAllRegionsAssigned(tableName);
utility2.waitUntilAllRegionsAssigned(tableName);
}
@After
public void tearDown() throws IOException {
hbaseAdmin.removeReplicationPeer(PEER_ID);
truncateBoth();
}
private void truncateBoth() throws IOException {
utility1.deleteTableData(tableName);
utility2.deleteTableData(tableName);
}
@Test
public void testAddPeer() throws Exception {
// Load data
loadData(0, NB_ROWS_IN_BATCH);
// Wait the replication finished
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
@Test
public void testRemovePeer() throws Exception {
// prev-check
waitForReplication(0, NB_RETRIES);
// Load data
loadData(0, NB_ROWS_IN_BATCH);
// Wait the replication finished
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
// Remove the peer id
hbaseAdmin.removeReplicationPeer(PEER_ID);
// Load data again
loadData(NB_ROWS_IN_BATCH, 2 * NB_ROWS_IN_BATCH);
// Wait the replication again
boolean foundException = false;
try {
waitForReplication(NB_ROWS_IN_BATCH * 2, NB_RETRIES);
} catch (IOException e) {
foundException = true;
}
Assert.assertTrue(foundException);
// Truncate the table in source cluster
truncateBoth();
// Add peer again
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
// Load data again
loadData(0, NB_ROWS_IN_BATCH);
// Wait the replication finished
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
@Test
public void testDisableAndEnablePeer() throws Exception {
// disable peer
hbaseAdmin.disableReplicationPeer(PEER_ID);
// Load data
loadData(0, NB_ROWS_IN_BATCH);
// Will failed to wait the replication.
boolean foundException = false;
try {
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
} catch (IOException e) {
foundException = true;
}
Assert.assertTrue(foundException);
// Enable the peer
hbaseAdmin.enableReplicationPeer(PEER_ID);
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
// Load more data
loadData(NB_ROWS_IN_BATCH, NB_ROWS_IN_BATCH * 2);
// Wait replication again.
waitForReplication(NB_ROWS_IN_BATCH * 2, NB_RETRIES);
}
@Test
public void testUpdatePeerConfig() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
rpc.setExcludeTableCFsMap(
ImmutableMap.of(tableName, ImmutableList.of(Bytes.toString(famName))));
// Update the peer config to exclude the test table name.
hbaseAdmin.updateReplicationPeerConfig(PEER_ID, rpc);
// Load data
loadData(0, NB_ROWS_IN_BATCH);
// Will failed to wait the replication
boolean foundException = false;
try {
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
} catch (IOException e) {
foundException = true;
}
Assert.assertTrue(foundException);
// Truncate the table in source cluster
truncateBoth();
// Update the peer config to include the test table name.
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(utility2.getClusterKey());
hbaseAdmin.updateReplicationPeerConfig(PEER_ID, rpc2);
// Load data again
loadData(0, NB_ROWS_IN_BATCH);
// Wait the replication finished
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
}

View File

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, LargeTests.class })
public class TestDummyModifyPeerProcedure {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID;
private static Path DIR;
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(3);
PEER_ID = "testPeer";
DIR = new Path("/" + PEER_ID);
UTIL.getTestFileSystem().mkdirs(DIR);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void test() throws Exception {
ProcedureExecutor<?> executor =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
long procId = executor.submitProcedure(new DummyModifyPeerProcedure(PEER_ID));
UTIL.waitFor(30000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return executor.isFinished(procId);
}
});
Set<String> serverNames = UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName().toString())
.collect(Collectors.toCollection(HashSet::new));
for (FileStatus s : UTIL.getTestFileSystem().listStatus(DIR)) {
assertTrue(serverNames.remove(s.getPath().getName()));
}
assertTrue(serverNames.isEmpty());
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
@ -155,41 +154,6 @@ public class TestReplicationTrackerZKImpl {
assertEquals("hostname2.example.org:1234", rsRemovedData);
}
@Test(timeout = 30000)
public void testPeerRemovedEvent() throws Exception {
rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
rt.registerListener(new DummyReplicationListener());
rp.unregisterPeer("5");
// wait for event
while (peerRemovedCount.get() < 1) {
Thread.sleep(5);
}
assertEquals("5", peerRemovedData);
}
@Test(timeout = 30000)
public void testPeerListChangedEvent() throws Exception {
// add a peer
rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
rt.registerListener(new DummyReplicationListener());
rp.disablePeer("5");
int tmp = plChangedCount.get();
LOG.info("Peer count=" + tmp);
ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
// wait for event
while (plChangedCount.get() <= tmp) {
Thread.sleep(100);
LOG.info("Peer count=" + tmp);
}
assertEquals(1, plChangedData.size());
assertTrue(plChangedData.contains("5"));
// clean up
//ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
rp.unregisterPeer("5");
}
@Test(timeout = 30000)
public void testPeerNameControl() throws Exception {
int exists = 0;
@ -222,21 +186,6 @@ public class TestReplicationTrackerZKImpl {
rsRemovedCount.getAndIncrement();
LOG.debug("Received regionServerRemoved event: " + regionServer);
}
@Override
public void peerRemoved(String peerId) {
peerRemovedData = peerId;
peerRemovedCount.getAndIncrement();
LOG.debug("Received peerDisconnected event: " + peerId);
}
@Override
public void peerListChanged(List<String> peerIds) {
plChangedData.clear();
plChangedData.addAll(peerIds);
int count = plChangedCount.getAndIncrement();
LOG.debug("Received peerListChanged event " + count);
}
}
private class DummyServer implements Server {

View File

@ -176,6 +176,12 @@ public abstract class TestReplicationSourceManager {
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
managerOfCluster = getManagerFromCluster();
if (managerOfCluster != null) {
// After replication procedure, we need to add peer by hand (other than by receiving
// notification from zk)
managerOfCluster.addPeer(slaveId);
}
manager = replication.getReplicationManager();
manager.addSource(slaveId);
if (managerOfCluster != null) {
@ -535,18 +541,16 @@ public abstract class TestReplicationSourceManager {
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final long sizeOfLatestPath = getSizeOfLatestPath();
addPeerAndWait(peerId, peerConfig, true);
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
assertEquals(1 + sizeOfSingleLogQueue,
source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+ globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
// Removing the peer should reset the global metrics
removePeerAndWait(peerId);
@ -556,9 +560,8 @@ public abstract class TestReplicationSourceManager {
addPeerAndWait(peerId, peerConfig, true);
source = manager.getSource(peerId);
assertNotNull(source);
assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+ globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
@ -575,8 +578,14 @@ public abstract class TestReplicationSourceManager {
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
rp.registerPeer(peerId, peerConfig);
try {
manager.addPeer(peerId);
} catch (Exception e) {
// ignore the failed exception, because we'll test both success & failed case.
}
waitPeer(peerId, manager, waitForSource);
if (managerOfCluster != null) {
managerOfCluster.addPeer(peerId);
waitPeer(peerId, managerOfCluster, waitForSource);
}
}
@ -609,6 +618,11 @@ public abstract class TestReplicationSourceManager {
final ReplicationPeers rp = manager.getReplicationPeers();
if (rp.getAllPeerIds().contains(peerId)) {
rp.unregisterPeer(peerId);
try {
manager.removePeer(peerId);
} catch (Exception e) {
// ignore the failed exception and continue.
}
}
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {