diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 8b63e3a8c66..4f4b870655a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -25,10 +25,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Closeable;
import java.io.IOException;
@@ -79,7 +84,7 @@ public class ReplicationPeer implements Abortable, Closeable {
*/
public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
throws KeeperException {
- ReplicationZookeeper.ensurePeerEnabled(zookeeper, peerStateNode);
+ ensurePeerEnabled(zookeeper, peerStateNode);
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
this.peerStateTracker.start();
try {
@@ -90,7 +95,7 @@ public class ReplicationPeer implements Abortable, Closeable {
}
private void readPeerStateZnode() throws DeserializationException {
- this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
+ this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
}
/**
@@ -181,6 +186,57 @@ public class ReplicationPeer implements Abortable, Closeable {
}
}
+ /**
+ * @param bytes
+ * @return True if the passed in bytes
are those of a pb serialized ENABLED state.
+ * @throws DeserializationException
+ */
+ private static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+ ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+ return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+ }
+
+ /**
+ * @param bytes Content of a state znode.
+ * @return State parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+ throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(bytes);
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationState.Builder builder =
+ ZooKeeperProtos.ReplicationState.newBuilder();
+ ZooKeeperProtos.ReplicationState state;
+ try {
+ state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ return state.getState();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ /**
+ * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
+ * @param zookeeper
+ * @param path Path to znode to check
+ * @return True if we created the znode.
+ * @throws NodeExistsException
+ * @throws KeeperException
+ */
+ private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+ throws NodeExistsException, KeeperException {
+ if (ZKUtil.checkExists(zookeeper, path) == -1) {
+ // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+ // peer-state znode. This happens while adding a peer.
+ // The peer state data is set as "ENABLED" by default.
+ ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
+ ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+ return true;
+ }
+ return false;
+ }
+
/**
* Tracker for state of this peer
*/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
new file mode 100644
index 00000000000..7ec77c4b8e3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
+ * clusters that data is replicated to. A peer cluster can be in three different states:
+ *
+ * 1. Not-Registered - There is no notion of the peer cluster.
+ * 2. Registered - The peer has an id and is being tracked but there is no connection.
+ * 3. Connected - There is an active connection to the remote peer.
+ *
+ * In the registered or connected state, a peer cluster can either be enabled or disabled.
+ */
+@InterfaceAudience.Private
+public interface ReplicationPeers {
+
+ /**
+ * Initialize the ReplicationPeers interface.
+ * @throws KeeperException
+ */
+ public void init() throws IOException, KeeperException;
+
+ /**
+ * Add a new remote slave cluster for replication.
+ * @param peerId a short that identifies the cluster
+ * @param clusterKey the concatenation of the slave cluster's:
+ * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+ */
+ public void addPeer(String peerId, String clusterKey) throws IOException;
+
+ /**
+ * Removes a remote slave cluster and stops the replication to it.
+ * @param peerId a short that identifies the cluster
+ */
+ public void removePeer(String peerId) throws IOException;
+
+ /**
+ * Restart the replication to the specified remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ public void enablePeer(String peerId) throws IOException;
+
+ /**
+ * Stop the replication to the specified remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ public void disablePeer(String peerId) throws IOException;
+
+ /**
+ * Get the replication status for the specified connected remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return true if replication is enabled, false otherwise.
+ */
+ public boolean getStatusOfConnectedPeer(String peerId);
+
+ /**
+ * Get a set of all connected remote slave clusters.
+ * @return set of peer ids
+ */
+ public Set getConnectedPeers();
+
+ /**
+ * List the cluster keys of all remote slave clusters (whether they are enabled/disabled or
+ * connected/disconnected).
+ * @return A map of peer ids to peer cluster keys
+ */
+ public Map getAllPeerClusterKeys();
+
+ /**
+ * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
+ * connected/disconnected).
+ * @return A list of peer ids
+ */
+ public List getAllPeerIds();
+
+ /**
+ * Attempt to connect to a new remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return true if a new connection was made, false if no new connection was made.
+ */
+ public boolean connectToPeer(String peerId) throws IOException, KeeperException;
+
+ /**
+ * Disconnect from a remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ public void disconnectFromPeer(String peerId);
+
+ /**
+ * Returns all region servers from given connected remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return addresses of all region servers in the peer cluster. Returns an empty list if the peer
+ * cluster is unavailable or there are no region servers in the cluster.
+ */
+ public List getRegionServersOfConnectedPeer(String peerId);
+
+ /**
+ * Returns the UUID of the provided peer id.
+ * @param peerId the peer's ID that will be converted into a UUID
+ * @return a UUID or null if the peer cluster does not exist or is not connected.
+ */
+ public UUID getPeerUUID(String peerId);
+
+ /**
+ * Returns the configuration needed to talk to the remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return the configuration for the peer cluster, null if it was unable to get the configuration
+ */
+ public Configuration getPeerConf(String peerId) throws KeeperException;
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
new file mode 100644
index 00000000000..7dee2a41d8e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -0,0 +1,409 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * This class provides an implementation of the ReplicationPeers interface using Zookeeper. The
+ * peers znode contains a list of all peer replication clusters and the current replication state of
+ * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
+ * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
+ * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
+ * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
+ * For example:
+ *
+ * /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
+ * /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
+ *
+ * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
+ * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
+ * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
+ * ReplicationPeer.PeerStateTracker class. For example:
+ *
+ * /hbase/replication/peers/1/peer-state [Value: ENABLED]
+ */
+public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
+
+ // Map of peer clusters keyed by their id
+ private Map peerClusters;
+
+ private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
+
+ public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
+ Abortable abortable) {
+ super(zk, conf, abortable);
+ this.peerClusters = new HashMap();
+ }
+
+ @Override
+ public void init() throws IOException, KeeperException {
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ connectExistingPeers();
+ }
+
+ @Override
+ public void addPeer(String id, String clusterKey) throws IOException {
+ try {
+ if (peerExists(id)) {
+ throw new IllegalArgumentException("Cannot add existing peer");
+ }
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
+ toByteArray(clusterKey));
+ // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+ // peer-state znode. This happens while adding a peer.
+ // The peer state data is set as "ENABLED" by default.
+ ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
+ ENABLED_ZNODE_BYTES);
+ // A peer is enabled by default
+ } catch (KeeperException e) {
+ throw new IOException("Unable to add peer", e);
+ }
+ }
+
+ @Override
+ public void removePeer(String id) throws IOException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("Cannot remove inexisting peer");
+ }
+ ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ } catch (KeeperException e) {
+ throw new IOException("Unable to remove a peer", e);
+ }
+ }
+
+ @Override
+ public void enablePeer(String id) throws IOException {
+ changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
+ LOG.info("peer " + id + " is enabled");
+ }
+
+ @Override
+ public void disablePeer(String id) throws IOException {
+ changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
+ LOG.info("peer " + id + " is disabled");
+ }
+
+ @Override
+ public boolean getStatusOfConnectedPeer(String id) {
+ if (!this.peerClusters.containsKey(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not connected");
+ }
+ return this.peerClusters.get(id).getPeerEnabled().get();
+ }
+
+ @Override
+ public boolean connectToPeer(String peerId) throws IOException, KeeperException {
+ if (peerClusters == null) {
+ return false;
+ }
+ if (this.peerClusters.containsKey(peerId)) {
+ return false;
+ }
+ ReplicationPeer peer = getPeer(peerId);
+ if (peer == null) {
+ return false;
+ }
+ this.peerClusters.put(peerId, peer);
+ LOG.info("Added new peer cluster " + peer.getClusterKey());
+ return true;
+ }
+
+ @Override
+ public void disconnectFromPeer(String peerId) {
+ ReplicationPeer rp = this.peerClusters.get(peerId);
+ if (rp != null) {
+ rp.getZkw().close();
+ this.peerClusters.remove(peerId);
+ }
+ }
+
+ @Override
+ public Map getAllPeerClusterKeys() {
+ Map peers = new TreeMap();
+ List ids = null;
+ try {
+ ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ for (String id : ids) {
+ byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ String clusterKey = null;
+ try {
+ clusterKey = parsePeerFrom(bytes);
+ } catch (DeserializationException de) {
+ LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
+ continue;
+ }
+ peers.put(id, clusterKey);
+ }
+ } catch (KeeperException e) {
+ this.abortable.abort("Cannot get the list of peers ", e);
+ }
+ return peers;
+ }
+
+ @Override
+ public List getRegionServersOfConnectedPeer(String peerId) {
+ if (this.peerClusters.size() == 0) {
+ return Collections.emptyList();
+ }
+ ReplicationPeer peer = this.peerClusters.get(peerId);
+ if (peer == null) {
+ return Collections.emptyList();
+ }
+ List addresses;
+ try {
+ addresses = fetchSlavesAddresses(peer.getZkw());
+ } catch (KeeperException ke) {
+ reconnectPeer(ke, peer);
+ addresses = Collections.emptyList();
+ }
+ peer.setRegionServers(addresses);
+ return peer.getRegionServers();
+ }
+
+ @Override
+ public UUID getPeerUUID(String peerId) {
+ ReplicationPeer peer = this.peerClusters.get(peerId);
+ if (peer == null) {
+ return null;
+ }
+ UUID peerUUID = null;
+ try {
+ peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
+ } catch (KeeperException ke) {
+ reconnectPeer(ke, peer);
+ }
+ return peerUUID;
+ }
+
+ @Override
+ public Set getConnectedPeers() {
+ return this.peerClusters.keySet();
+ }
+
+ @Override
+ public Configuration getPeerConf(String peerId) throws KeeperException {
+ String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+ byte[] data = ZKUtil.getData(this.zookeeper, znode);
+ if (data == null) {
+ LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
+ return null;
+ }
+ String otherClusterKey = "";
+ try {
+ otherClusterKey = parsePeerFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed to parse cluster key from peerId=" + peerId
+ + ", specifically the content from the following znode: " + znode);
+ return null;
+ }
+
+ Configuration otherConf = new Configuration(this.conf);
+ try {
+ ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
+ } catch (IOException e) {
+ LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
+ return null;
+ }
+ return otherConf;
+ }
+
+ /**
+ * List all registered peer clusters and set a watch on their znodes.
+ */
+ @Override
+ public List getAllPeerIds() {
+ List ids = null;
+ try {
+ ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Cannot get the list of peers ", e);
+ }
+ return ids;
+ }
+
+ /**
+ * A private method used during initialization. This method attempts to connect to all registered
+ * peer clusters. This method does not set a watch on the peer cluster znodes.
+ * @throws IOException
+ * @throws KeeperException
+ */
+ private void connectExistingPeers() throws IOException, KeeperException {
+ List znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ if (znodes != null) {
+ for (String z : znodes) {
+ connectToPeer(z);
+ }
+ }
+ }
+
+ /**
+ * A private method used to re-establish a zookeeper session with a peer cluster.
+ * @param ke
+ * @param peer
+ */
+ private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
+ if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException) {
+ LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
+ try {
+ peer.reloadZkWatcher();
+ } catch (IOException io) {
+ LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
+ }
+ }
+ }
+
+ /**
+ * Get the list of all the region servers from the specified peer
+ * @param zkw zk connection to use
+ * @return list of region server addresses or an empty list if the slave is unavailable
+ */
+ private static List fetchSlavesAddresses(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ List children = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
+ if (children == null) {
+ return Collections.emptyList();
+ }
+ List addresses = new ArrayList(children.size());
+ for (String child : children) {
+ addresses.add(ServerName.parseServerName(child));
+ }
+ return addresses;
+ }
+
+ private String getPeerStateNode(String id) {
+ return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+ }
+
+ /**
+ * Update the state znode of a peer cluster.
+ * @param id
+ * @param state
+ * @throws IOException
+ */
+ private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
+ throws IOException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not registered");
+ }
+ String peerStateZNode = getPeerStateNode(id);
+ byte[] stateBytes =
+ (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
+ : DISABLED_ZNODE_BYTES;
+ if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+ ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
+ } else {
+ ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
+ }
+ LOG.info("state of the peer " + id + " changed to " + state.name());
+ } catch (KeeperException e) {
+ throw new IOException("Unable to change state of the peer " + id, e);
+ }
+ }
+
+ /**
+ * Helper method to connect to a peer
+ * @param peerId peer's identifier
+ * @return object representing the peer
+ * @throws IOException
+ * @throws KeeperException
+ */
+ private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException {
+ Configuration peerConf = getPeerConf(peerId);
+ if (peerConf == null) {
+ return null;
+ }
+ if (this.ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf))) {
+ LOG.debug("Not connecting to " + peerId + " because it's us");
+ return null;
+ }
+
+ ReplicationPeer peer =
+ new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
+ peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+ return peer;
+ }
+
+ /**
+ * @param bytes Content of a peer znode.
+ * @return ClusterKey parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+ if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationPeer.Builder builder =
+ ZooKeeperProtos.ReplicationPeer.newBuilder();
+ ZooKeeperProtos.ReplicationPeer peer;
+ try {
+ peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ return peer.getClusterkey();
+ } else {
+ if (bytes.length > 0) {
+ return Bytes.toString(bytes);
+ }
+ return "";
+ }
+ }
+
+ /**
+ * @param clusterKey
+ * @return Serialized protobuf of clusterKey
with pb magic prefix prepended suitable
+ * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
+ * /hbase/replication/peers/PEER_ID
+ */
+ private static byte[] toByteArray(final String clusterKey) {
+ byte[] bytes =
+ ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
+ .toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 068d91631e2..f7fb89f23a1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -22,12 +22,14 @@ import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
* keep track of the HLogs that still need to be replicated to remote clusters.
*/
+@InterfaceAudience.Private
public interface ReplicationQueues {
/**
@@ -35,7 +37,7 @@ public interface ReplicationQueues {
* @param serverName The server name of the region server that owns the replication queues this
* interface manages.
*/
- public void init(String serverName);
+ public void init(String serverName) throws KeeperException;
/**
* Remove a replication queue.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 464daa73a57..1b55f9242b6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -39,8 +39,27 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.zookeeper.KeeperException;
-import com.google.protobuf.InvalidProtocolBufferException;
-
+/**
+ * This class provides an implementation of the ReplicationQueues interface using Zookeeper. The
+ * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
+ * all outstanding HLog files on this region server that need to be replicated. The myQueuesZnode is
+ * the regionserver name (a concatenation of the region server’s hostname, client port and start
+ * code). For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ *
+ * Within this znode, the region server maintains a set of HLog replication queues. These queues are
+ * represented by child znodes named using there give queue id. For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1
+ * /hbase/replication/rs/hostname.example.org,6020,1234/2
+ *
+ * Each queue has one child znode for every HLog that still needs to be replicated. The value of
+ * these HLog child znodes is the latest position that has been replicated. This position is updated
+ * every time a HLog entry is replicated. For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
+ */
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
/** Znode containing all replication queues for this region server. */
@@ -50,14 +69,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
- public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable)
- throws KeeperException {
+ public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
+ Abortable abortable) {
super(zk, conf, abortable);
}
@Override
- public void init(String serverName) {
+ public void init(String serverName) throws KeeperException {
this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+ ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
}
@Override
@@ -94,7 +114,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename);
// Why serialize String of Long and not Long as bytes?
- ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
+ ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
} catch (KeeperException e) {
this.abortable.abort("Failed to write replication hlog position (filename=" + filename
+ ", position=" + position + ")", e);
@@ -107,7 +127,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
String znode = ZKUtil.joinZNode(clusterZnode, filename);
byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
try {
- return parseHLogPositionFrom(bytes);
+ return ZKUtil.parseHLogPositionFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
+ "znode content, continuing.");
@@ -351,7 +371,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
long position = 0;
try {
- position = parseHLogPositionFrom(positionBytes);
+ position = ZKUtil.parseHLogPositionFrom(positionBytes);
} catch (DeserializationException e) {
LOG.warn("Failed parse of hlog position from the following znode: " + z
+ ", Exception: " + e);
@@ -380,44 +400,4 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
-
- /**
- * @param position
- * @return Serialized protobuf of position
with pb magic prefix prepended suitable
- * for use as content of an hlog position in a replication queue.
- */
- static byte[] toByteArray(final long position) {
- byte[] bytes =
- ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- /**
- * @param bytes - Content of a HLog position znode.
- * @return long - The current HLog position.
- * @throws DeserializationException
- */
- private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
- if(bytes == null) {
- throw new DeserializationException("Unable to parse null HLog position.");
- }
- if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
- ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
- ZooKeeperProtos.ReplicationHLogPosition position;
- try {
- position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- return position.getPosition();
- } else {
- if (bytes.length > 0) {
- return Bytes.toLong(bytes);
- }
- return 0;
- }
- }
}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
index ff4d07c602a..fa880536299 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
@@ -54,8 +54,6 @@ public class ReplicationStateImpl extends ReplicationStateZKBase implements
// Set a tracker on replicationStateNode
this.stateTracker =
new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
- stateTracker.start();
- readReplicationStateZnode();
}
public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
@@ -63,6 +61,13 @@ public class ReplicationStateImpl extends ReplicationStateZKBase implements
this(zk, conf, abortable, new AtomicBoolean());
}
+ @Override
+ public void init() throws KeeperException {
+ ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
+ stateTracker.start();
+ readReplicationStateZnode();
+ }
+
@Override
public boolean getState() throws KeeperException {
return getReplication();
@@ -115,8 +120,7 @@ public class ReplicationStateImpl extends ReplicationStateZKBase implements
*/
private void setReplicating(boolean newState) throws KeeperException {
ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
- byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
- : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
+ byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
index c3028d41061..69dd5402a58 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import java.io.Closeable;
@@ -27,19 +28,23 @@ import java.io.Closeable;
* cluster. This state is used to indicate whether replication is enabled or
* disabled on a cluster.
*/
+@InterfaceAudience.Private
public interface ReplicationStateInterface extends Closeable {
-
+
+ /**
+ * Initialize the replication state interface.
+ */
+ public void init() throws KeeperException;
+
/**
* Get the current state of replication (i.e. ENABLED or DISABLED).
- *
* @return true if replication is enabled, false otherwise
* @throws KeeperException
*/
public boolean getState() throws KeeperException;
-
+
/**
* Set the state of replication.
- *
* @param newState
* @throws KeeperException
*/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 20ded673856..78f5f385b95 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -22,6 +22,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -51,7 +53,13 @@ public abstract class ReplicationStateZKBase {
protected final Configuration conf;
protected final Abortable abortable;
- public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
+ // Public for testing
+ public static final byte[] ENABLED_ZNODE_BYTES =
+ toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+ protected static final byte[] DISABLED_ZNODE_BYTES =
+ toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+
+ public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
Abortable abortable) {
this.zookeeper = zookeeper;
this.conf = conf;
@@ -79,8 +87,20 @@ public abstract class ReplicationStateZKBase {
return result;
}
+ /**
+ * @param state
+ * @return Serialized protobuf of state
with pb magic prefix prepended suitable for
+ * use as content of either the cluster state znode -- whether or not we should be
+ * replicating kept in /hbase/replication/state -- or as content of a peer-state znode
+ * under a peer cluster id as in /hbase/replication/peers/PEER_ID/peer-state.
+ */
+ protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
+ byte[] bytes =
+ ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build().toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+
public boolean peerExists(String id) throws KeeperException {
- return ZKUtil.checkExists(this.zookeeper,
- ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
+ return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
}
}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
index a527e753ff5..cad7a245161 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.replication;
-import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -26,29 +25,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
import java.io.Closeable;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -83,47 +71,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
@InterfaceAudience.Private
public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
- private static final Log LOG =
- LogFactory.getLog(ReplicationZookeeper.class);
+ private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class);
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
- // Map of peer clusters keyed by their id
- private Map peerClusters;
- // Path to the root replication znode
- private String replicationZNode;
- // Path to the peer clusters znode
private String peersZNode;
- // Path to the znode that contains all RS that replicates
- private String rsZNode;
- // Path to this region server's name under rsZNode
- private String rsServerNameZnode;
- // Name node if the replicationState znode
- private String replicationStateNodeName;
- // Name of zk node which stores peer state. The peer-state znode is under a
- // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
- private String peerStateNodeName;
private final Configuration conf;
- // The key to our own cluster
- private String ourClusterKey;
// Abortable
private Abortable abortable;
private final ReplicationStateInterface replicationState;
+ private final ReplicationPeers replicationPeers;
private final ReplicationQueues replicationQueues;
- /**
- * ZNode content if enabled state.
- */
- // Public so it can be seen by test code.
- public static final byte[] ENABLED_ZNODE_BYTES =
- toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
-
- /**
- * ZNode content if disabled state.
- */
- static final byte[] DISABLED_ZNODE_BYTES =
- toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
-
/**
* Constructor used by clients of replication (like master and HBase clients)
* @param conf conf to use
@@ -131,15 +90,18 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @throws IOException
*/
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
- final ZooKeeperWatcher zk) throws KeeperException {
+ final ZooKeeperWatcher zk) throws KeeperException, IOException {
super(zk, conf, abortable);
this.conf = conf;
this.zookeeper = zk;
setZNodes(abortable);
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
+ this.replicationState.init();
// TODO This interface is no longer used by anyone using this constructor. When this class goes
// away, we will no longer have this null initialization business
this.replicationQueues = null;
+ this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, abortable);
+ this.replicationPeers.init();
}
/**
@@ -157,39 +119,19 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
setZNodes(server);
-
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
- this.peerClusters = new HashMap();
- ZKUtil.createWithParents(this.zookeeper,
- ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
- this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
- ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
+ this.replicationState.init();
this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
this.replicationQueues.init(server.getServerName().toString());
- connectExistingPeers();
+ this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server);
+ this.replicationPeers.init();
}
private void setZNodes(Abortable abortable) throws KeeperException {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
- this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
- this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
- String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
- this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
- this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+ String replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
- this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
- ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
- }
-
- private void connectExistingPeers() throws IOException, KeeperException {
- List znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- if (znodes != null) {
- for (String z : znodes) {
- connectToPeer(z);
- }
- }
}
/**
@@ -197,39 +139,15 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @return list of all peers' identifiers
*/
public List listPeersIdsAndWatch() {
- List ids = null;
- try {
- ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
- } catch (KeeperException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- }
- return ids;
+ return this.replicationPeers.getAllPeerIds();
}
/**
* Map of this cluster's peers for display.
* @return A map of peer ids to peer cluster keys
*/
- public Map listPeers() {
- Map peers = new TreeMap();
- List ids = null;
- try {
- ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- for (String id : ids) {
- byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
- String clusterKey = null;
- try {
- clusterKey = parsePeerFrom(bytes);
- } catch (DeserializationException de) {
- LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
- continue;
- }
- peers.put(id, clusterKey);
- }
- } catch (KeeperException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- }
- return peers;
+ public Map listPeers() {
+ return this.replicationPeers.getAllPeerClusterKeys();
}
/**
@@ -239,126 +157,17 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @return addresses of all region servers
*/
public List getSlavesAddresses(String peerClusterId) {
- if (this.peerClusters.size() == 0) {
- return Collections.emptyList();
- }
- ReplicationPeer peer = this.peerClusters.get(peerClusterId);
- if (peer == null) {
- return Collections.emptyList();
- }
-
- List addresses;
- try {
- addresses = fetchSlavesAddresses(peer.getZkw());
- } catch (KeeperException ke) {
- reconnectPeer(ke, peer);
- addresses = Collections.emptyList();
- }
- peer.setRegionServers(addresses);
- return peer.getRegionServers();
- }
-
- /**
- * Get the list of all the region servers from the specified peer
- * @param zkw zk connection to use
- * @return list of region server addresses or an empty list if the slave
- * is unavailable
- */
- private List fetchSlavesAddresses(ZooKeeperWatcher zkw)
- throws KeeperException {
- return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
- }
-
- /**
- * Lists the children of the specified znode, retrieving the data of each
- * child as a server address.
- *
- * Used to list the currently online regionservers and their addresses.
- *
- * Sets no watches at all, this method is best effort.
- *
- * Returns an empty list if the node has no children. Returns null if the
- * parent node itself does not exist.
- *
- * @param zkw zookeeper reference
- * @param znode node to get children of as addresses
- * @return list of data of children of specified znode, empty if no children,
- * null if parent does not exist
- * @throws KeeperException if unexpected zookeeper exception
- */
- public static List listChildrenAndGetAsServerNames(
- ZooKeeperWatcher zkw, String znode)
- throws KeeperException {
- List children = ZKUtil.listChildrenNoWatch(zkw, znode);
- if(children == null) {
- return Collections.emptyList();
- }
- List addresses = new ArrayList(children.size());
- for (String child : children) {
- addresses.add(ServerName.parseServerName(child));
- }
- return addresses;
+ return this.replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
}
/**
* This method connects this cluster to another one and registers it
* in this region server's replication znode
* @param peerId id of the peer cluster
- * @throws KeeperException
- */
- public boolean connectToPeer(String peerId)
- throws IOException, KeeperException {
- if (peerClusters == null) {
- return false;
- }
- if (this.peerClusters.containsKey(peerId)) {
- return false;
- }
- ReplicationPeer peer = getPeer(peerId);
- if (peer == null) {
- return false;
- }
- this.peerClusters.put(peerId, peer);
- ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
- this.rsServerNameZnode, peerId));
- LOG.info("Added new peer cluster " + peer.getClusterKey());
- return true;
- }
-
- /**
- * Helper method to connect to a peer
- * @param peerId peer's identifier
- * @return object representing the peer
- * @throws IOException
* @throws KeeperException
*/
- public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
- String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
- byte [] data = ZKUtil.getData(this.zookeeper, znode);
- String otherClusterKey = "";
- try {
- otherClusterKey = parsePeerFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed parse of cluster key from peerId=" + peerId
- + ", specifically the content from the following znode: " + znode);
- }
- if (this.ourClusterKey.equals(otherClusterKey)) {
- LOG.debug("Not connecting to " + peerId + " because it's us");
- return null;
- }
- // Construct the connection to the new peer
- Configuration otherConf = new Configuration(this.conf);
- try {
- ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
- } catch (IOException e) {
- LOG.error("Can't get peer because:", e);
- return null;
- }
-
- ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
- otherClusterKey);
- peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
- return peer;
+ public boolean connectToPeer(String peerId) throws IOException, KeeperException {
+ return this.replicationPeers.connectToPeer(peerId);
}
/**
@@ -368,15 +177,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @throws IllegalArgumentException Thrown when the peer doesn't exist
*/
public void removePeer(String id) throws IOException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot remove inexisting peer");
- }
- ZKUtil.deleteNodeRecursively(this.zookeeper,
- ZKUtil.joinZNode(this.peersZNode, id));
- } catch (KeeperException e) {
- throw new IOException("Unable to remove a peer", e);
- }
+ this.replicationPeers.removePeer(id);
}
/**
@@ -388,154 +189,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* multi-slave isn't supported yet.
*/
public void addPeer(String id, String clusterKey) throws IOException {
- try {
- if (peerExists(id)) {
- throw new IllegalArgumentException("Cannot add existing peer");
- }
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
- ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
- toByteArray(clusterKey));
- // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
- // peer-state znode. This happens while adding a peer.
- // The peer state data is set as "ENABLED" by default.
- ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
- ENABLED_ZNODE_BYTES);
- // A peer is enabled by default
- } catch (KeeperException e) {
- throw new IOException("Unable to add peer", e);
- }
- }
-
- /**
- * @param clusterKey
- * @return Serialized protobuf of clusterKey
with pb magic prefix
- * prepended suitable for use as content of a this.peersZNode; i.e.
- * the content of PEER_ID znode under /hbase/replication/peers/PEER_ID
- */
- static byte[] toByteArray(final String clusterKey) {
- byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- /**
- * @param state
- * @return Serialized protobuf of state
with pb magic prefix
- * prepended suitable for use as content of either the cluster state
- * znode -- whether or not we should be replicating kept in
- * /hbase/replication/state -- or as content of a peer-state znode
- * under a peer cluster id as in
- * /hbase/replication/peers/PEER_ID/peer-state.
- */
- static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
- byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- /**
- * @param position
- * @return Serialized protobuf of position
with pb magic prefix
- * prepended suitable for use as content of an hlog position in a
- * replication queue.
- */
- public static byte[] positionToByteArray(
- final long position) {
- return ZKUtil.positionToByteArray(position);
- }
-
- /**
- * @param lockOwner
- * @return Serialized protobuf of lockOwner
with pb magic prefix
- * prepended suitable for use as content of an replication lock during
- * region server fail over.
- */
- static byte[] lockToByteArray(
- final String lockOwner) {
- byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- /**
- * @param bytes Content of a peer znode.
- * @return ClusterKey parsed from the passed bytes.
- * @throws DeserializationException
- */
- static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
- if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
- .newBuilder();
- ZooKeeperProtos.ReplicationPeer peer;
- try {
- peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- return peer.getClusterkey();
- } else {
- if (bytes.length > 0) {
- return Bytes.toString(bytes);
- }
- return "";
- }
- }
-
- /**
- * @param bytes Content of a state znode.
- * @return State parsed from the passed bytes.
- * @throws DeserializationException
- */
- static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
- throws DeserializationException {
- ProtobufUtil.expectPBMagicPrefix(bytes);
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
- .newBuilder();
- ZooKeeperProtos.ReplicationState state;
- try {
- state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- return state.getState();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- }
-
- /**
- * @param bytes - Content of a HLog position znode.
- * @return long - The current HLog position.
- * @throws DeserializationException
- */
- public static long parseHLogPositionFrom(
- final byte[] bytes) throws DeserializationException {
- return ZKUtil.parseHLogPositionFrom(bytes);
- }
-
- /**
- * @param bytes - Content of a lock znode.
- * @return String - The owner of the lock.
- * @throws DeserializationException
- */
- static String parseLockOwnerFrom(
- final byte[] bytes) throws DeserializationException {
- if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
- .newBuilder();
- ZooKeeperProtos.ReplicationLock lock;
- try {
- lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- return lock.getLockOwner();
- } else {
- if (bytes.length > 0) {
- return Bytes.toString(bytes);
- }
- return "";
- }
+ this.replicationPeers.addPeer(id, clusterKey);
}
/**
@@ -546,8 +200,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Thrown when the peer doesn't exist
*/
public void enablePeer(String id) throws IOException {
- changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
- LOG.info("peer " + id + " is enabled");
+ this.replicationPeers.enablePeer(id);
}
/**
@@ -558,28 +211,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Thrown when the peer doesn't exist
*/
public void disablePeer(String id) throws IOException {
- changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
- LOG.info("peer " + id + " is disabled");
- }
-
- private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
- throws IOException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("peer " + id + " is not registered");
- }
- String peerStateZNode = getPeerStateNode(id);
- byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
- : DISABLED_ZNODE_BYTES;
- if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
- ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
- } else {
- ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
- }
- LOG.info("state of the peer " + id + " changed to " + state.name());
- } catch (KeeperException e) {
- throw new IOException("Unable to change state of the peer " + id, e);
- }
+ this.replicationPeers.disablePeer(id);
}
/**
@@ -592,14 +224,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Thrown when the peer doesn't exist
*/
public boolean getPeerEnabled(String id) {
- if (!this.peerClusters.containsKey(id)) {
- throw new IllegalArgumentException("peer " + id + " is not registered");
- }
- return this.peerClusters.get(id).getPeerEnabled().get();
- }
-
- private String getPeerStateNode(String id) {
- return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+ return this.replicationPeers.getStatusOfConnectedPeer(id);
}
/**
@@ -683,8 +308,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
public void deleteSource(String peerZnode, boolean closeConnection) {
this.replicationQueues.removeQueue(peerZnode);
if (closeConnection) {
- this.peerClusters.get(peerZnode).getZkw().close();
- this.peerClusters.remove(peerZnode);
+ this.replicationPeers.disconnectFromPeer(peerZnode);
}
}
@@ -714,40 +338,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @return a UUID or null if there's a ZK connection issue
*/
public UUID getPeerUUID(String peerId) {
- ReplicationPeer peer = getPeerClusters().get(peerId);
- UUID peerUUID = null;
- try {
- peerUUID = getUUIDForCluster(peer.getZkw());
- } catch (KeeperException ke) {
- reconnectPeer(ke, peer);
- }
- return peerUUID;
- }
-
- /**
- * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
- * @param zkw watcher connected to an ensemble
- * @return the UUID read from zookeeper
- * @throws KeeperException
- */
- public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
- return UUID.fromString(ZKClusterId.readClusterIdZNode(zkw));
- }
-
- private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
- if (ke instanceof ConnectionLossException
- || ke instanceof SessionExpiredException) {
- LOG.warn(
- "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
- ke);
- try {
- peer.reloadZkWatcher();
- } catch(IOException io) {
- LOG.warn(
- "Creation of ZookeeperWatcher failed for peer "
- + peer.getClusterKey(), io);
- }
- }
+ return this.replicationPeers.getPeerUUID(peerId);
}
public void registerRegionServerListener(ZooKeeperListener listener) {
@@ -758,8 +349,8 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Get a map of all peer clusters
* @return map of peer cluster keyed by id
*/
- public Map getPeerClusters() {
- return this.peerClusters;
+ public Set getPeerClusters() {
+ return this.replicationPeers.getConnectedPeers();
}
/**
@@ -802,36 +393,4 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
public void close() throws IOException {
if (replicationState != null) replicationState.close();
}
-
- /**
- * Utility method to ensure an ENABLED znode is in place; if not present, we
- * create it.
- * @param zookeeper
- * @param path Path to znode to check
- * @return True if we created the znode.
- * @throws NodeExistsException
- * @throws KeeperException
- */
- static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
- throws NodeExistsException, KeeperException {
- if (ZKUtil.checkExists(zookeeper, path) == -1) {
- // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
- // peer-state znode. This happens while adding a peer.
- // The peer state data is set as "ENABLED" by default.
- ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
- return true;
- }
- return false;
- }
-
- /**
- * @param bytes
- * @return True if the passed in bytes
are those of a pb
- * serialized ENABLED state.
- * @throws DeserializationException
- */
- static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
- ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
- return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
- }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
index b766fbd117b..96dc20a4c1f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.zookeeper;
+import java.util.UUID;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterId;
@@ -77,4 +79,14 @@ public class ZKClusterId {
throws KeeperException {
ZKUtil.createSetData(watcher, watcher.clusterIdZNode, id.toByteArray());
}
+
+ /**
+ * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
+ * @param zkw watcher connected to an ensemble
+ * @return the UUID read from zookeeper
+ * @throws KeeperException
+ */
+ public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
+ return UUID.fromString(readClusterIdZNode(zkw));
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 717ee4156ef..eba412d0706 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1836,9 +1836,12 @@ public class ZKUtil {
* @throws DeserializationException
*/
public static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
+ if (bytes == null) {
+ throw new DeserializationException("Unable to parse null HLog position.");
+ }
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
+ ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
ZooKeeperProtos.ReplicationHLogPosition position;
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index ef8ef48a2ad..20dc80f58a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -45,8 +45,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -176,17 +178,10 @@ public class Import {
cfRenameMap = createCfRenameMap(conf);
filter = instantiateFilter(conf);
// TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
- ReplicationZookeeper zkHelper = null;
ZooKeeperWatcher zkw = null;
try {
- HConnection connection = HConnectionManager.getConnection(conf);
zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
- zkHelper = new ReplicationZookeeper(connection, conf, zkw);
- try {
- this.clusterId = zkHelper.getUUIDForCluster(zkw);
- } finally {
- if (zkHelper != null) zkHelper.close();
- }
+ clusterId = ZKClusterId.getUUIDForCluster(zkw);
} catch (ZooKeeperConnectionException e) {
LOG.error("Problem connecting to ZooKeper during task setup", e);
} catch (KeeperException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index aaf5ec19574..0475be551d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -117,11 +119,13 @@ public class VerifyReplication {
@Override public void abort(String why, Throwable e) {}
@Override public boolean isAborted() {return false;}
});
- zk = new ReplicationZookeeper(conn, conf, localZKW);
- // Just verifying it we can connect
- peer = zk.getPeer(peerId);
- HTable replicatedTable = new HTable(peer.getConfiguration(),
- conf.get(NAME+".tableName"));
+ ReplicationPeers rp = new ReplicationPeersZKImpl(localZKW, conf, localZKW);
+ rp.init();
+ Configuration peerConf = rp.getPeerConf(peerId);
+ if (peerConf == null) {
+ throw new IOException("Couldn't get peer conf!");
+ }
+ HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
scan.setStartRow(value.getRow());
replicatedScanner = replicatedTable.getScanner(scan);
} catch (KeeperException e) {
@@ -175,42 +179,6 @@ public class VerifyReplication {
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
throw new IOException("Replication needs to be enabled to verify it.");
}
- HConnectionManager.execute(new HConnectable(conf) {
- @Override
- public Void connect(HConnection conn) throws IOException {
- ZooKeeperWatcher localZKW = null;
- ReplicationZookeeper zk = null;
- ReplicationPeer peer = null;
- try {
- localZKW = new ZooKeeperWatcher(
- conf, "VerifyReplication", new Abortable() {
- @Override public void abort(String why, Throwable e) {}
- @Override public boolean isAborted() {return false;}
- });
- zk = new ReplicationZookeeper(conn, conf, localZKW);
- // Just verifying it we can connect
- peer = zk.getPeer(peerId);
- if (peer == null) {
- throw new IOException("Couldn't get access to the slave cluster," +
- "please see the log");
- }
- } catch (KeeperException ex) {
- throw new IOException("Couldn't get access to the slave cluster" +
- " because: ", ex);
- } finally {
- if (peer != null){
- peer.close();
- }
- if (zk != null){
- zk.close();
- }
- if (localZKW != null){
- localZKW.close();
- }
- }
- return null;
- }
- });
conf.set(NAME+".peerId", peerId);
conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 0a43a500ed1..01284b10d04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -136,6 +136,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
this.replicationState = new ReplicationStateImpl(zkw, conf, this);
+ this.replicationState.init();
} catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a347da12c46..27fb9d62fdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@@ -189,7 +190,7 @@ public class ReplicationSource extends Thread
this.metrics = new MetricsSource(peerClusterZnode);
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
try {
- this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
+ this.clusterId = ZKClusterId.getUUIDForCluster(zkHelper.getZookeeperWatcher());
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0f141b76908..8f9ad6583fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -177,7 +177,7 @@ public class ReplicationSourceManager {
* old region server hlog queues
*/
public void init() throws IOException {
- for (String id : this.zkHelper.getPeerClusters().keySet()) {
+ for (String id : this.zkHelper.getPeerClusters()) {
addSource(id);
}
List currentReplicators = this.replicationQueues.getListOfReplicators();
@@ -601,7 +601,7 @@ public class ReplicationSourceManager {
try {
ReplicationSourceInterface src = getReplicationSource(conf,
fs, ReplicationSourceManager.this, stopper, replicating, peerId);
- if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+ if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 4f9c30014d5..97bcde41643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -20,12 +20,17 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.*;
+import java.io.IOException;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
import org.junit.Test;
/**
@@ -41,6 +46,26 @@ public abstract class TestReplicationStateBasic {
protected String server1 = new ServerName("hostname1.example.org", 1234, -1L).toString();
protected String server2 = new ServerName("hostname2.example.org", 1234, -1L).toString();
protected String server3 = new ServerName("hostname3.example.org", 1234, -1L).toString();
+ protected ReplicationPeers rp;
+ protected static final String ID_ONE = "1";
+ protected static final String ID_TWO = "2";
+ protected static String KEY_ONE;
+ protected static String KEY_TWO;
+
+ // For testing when we try to replicate to ourself
+ protected String OUR_ID = "3";
+ protected String OUR_KEY;
+
+ protected static int zkTimeoutCount;
+ protected static final int ZK_MAX_COUNT = 300;
+ protected static final int ZK_SLEEP_INTERVAL = 100; // millis
+
+ private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
+
+ @Before
+ public void setUp() {
+ zkTimeoutCount = 0;
+ }
@Test
public void testReplicationQueuesClient() throws KeeperException {
@@ -83,13 +108,15 @@ public abstract class TestReplicationStateBasic {
}
@Test
- public void testReplicationQueues() throws KeeperException {
+ public void testReplicationQueues() throws KeeperException, IOException {
rq1.init(server1);
rq2.init(server2);
rq3.init(server3);
+ //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
+ rp.init();
- // Zero queues or replicators exist
- assertEquals(0, rq1.getListOfReplicators().size());
+ // 3 replicators should exist
+ assertEquals(3, rq1.getListOfReplicators().size());
rq1.removeQueue("bogus");
rq1.removeLog("bogus", "bogus");
rq1.removeAllQueues();
@@ -132,11 +159,102 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rq2.getListOfReplicators().size());
}
+ @Test
+ public void testReplicationPeers() throws Exception {
+ rp.init();
+
+ // Test methods with non-existent peer ids
+ try {
+ rp.removePeer("bogus");
+ fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+ } catch (IllegalArgumentException e) {
+ }
+ try {
+ rp.enablePeer("bogus");
+ fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+ } catch (IllegalArgumentException e) {
+ }
+ try {
+ rp.disablePeer("bogus");
+ fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+ } catch (IllegalArgumentException e) {
+ }
+ try {
+ rp.getStatusOfConnectedPeer("bogus");
+ fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+ } catch (IllegalArgumentException e) {
+ }
+ assertFalse(rp.connectToPeer("bogus"));
+ rp.disconnectFromPeer("bogus");
+ assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
+ assertNull(rp.getPeerUUID("bogus"));
+ assertNull(rp.getPeerConf("bogus"));
+ assertNumberOfPeers(0, 0);
+
+ // Add some peers
+ rp.addPeer(ID_ONE, KEY_ONE);
+ assertNumberOfPeers(0, 1);
+ rp.addPeer(ID_TWO, KEY_TWO);
+ assertNumberOfPeers(0, 2);
+
+ // Test methods with a peer that is added but not connected
+ try {
+ rp.getStatusOfConnectedPeer(ID_ONE);
+ fail("There are no connected peers, should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ }
+ assertNull(rp.getPeerUUID(ID_ONE));
+ assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
+ rp.disconnectFromPeer(ID_ONE);
+ assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
+
+ // Connect to one peer
+ rp.connectToPeer(ID_ONE);
+ assertNumberOfPeers(1, 2);
+ assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
+ rp.disablePeer(ID_ONE);
+ assertConnectedPeerStatus(false, ID_ONE);
+ rp.enablePeer(ID_ONE);
+ assertConnectedPeerStatus(true, ID_ONE);
+ assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
+ assertNotNull(rp.getPeerUUID(ID_ONE).toString());
+
+ // Disconnect peer
+ rp.disconnectFromPeer(ID_ONE);
+ assertNumberOfPeers(0, 2);
+ try {
+ rp.getStatusOfConnectedPeer(ID_ONE);
+ fail("There are no connected peers, should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
+ while (true) {
+ if (status == rp.getStatusOfConnectedPeer(peerId)) {
+ return;
+ }
+ if (zkTimeoutCount < ZK_MAX_COUNT) {
+ LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
+ + ", sleeping and trying again.");
+ Thread.sleep(ZK_SLEEP_INTERVAL);
+ } else {
+ fail("Timed out waiting for ConnectedPeerStatus to be " + status);
+ }
+ }
+ }
+
+ protected void assertNumberOfPeers(int connected, int total) {
+ assertEquals(total, rp.getAllPeerClusterKeys().size());
+ assertEquals(connected, rp.getConnectedPeers().size());
+ assertEquals(total, rp.getAllPeerIds().size());
+ }
+
/*
* three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
* 3, 4, 5 log files respectively
*/
- protected void populateQueues() throws KeeperException {
+ protected void populateQueues() throws KeeperException, IOException {
rq1.addLog("trash", "trash");
rq1.removeQueue("trash");
@@ -147,6 +265,8 @@ public abstract class TestReplicationStateBasic {
for (int j = 0; j < i; j++) {
rq3.addLog("qId" + i, "filename" + j);
}
+ //Add peers for the corresponding queues so they are not orphans
+ rp.addPeer("qId" + i, "bogus" + i);
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 8f30a6bd66b..137cf40e0cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -23,16 +23,20 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
+import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@@ -55,20 +59,42 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
+ KEY_ONE = initPeerClusterState("/hbase1");
+ KEY_TWO = initPeerClusterState("/hbase2");
+ }
+
+ private static String initPeerClusterState(String baseZKNode)
+ throws IOException, KeeperException {
+ // Set up state nodes of peer clusters
+ Configuration testConf = new Configuration(conf);
+ testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
+ ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null);
+ ReplicationStateInterface rsi = new ReplicationStateImpl(zkw1, testConf, zkw1);
+ rsi.init();
+ rsi.setState(true);
+ rsi.close();
+ String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
+ ZKUtil.createWithParents(zkw1, fakeRs);
+ ZKClusterId.setClusterId(zkw1, new ClusterId());
+ return ZKUtil.getZooKeeperClusterKey(testConf);
}
@Before
- public void setUp() throws KeeperException {
+ @Override
+ public void setUp() {
+ super.setUp();
DummyServer ds1 = new DummyServer(server1);
DummyServer ds2 = new DummyServer(server2);
DummyServer ds3 = new DummyServer(server3);
- rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1);
- rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2);
- rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3);
- rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1);
- String peersZnode = ZKUtil.joinZNode(replicationZNode, "peers");
- for (int i = 1; i < 6; i++) {
- ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(peersZnode, "qId"+i));
+ try {
+ rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1);
+ rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2);
+ rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3);
+ rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1);
+ rp = new ReplicationPeersZKImpl(zkw, conf, zkw);
+ OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
+ } catch (KeeperException e) {
+ fail("Exception thrown: " + e);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
index 3820e490486..c61cf570cba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
@@ -53,6 +53,8 @@ public class TestReplicationZookeeper {
private static String slaveClusterKey;
+ private static String peersZNode;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility();
@@ -63,6 +65,10 @@ public class TestReplicationZookeeper {
repZk = new ReplicationZookeeper(server, new AtomicBoolean());
slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
conf.get("hbase.zookeeper.property.clientPort") + ":/1";
+ String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+ String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
+ String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
+ peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
}
@AfterClass
@@ -80,19 +86,19 @@ public class TestReplicationZookeeper {
@Test
public void testIsPeerPath_PathToParentOfPeerNode() {
- String peerParentNode = repZk.getPeersZNode();
+ String peerParentNode = peersZNode;
assertFalse(repZk.isPeerPath(peerParentNode));
}
@Test
public void testIsPeerPath_PathToChildOfPeerNode() {
- String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(repZk.getPeersZNode(), "1"), "child");
+ String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(peersZNode, "1"), "child");
assertFalse(repZk.isPeerPath(peerChild));
}
@Test
public void testIsPeerPath_ActualPeerPath() {
- String peerPath = ZKUtil.joinZNode(repZk.getPeersZNode(), "1");
+ String peerPath = ZKUtil.joinZNode(peersZNode, "1");
assertTrue(repZk.isPeerPath(peerPath));
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index bb822d45e4f..2d334e69620 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -124,9 +125,9 @@ public class TestReplicationSourceManager {
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
- ReplicationZookeeper.ENABLED_ZNODE_BYTES);
+ ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
- ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationZookeeper.ENABLED_ZNODE_BYTES);
+ ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
@@ -277,6 +278,7 @@ public class TestReplicationSourceManager {
for (String file : files) {
rz.addLogToList(file, "1");
}
+
// create 3 DummyServers
Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");