HBASE-5965 Move replication znodes to pb
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1378714 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2fd27a77c5
commit
6d7e438c41
|
@ -163,20 +163,6 @@ public class ReplicationAdmin implements Closeable {
|
||||||
return this.replicationZk.listPeers();
|
return this.replicationZk.listPeers();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get state of the peer
|
|
||||||
*
|
|
||||||
* @param id peer's identifier
|
|
||||||
* @return current state of the peer
|
|
||||||
*/
|
|
||||||
public String getPeerState(String id) throws IOException {
|
|
||||||
try {
|
|
||||||
return this.replicationZk.getPeerState(id).name();
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
throw new IOException("Couldn't get the state of the peer " + id, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the current status of the kill switch, if the cluster is replicating
|
* Get the current status of the kill switch, if the cluster is replicating
|
||||||
* or not.
|
* or not.
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -30,9 +30,8 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
@ -81,20 +80,18 @@ public class ReplicationPeer implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
|
public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
if (ZKUtil.checkExists(zookeeper, peerStateNode) == -1) {
|
ReplicationZookeeper.ensurePeerEnabled(zookeeper, peerStateNode);
|
||||||
ZKUtil.createAndWatch(zookeeper, peerStateNode,
|
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
|
||||||
Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
|
|
||||||
}
|
|
||||||
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper,
|
|
||||||
this);
|
|
||||||
this.peerStateTracker.start();
|
this.peerStateTracker.start();
|
||||||
this.readPeerStateZnode();
|
try {
|
||||||
|
this.readPeerStateZnode();
|
||||||
|
} catch (DeserializationException e) {
|
||||||
|
throw ZKUtil.convert(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readPeerStateZnode() {
|
private void readPeerStateZnode() throws DeserializationException {
|
||||||
String currentState = Bytes.toString(peerStateTracker.getData(false));
|
this.peerEnabled.set(ReplicationZookeeper.isPeerEnabled(this.peerStateTracker.getData(false)));
|
||||||
this.peerEnabled.set(PeerState.ENABLED.equals(PeerState
|
|
||||||
.valueOf(currentState)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -199,7 +196,11 @@ public class ReplicationPeer implements Abortable, Closeable {
|
||||||
public synchronized void nodeDataChanged(String path) {
|
public synchronized void nodeDataChanged(String path) {
|
||||||
if (path.equals(node)) {
|
if (path.equals(node)) {
|
||||||
super.nodeDataChanged(path);
|
super.nodeDataChanged(path);
|
||||||
readPeerStateZnode();
|
try {
|
||||||
|
readPeerStateZnode();
|
||||||
|
} catch (DeserializationException e) {
|
||||||
|
LOG.warn("Failed deserializing the content of " + path, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,9 +37,12 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||||
|
@ -47,8 +50,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.KeeperException.ConnectionLossException;
|
import org.apache.zookeeper.KeeperException.ConnectionLossException;
|
||||||
|
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||||
import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
||||||
|
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class serves as a helper for all things related to zookeeper in
|
* This class serves as a helper for all things related to zookeeper in
|
||||||
* replication.
|
* replication.
|
||||||
|
@ -85,11 +91,6 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
// Name of znode we use to lock when failover
|
// Name of znode we use to lock when failover
|
||||||
private final static String RS_LOCK_ZNODE = "lock";
|
private final static String RS_LOCK_ZNODE = "lock";
|
||||||
|
|
||||||
// Values of znode which stores state of a peer
|
|
||||||
public static enum PeerState {
|
|
||||||
ENABLED, DISABLED
|
|
||||||
};
|
|
||||||
|
|
||||||
// Our handle on zookeeper
|
// Our handle on zookeeper
|
||||||
private final ZooKeeperWatcher zookeeper;
|
private final ZooKeeperWatcher zookeeper;
|
||||||
// Map of peer clusters keyed by their id
|
// Map of peer clusters keyed by their id
|
||||||
|
@ -104,7 +105,8 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
private String rsServerNameZnode;
|
private String rsServerNameZnode;
|
||||||
// Name of the replicationState znode
|
// Name of the replicationState znode
|
||||||
private String replicationStateNodeName;
|
private String replicationStateNodeName;
|
||||||
// Name of zk node which stores peer state
|
// Name of zk node which stores peer state. The peer-state znode is under a
|
||||||
|
// peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
|
||||||
private String peerStateNodeName;
|
private String peerStateNodeName;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
// Is this cluster replicating at the moment?
|
// Is this cluster replicating at the moment?
|
||||||
|
@ -115,6 +117,17 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
private Abortable abortable;
|
private Abortable abortable;
|
||||||
private ReplicationStatusTracker statusTracker;
|
private ReplicationStatusTracker statusTracker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZNode content for the enabled state.
|
||||||
|
*/
|
||||||
|
// Public so it can be seen by test code.
|
||||||
|
public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZNode content for the disabled state.
|
||||||
|
*/
|
||||||
|
static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor used by clients of replication (like master and HBase clients)
|
* Constructor used by clients of replication (like master and HBase clients)
|
||||||
* @param conf conf to use
|
* @param conf conf to use
|
||||||
|
@ -122,9 +135,7 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
|
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
|
||||||
final ZooKeeperWatcher zk)
|
final ZooKeeperWatcher zk) throws KeeperException {
|
||||||
throws KeeperException {
|
|
||||||
|
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.zookeeper = zk;
|
this.zookeeper = zk;
|
||||||
this.replicating = new AtomicBoolean();
|
this.replicating = new AtomicBoolean();
|
||||||
|
@ -156,27 +167,20 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setZNodes(Abortable abortable) throws KeeperException {
|
private void setZNodes(Abortable abortable) throws KeeperException {
|
||||||
String replicationZNodeName =
|
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
|
||||||
conf.get("zookeeper.znode.replication", "replication");
|
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
|
||||||
String peersZNodeName =
|
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
|
||||||
conf.get("zookeeper.znode.replication.peers", "peers");
|
this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
|
||||||
this.peerStateNodeName = conf.get(
|
String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
|
||||||
"zookeeper.znode.replication.peers.state", "peer-state");
|
|
||||||
this.replicationStateNodeName =
|
|
||||||
conf.get("zookeeper.znode.replication.state", "state");
|
|
||||||
String rsZNodeName =
|
|
||||||
conf.get("zookeeper.znode.replication.rs", "rs");
|
|
||||||
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
|
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
|
||||||
this.replicationZNode =
|
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
|
||||||
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
|
|
||||||
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
|
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
|
||||||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||||
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
|
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
|
||||||
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
|
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
|
||||||
|
|
||||||
// Set a tracker on replicationStateNodeName
|
// Set a tracker on replicationStateNodeName
|
||||||
this.statusTracker =
|
this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable);
|
||||||
new ReplicationStatusTracker(this.zookeeper, abortable);
|
|
||||||
statusTracker.start();
|
statusTracker.start();
|
||||||
readReplicationStateZnode();
|
readReplicationStateZnode();
|
||||||
}
|
}
|
||||||
|
@ -214,14 +218,22 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
try {
|
try {
|
||||||
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
|
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
|
||||||
for (String id : ids) {
|
for (String id : ids) {
|
||||||
peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper,
|
byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
|
||||||
ZKUtil.joinZNode(this.peersZNode, id))));
|
String clusterKey = null;
|
||||||
|
try {
|
||||||
|
clusterKey = parsePeerFrom(bytes);
|
||||||
|
} catch (DeserializationException de) {
|
||||||
|
LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
peers.put(id, clusterKey);
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
this.abortable.abort("Cannot get the list of peers ", e);
|
this.abortable.abort("Cannot get the list of peers ", e);
|
||||||
}
|
}
|
||||||
return peers;
|
return peers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns all region servers from given peer
|
* Returns all region servers from given peer
|
||||||
*
|
*
|
||||||
|
@ -337,7 +349,13 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
|
public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
|
||||||
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
|
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
|
||||||
byte [] data = ZKUtil.getData(this.zookeeper, znode);
|
byte [] data = ZKUtil.getData(this.zookeeper, znode);
|
||||||
String otherClusterKey = Bytes.toString(data);
|
String otherClusterKey = "";
|
||||||
|
try {
|
||||||
|
otherClusterKey = parsePeerFrom(data);
|
||||||
|
} catch (DeserializationException e) {
|
||||||
|
LOG.warn("Failed parse of cluster key from peerId=" + peerId
|
||||||
|
+ ", specifically the content from the following znode: " + znode);
|
||||||
|
}
|
||||||
if (this.ourClusterKey.equals(otherClusterKey)) {
|
if (this.ourClusterKey.equals(otherClusterKey)) {
|
||||||
LOG.debug("Not connecting to " + peerId + " because it's us");
|
LOG.debug("Not connecting to " + peerId + " because it's us");
|
||||||
return null;
|
return null;
|
||||||
|
@ -364,9 +382,9 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
public void setReplicating(boolean newState) throws KeeperException {
|
public void setReplicating(boolean newState) throws KeeperException {
|
||||||
ZKUtil.createWithParents(this.zookeeper,
|
ZKUtil.createWithParents(this.zookeeper,
|
||||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
|
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
|
||||||
|
byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
|
||||||
ZKUtil.setData(this.zookeeper,
|
ZKUtil.setData(this.zookeeper,
|
||||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
|
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), stateBytes);
|
||||||
Bytes.toBytes(Boolean.toString(newState)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -401,15 +419,165 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
throw new IllegalArgumentException("Cannot add existing peer");
|
throw new IllegalArgumentException("Cannot add existing peer");
|
||||||
}
|
}
|
||||||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||||
ZKUtil.createAndWatch(this.zookeeper,
|
ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
|
||||||
ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
|
toByteArray(clusterKey));
|
||||||
ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
|
// A peer is enabled by default
|
||||||
Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
|
ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id), ENABLED_ZNODE_BYTES);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new IOException("Unable to add peer", e);
|
throw new IOException("Unable to add peer", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param clusterKey
|
||||||
|
* @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix
|
||||||
|
* prepended suitable for use as content of a znode under this.peersZNode; i.e.
|
||||||
|
* the content of PEER_ID znode under /hbase/replication/peers/PEER_ID
|
||||||
|
*/
|
||||||
|
static byte[] toByteArray(final String clusterKey) {
|
||||||
|
byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
|
||||||
|
.toByteArray();
|
||||||
|
return ProtobufUtil.prependPBMagic(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param state
|
||||||
|
* @return Serialized protobuf of <code>state</code> with pb magic prefix
|
||||||
|
* prepended suitable for use as content of either the cluster state
|
||||||
|
* znode -- whether or not we should be replicating kept in
|
||||||
|
* /hbase/replication/state -- or as content of a peer-state znode
|
||||||
|
* under a peer cluster id as in
|
||||||
|
* /hbase/replication/peers/PEER_ID/peer-state.
|
||||||
|
*/
|
||||||
|
static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
|
||||||
|
byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
|
||||||
|
.toByteArray();
|
||||||
|
return ProtobufUtil.prependPBMagic(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param position
|
||||||
|
* @return Serialized protobuf of <code>position</code> with pb magic prefix
|
||||||
|
* prepended suitable for use as content of an hlog position in a
|
||||||
|
* replication queue.
|
||||||
|
*/
|
||||||
|
static byte[] toByteArray(
|
||||||
|
final long position) {
|
||||||
|
byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
|
||||||
|
.build().toByteArray();
|
||||||
|
return ProtobufUtil.prependPBMagic(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lockOwner
|
||||||
|
* @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
|
||||||
|
* prepended suitable for use as content of a replication lock during
|
||||||
|
* region server fail over.
|
||||||
|
*/
|
||||||
|
static byte[] lockToByteArray(
|
||||||
|
final String lockOwner) {
|
||||||
|
byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
|
||||||
|
.toByteArray();
|
||||||
|
return ProtobufUtil.prependPBMagic(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param bytes Content of a peer znode.
|
||||||
|
* @return ClusterKey parsed from the passed bytes.
|
||||||
|
* @throws DeserializationException
|
||||||
|
*/
|
||||||
|
static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
|
||||||
|
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
|
||||||
|
int pblen = ProtobufUtil.lengthOfPBMagic();
|
||||||
|
ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
|
||||||
|
.newBuilder();
|
||||||
|
ZooKeeperProtos.ReplicationPeer peer;
|
||||||
|
try {
|
||||||
|
peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
throw new DeserializationException(e);
|
||||||
|
}
|
||||||
|
return peer.getClusterkey();
|
||||||
|
} else {
|
||||||
|
if (bytes.length > 0) {
|
||||||
|
return Bytes.toString(bytes);
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param bytes Content of a state znode.
|
||||||
|
* @return State parsed from the passed bytes.
|
||||||
|
* @throws DeserializationException
|
||||||
|
*/
|
||||||
|
static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
|
||||||
|
throws DeserializationException {
|
||||||
|
ProtobufUtil.expectPBMagicPrefix(bytes);
|
||||||
|
int pblen = ProtobufUtil.lengthOfPBMagic();
|
||||||
|
ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
|
||||||
|
.newBuilder();
|
||||||
|
ZooKeeperProtos.ReplicationState state;
|
||||||
|
try {
|
||||||
|
state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
|
||||||
|
return state.getState();
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
throw new DeserializationException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param bytes - Content of a HLog position znode.
|
||||||
|
* @return long - The current HLog position.
|
||||||
|
* @throws DeserializationException
|
||||||
|
*/
|
||||||
|
static long parseHLogPositionFrom(
|
||||||
|
final byte[] bytes) throws DeserializationException {
|
||||||
|
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
|
||||||
|
int pblen = ProtobufUtil.lengthOfPBMagic();
|
||||||
|
ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition
|
||||||
|
.newBuilder();
|
||||||
|
ZooKeeperProtos.ReplicationHLogPosition position;
|
||||||
|
try {
|
||||||
|
position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
throw new DeserializationException(e);
|
||||||
|
}
|
||||||
|
return position.getPosition();
|
||||||
|
} else {
|
||||||
|
if (bytes.length > 0) {
|
||||||
|
return Bytes.toLong(bytes);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param bytes - Content of a lock znode.
|
||||||
|
* @return String - The owner of the lock.
|
||||||
|
* @throws DeserializationException
|
||||||
|
*/
|
||||||
|
static String parseLockOwnerFrom(
|
||||||
|
final byte[] bytes) throws DeserializationException {
|
||||||
|
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
|
||||||
|
int pblen = ProtobufUtil.lengthOfPBMagic();
|
||||||
|
ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
|
||||||
|
.newBuilder();
|
||||||
|
ZooKeeperProtos.ReplicationLock lock;
|
||||||
|
try {
|
||||||
|
lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
throw new DeserializationException(e);
|
||||||
|
}
|
||||||
|
return lock.getLockOwner();
|
||||||
|
} else {
|
||||||
|
if (bytes.length > 0) {
|
||||||
|
return Bytes.toString(bytes);
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean peerExists(String id) throws KeeperException {
|
private boolean peerExists(String id) throws KeeperException {
|
||||||
return ZKUtil.checkExists(this.zookeeper,
|
return ZKUtil.checkExists(this.zookeeper,
|
||||||
ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
|
ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
|
||||||
|
@ -423,7 +591,7 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
* Thrown when the peer doesn't exist
|
* Thrown when the peer doesn't exist
|
||||||
*/
|
*/
|
||||||
public void enablePeer(String id) throws IOException {
|
public void enablePeer(String id) throws IOException {
|
||||||
changePeerState(id, PeerState.ENABLED);
|
changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
|
||||||
LOG.info("peer " + id + " is enabled");
|
LOG.info("peer " + id + " is enabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -435,22 +603,23 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
* Thrown when the peer doesn't exist
|
* Thrown when the peer doesn't exist
|
||||||
*/
|
*/
|
||||||
public void disablePeer(String id) throws IOException {
|
public void disablePeer(String id) throws IOException {
|
||||||
changePeerState(id, PeerState.DISABLED);
|
changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
|
||||||
LOG.info("peer " + id + " is disabled");
|
LOG.info("peer " + id + " is disabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void changePeerState(String id, PeerState state) throws IOException {
|
private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
|
||||||
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
if (!peerExists(id)) {
|
if (!peerExists(id)) {
|
||||||
throw new IllegalArgumentException("peer " + id + " is not registered");
|
throw new IllegalArgumentException("peer " + id + " is not registered");
|
||||||
}
|
}
|
||||||
String peerStateZNode = getPeerStateNode(id);
|
String peerStateZNode = getPeerStateNode(id);
|
||||||
|
byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
|
||||||
|
: DISABLED_ZNODE_BYTES;
|
||||||
if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
|
if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
|
||||||
ZKUtil.setData(this.zookeeper, peerStateZNode,
|
ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
|
||||||
Bytes.toBytes(state.name()));
|
|
||||||
} else {
|
} else {
|
||||||
ZKUtil.createAndWatch(zookeeper, peerStateZNode,
|
ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
|
||||||
Bytes.toBytes(state.name()));
|
|
||||||
}
|
}
|
||||||
LOG.info("state of the peer " + id + " changed to " + state.name());
|
LOG.info("state of the peer " + id + " changed to " + state.name());
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
|
@ -458,18 +627,6 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get state of the peer. This method checks the state by connecting to ZK.
|
|
||||||
*
|
|
||||||
* @param id peer's identifier
|
|
||||||
* @return current state of the peer
|
|
||||||
*/
|
|
||||||
public PeerState getPeerState(String id) throws KeeperException {
|
|
||||||
byte[] peerStateBytes = ZKUtil
|
|
||||||
.getData(this.zookeeper, getPeerStateNode(id));
|
|
||||||
return PeerState.valueOf(Bytes.toString(peerStateBytes));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether the peer is enabled or not. This method checks the atomic
|
* Check whether the peer is enabled or not. This method checks the atomic
|
||||||
* boolean of ReplicationPeer locally.
|
* boolean of ReplicationPeer locally.
|
||||||
|
@ -487,8 +644,7 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getPeerStateNode(String id) {
|
private String getPeerStateNode(String id) {
|
||||||
return ZKUtil.joinZNode(this.peersZNode,
|
return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
|
||||||
ZKUtil.joinZNode(id, this.peerStateNodeName));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -516,7 +672,11 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
setReplicating(true);
|
setReplicating(true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return Boolean.parseBoolean(Bytes.toString(data));
|
try {
|
||||||
|
return isPeerEnabled(data);
|
||||||
|
} catch (DeserializationException e) {
|
||||||
|
throw ZKUtil.convert(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getRepStateNode() {
|
private String getRepStateNode() {
|
||||||
|
@ -563,8 +723,7 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
|
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
|
||||||
znode = ZKUtil.joinZNode(znode, filename);
|
znode = ZKUtil.joinZNode(znode, filename);
|
||||||
// Why serialize String of Long and not Long as bytes?
|
// Why serialize String of Long and not Long as bytes?
|
||||||
ZKUtil.setData(this.zookeeper, znode,
|
ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
|
||||||
Bytes.toBytes(Long.toString(position)));
|
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
this.abortable.abort("Writing replication status", e);
|
this.abortable.abort("Writing replication status", e);
|
||||||
}
|
}
|
||||||
|
@ -648,7 +807,7 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
|
String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
|
||||||
ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
|
ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode));
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
// This exception will pop up if the znode under which we're trying to
|
// This exception will pop up if the znode under which we're trying to
|
||||||
// create the lock is already deleted by another region server, meaning
|
// create the lock is already deleted by another region server, meaning
|
||||||
|
@ -707,10 +866,18 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
queues.put(newCluster, logQueue);
|
queues.put(newCluster, logQueue);
|
||||||
for (String hlog : hlogs) {
|
for (String hlog : hlogs) {
|
||||||
String z = ZKUtil.joinZNode(clusterPath, hlog);
|
String z = ZKUtil.joinZNode(clusterPath, hlog);
|
||||||
byte [] position = ZKUtil.getData(this.zookeeper, z);
|
byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
|
||||||
LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
|
long position = 0;
|
||||||
|
try {
|
||||||
|
position = parseHLogPositionFrom(positionBytes);
|
||||||
|
} catch (DeserializationException e) {
|
||||||
|
LOG.warn("Failed parse of hlog position from the following znode: " + z);
|
||||||
|
}
|
||||||
|
LOG.debug("Creating " + hlog + " with data " + position);
|
||||||
String child = ZKUtil.joinZNode(newClusterZnode, hlog);
|
String child = ZKUtil.joinZNode(newClusterZnode, hlog);
|
||||||
ZKUtil.createAndWatch(this.zookeeper, child, position);
|
// Position doesn't actually change, we are just deserializing it for
|
||||||
|
// logging, so just use the already serialized version
|
||||||
|
ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
|
||||||
logQueue.add(hlog);
|
logQueue.add(hlog);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -797,8 +964,16 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
|
String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
|
||||||
String znode = ZKUtil.joinZNode(clusterZnode, hlog);
|
String znode = ZKUtil.joinZNode(clusterZnode, hlog);
|
||||||
String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
|
byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
|
||||||
return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
|
try {
|
||||||
|
return parseHLogPositionFrom(bytes);
|
||||||
|
} catch (DeserializationException de) {
|
||||||
|
LOG.warn("Failed parse of HLogPosition for peerId=" + peerId + " and hlog=" + hlog
|
||||||
|
+ "znode content, continuing.");
|
||||||
|
}
|
||||||
|
// if we can not parse the position, start at the beginning of the hlog file
|
||||||
|
// again
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerRegionServerListener(ZooKeeperListener listener) {
|
public void registerRegionServerListener(ZooKeeperListener listener) {
|
||||||
|
@ -846,6 +1021,35 @@ public class ReplicationZookeeper implements Closeable{
|
||||||
statusTracker.stop();
|
statusTracker.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility method to ensure an ENABLED znode is in place; if not present, we
|
||||||
|
* create it.
|
||||||
|
* @param zookeeper
|
||||||
|
* @param path Path to znode to check
|
||||||
|
* @return True if we created the znode.
|
||||||
|
* @throws NodeExistsException
|
||||||
|
* @throws KeeperException
|
||||||
|
*/
|
||||||
|
static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
|
||||||
|
throws NodeExistsException, KeeperException {
|
||||||
|
if (ZKUtil.checkExists(zookeeper, path) == -1) {
|
||||||
|
ZKUtil.createAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param bytes
|
||||||
|
* @return True if the passed in <code>bytes</code> are those of a pb
|
||||||
|
* serialized ENABLED state.
|
||||||
|
* @throws DeserializationException
|
||||||
|
*/
|
||||||
|
static boolean isPeerEnabled(final byte[] bytes) throws DeserializationException {
|
||||||
|
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
|
||||||
|
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tracker for status of the replication
|
* Tracker for status of the replication
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -93,8 +93,8 @@ public class Replication implements WALActionsListener,
|
||||||
throw new IOException("Failed replication handler create " +
|
throw new IOException("Failed replication handler create " +
|
||||||
"(replicating=" + this.replicating, ke);
|
"(replicating=" + this.replicating, ke);
|
||||||
}
|
}
|
||||||
this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
|
this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
|
||||||
this.server, fs, this.replicating, logDir, oldLogDir) ;
|
this.replicating, logDir, oldLogDir);
|
||||||
} else {
|
} else {
|
||||||
this.replicationManager = null;
|
this.replicationManager = null;
|
||||||
this.zkHelper = null;
|
this.zkHelper = null;
|
||||||
|
|
|
@ -98,3 +98,37 @@ message Table {
|
||||||
// for more.
|
// for more.
|
||||||
required State state = 1 [default = ENABLED];
|
required State state = 1 [default = ENABLED];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by replication. Holds a replication peer key.
|
||||||
|
*/
|
||||||
|
message ReplicationPeer {
|
||||||
|
// clusterKey is the concatenation of the slave cluster's
|
||||||
|
// hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
|
||||||
|
required string clusterkey = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by replication. Holds whether enabled or disabled
|
||||||
|
*/
|
||||||
|
message ReplicationState {
|
||||||
|
enum State {
|
||||||
|
ENABLED = 0;
|
||||||
|
DISABLED = 1;
|
||||||
|
}
|
||||||
|
required State state = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by replication. Holds the current position in an HLog file.
|
||||||
|
*/
|
||||||
|
message ReplicationHLogPosition {
|
||||||
|
required int64 position = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by replication. Used to lock a region server during failover.
|
||||||
|
*/
|
||||||
|
message ReplicationLock {
|
||||||
|
required string lockOwner = 1;
|
||||||
|
}
|
||||||
|
|
|
@ -112,9 +112,9 @@ public class TestReplicationSourceManager {
|
||||||
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
|
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
|
||||||
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
|
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
|
||||||
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
|
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
|
||||||
Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name()));
|
ReplicationZookeeper.ENABLED_ZNODE_BYTES);
|
||||||
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
|
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
|
||||||
ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
|
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationZookeeper.ENABLED_ZNODE_BYTES);
|
||||||
|
|
||||||
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
|
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
|
||||||
manager = replication.getReplicationManager();
|
manager = replication.getReplicationManager();
|
||||||
|
@ -135,8 +135,6 @@ public class TestReplicationSourceManager {
|
||||||
htd.addFamily(col);
|
htd.addFamily(col);
|
||||||
|
|
||||||
hri = new HRegionInfo(htd.getName(), r1, r2);
|
hri = new HRegionInfo(htd.getName(), r1, r2);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
|
Loading…
Reference in New Issue