HBASE-7567 [replication] Create an interface for replication peers (Chris Trezzo via JD)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1484031 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2013-05-18 00:00:20 +00:00
parent d5f6d90329
commit aec1857b37
20 changed files with 906 additions and 599 deletions

View File

@ -25,10 +25,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -79,7 +84,7 @@ public class ReplicationPeer implements Abortable, Closeable {
*/ */
public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode) public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
throws KeeperException { throws KeeperException {
ReplicationZookeeper.ensurePeerEnabled(zookeeper, peerStateNode); ensurePeerEnabled(zookeeper, peerStateNode);
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
this.peerStateTracker.start(); this.peerStateTracker.start();
try { try {
@ -90,7 +95,7 @@ public class ReplicationPeer implements Abortable, Closeable {
} }
private void readPeerStateZnode() throws DeserializationException { private void readPeerStateZnode() throws DeserializationException {
this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false))); this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
} }
/** /**
@ -181,6 +186,57 @@ public class ReplicationPeer implements Abortable, Closeable {
} }
} }
/**
 * Tells whether the content of a state znode denotes the ENABLED replication state.
 * @param bytes content of a state znode, as accepted by {@link #parseStateFrom(byte[])}
 * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
 * @throws DeserializationException if <code>bytes</code> cannot be parsed into a state
 */
private static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
  return parseStateFrom(bytes) == ZooKeeperProtos.ReplicationState.State.ENABLED;
}
/**
 * Deserializes the replication state held in a state znode.
 * @param bytes Content of a state znode; checked for the pb magic prefix before parsing.
 * @return State parsed from the passed bytes.
 * @throws DeserializationException if the protobuf payload following the magic prefix is
 *           malformed (and, presumably, when the magic prefix check itself fails)
 */
private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
    throws DeserializationException {
  ProtobufUtil.expectPBMagicPrefix(bytes);
  // The serialized message begins right after the magic prefix.
  final int offset = ProtobufUtil.lengthOfPBMagic();
  final ZooKeeperProtos.ReplicationState.Builder stateBuilder =
      ZooKeeperProtos.ReplicationState.newBuilder();
  try {
    return stateBuilder.mergeFrom(bytes, offset, bytes.length - offset).build().getState();
  } catch (InvalidProtocolBufferException e) {
    throw new DeserializationException(e);
  }
}
/**
 * Makes sure a peer-state znode is in place, creating it with the ENABLED content when the
 * existence check reports it missing.
 * @param zookeeper watcher used for the existence check and the creation
 * @param path Path to znode to check
 * @return True if the znode was missing when checked (so a creation was attempted), false if it
 *         already existed.
 * @throws NodeExistsException
 * @throws KeeperException
 */
private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
    throws NodeExistsException, KeeperException {
  if (ZKUtil.checkExists(zookeeper, path) != -1) {
    // Already there, nothing to create.
    return false;
  }
  // Whoever registers the peer may be creating this same peer-state znode concurrently, hence
  // the "if not exists" flavour of create. A new peer is ENABLED by default.
  ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
    ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
  return true;
}
/** /**
* Tracker for state of this peer * Tracker for state of this peer
*/ */

View File

@ -0,0 +1,138 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.zookeeper.KeeperException;
/**
 * Contract for maintaining a set of peer clusters, i.e. the remote slave clusters that data is
 * replicated to. A peer cluster can be in three different states:
 *
 * 1. Not-Registered - There is no notion of the peer cluster.
 * 2. Registered - The peer has an id and is being tracked but there is no connection.
 * 3. Connected - There is an active connection to the remote peer.
 *
 * Independently of that, a registered or connected peer cluster is either enabled or disabled.
 */
@InterfaceAudience.Private
public interface ReplicationPeers {

  /**
   * Initialize the ReplicationPeers interface.
   * @throws IOException
   * @throws KeeperException
   */
  void init() throws IOException, KeeperException;

  /**
   * Register a new remote slave cluster for replication.
   * @param peerId the short identifier of the cluster
   * @param clusterKey the concatenation of the slave cluster's:
   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
   * @throws IOException
   */
  void addPeer(String peerId, String clusterKey) throws IOException;

  /**
   * Remove a remote slave cluster and stop the replication to it.
   * @param peerId the short identifier of the cluster
   * @throws IOException
   */
  void removePeer(String peerId) throws IOException;

  /**
   * Restart the replication to the specified remote slave cluster.
   * @param peerId the short identifier of the cluster
   * @throws IOException
   */
  void enablePeer(String peerId) throws IOException;

  /**
   * Stop the replication to the specified remote slave cluster.
   * @param peerId the short identifier of the cluster
   * @throws IOException
   */
  void disablePeer(String peerId) throws IOException;

  /**
   * Get the replication status for the specified connected remote slave cluster.
   * @param peerId the short identifier of the cluster
   * @return true if replication is enabled, false otherwise.
   */
  boolean getStatusOfConnectedPeer(String peerId);

  /**
   * Get a set of all connected remote slave clusters.
   * @return set of peer ids
   */
  Set<String> getConnectedPeers();

  /**
   * List the cluster keys of all remote slave clusters (whether they are enabled/disabled or
   * connected/disconnected).
   * @return A map of peer ids to peer cluster keys
   */
  Map<String, String> getAllPeerClusterKeys();

  /**
   * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
   * connected/disconnected).
   * @return A list of peer ids
   */
  List<String> getAllPeerIds();

  /**
   * Attempt to connect to a new remote slave cluster.
   * @param peerId the short identifier of the cluster
   * @return true if a new connection was made, false if no new connection was made.
   * @throws IOException
   * @throws KeeperException
   */
  boolean connectToPeer(String peerId) throws IOException, KeeperException;

  /**
   * Disconnect from a remote slave cluster.
   * @param peerId the short identifier of the cluster
   */
  void disconnectFromPeer(String peerId);

  /**
   * Returns all region servers from given connected remote slave cluster.
   * @param peerId the short identifier of the cluster
   * @return addresses of all region servers in the peer cluster. Returns an empty list if the peer
   *         cluster is unavailable or there are no region servers in the cluster.
   */
  List<ServerName> getRegionServersOfConnectedPeer(String peerId);

  /**
   * Returns the UUID of the provided peer id.
   * @param peerId the peer's ID that will be converted into a UUID
   * @return a UUID or null if the peer cluster does not exist or is not connected.
   */
  UUID getPeerUUID(String peerId);

  /**
   * Returns the configuration needed to talk to the remote slave cluster.
   * @param peerId the short identifier of the cluster
   * @return the configuration for the peer cluster, null if it was unable to get the configuration
   * @throws KeeperException
   */
  Configuration getPeerConf(String peerId) throws KeeperException;
}

View File

@ -0,0 +1,409 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* This class provides an implementation of the ReplicationPeers interface using Zookeeper. The
* peers znode contains a list of all peer replication clusters and the current replication state of
* those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
* the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
* peer's cluster key provided by the user in the HBase Shell. The cluster key contains a list of
* zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
* For example:
*
* /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
* /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
*
* Each of these peer znodes has a child znode that indicates whether or not replication is enabled
* on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
* boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
* ReplicationPeer.PeerStateTracker class. For example:
*
* /hbase/replication/peers/1/peer-state [Value: ENABLED]
*/
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
// Map of peer clusters keyed by their id
private Map<String, ReplicationPeer> peerClusters;
private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
/**
 * Builds the peers manager with an initially empty map of connected peers; the arguments are
 * handed unchanged to the {@link ReplicationStateZKBase} constructor.
 */
public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
    Abortable abortable) {
  super(zk, conf, abortable);
  peerClusters = new HashMap<String, ReplicationPeer>();
}
/**
 * Creates the peers znode (with its parents) and then attempts to connect to every peer that is
 * registered under it.
 */
@Override
public void init() throws IOException, KeeperException {
  ZKUtil.createWithParents(zookeeper, peersZNode);
  connectExistingPeers();
}
/**
 * Registers a peer by creating its znode (holding the serialized cluster key) and its
 * peer-state znode, which starts out as ENABLED.
 * @throws IllegalArgumentException if a peer with this id is already registered
 * @throws IOException wrapping any KeeperException raised while talking to ZooKeeper
 */
@Override
public void addPeer(String id, String clusterKey) throws IOException {
  try {
    if (peerExists(id)) {
      throw new IllegalArgumentException("Cannot add existing peer");
    }
    ZKUtil.createWithParents(zookeeper, peersZNode);
    final String peerZNode = ZKUtil.joinZNode(peersZNode, id);
    ZKUtil.createAndWatch(zookeeper, peerZNode, toByteArray(clusterKey));
    // The peer-state znode may also be created by ReplicationPeer when it starts its state
    // tracker, so only create it if nobody beat us to it. A peer is enabled by default.
    final String peerStateZNode = getPeerStateNode(id);
    ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, peerStateZNode, ENABLED_ZNODE_BYTES);
  } catch (KeeperException e) {
    throw new IOException("Unable to add peer", e);
  }
}
/**
 * Unregisters a peer by recursively deleting its znode.
 * @throws IllegalArgumentException if no peer with this id is registered
 * @throws IOException wrapping any KeeperException raised while talking to ZooKeeper
 */
@Override
public void removePeer(String id) throws IOException {
  try {
    if (!peerExists(id)) {
      throw new IllegalArgumentException("Cannot remove inexisting peer");
    }
    final String peerZNode = ZKUtil.joinZNode(peersZNode, id);
    ZKUtil.deleteNodeRecursively(zookeeper, peerZNode);
  } catch (KeeperException e) {
    throw new IOException("Unable to remove a peer", e);
  }
}
/**
 * Switches the peer-state znode of the given peer to ENABLED and logs the change.
 */
@Override
public void enablePeer(String id) throws IOException {
  final ZooKeeperProtos.ReplicationState.State enabled =
      ZooKeeperProtos.ReplicationState.State.ENABLED;
  changePeerState(id, enabled);
  LOG.info("peer " + id + " is enabled");
}
/**
 * Switches the peer-state znode of the given peer to DISABLED and logs the change.
 */
@Override
public void disablePeer(String id) throws IOException {
  final ZooKeeperProtos.ReplicationState.State disabled =
      ZooKeeperProtos.ReplicationState.State.DISABLED;
  changePeerState(id, disabled);
  LOG.info("peer " + id + " is disabled");
}
/**
 * Reports the enabled flag of a connected peer.
 * @throws IllegalArgumentException if the peer is not in the map of connected peers
 */
@Override
public boolean getStatusOfConnectedPeer(String id) {
  if (peerClusters.containsKey(id)) {
    final ReplicationPeer connectedPeer = peerClusters.get(id);
    return connectedPeer.getPeerEnabled().get();
  }
  throw new IllegalArgumentException("peer " + id + " is not connected");
}
/**
 * Connects to a registered peer and records it in the map of connected peers.
 * @return true if the peer was newly connected; false if it was already connected or no peer
 *         object could be built for it
 */
@Override
public boolean connectToPeer(String peerId) throws IOException, KeeperException {
  if (peerClusters == null || peerClusters.containsKey(peerId)) {
    return false;
  }
  final ReplicationPeer newPeer = getPeer(peerId);
  if (newPeer != null) {
    peerClusters.put(peerId, newPeer);
    LOG.info("Added new peer cluster " + newPeer.getClusterKey());
    return true;
  }
  return false;
}
/**
 * Closes the ZooKeeper watcher of a connected peer and forgets the peer. Does nothing when the
 * peer is not connected.
 */
@Override
public void disconnectFromPeer(String peerId) {
  final ReplicationPeer connectedPeer = peerClusters.get(peerId);
  if (connectedPeer == null) {
    return;
  }
  connectedPeer.getZkw().close();
  peerClusters.remove(peerId);
}
/**
 * Reads the cluster key of every registered peer straight from ZooKeeper (no watches are set).
 * Peers whose znode content is missing or cannot be parsed are logged and skipped. On a
 * KeeperException the abortable is aborted and whatever was collected so far is returned.
 * @return a sorted map of peer ids to cluster keys; never null, possibly empty
 */
@Override
public Map<String, String> getAllPeerClusterKeys() {
  Map<String, String> peers = new TreeMap<String, String>();
  try {
    List<String> ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
    if (ids == null) {
      // Other callers of listChildrenNoWatch in this class guard against a null result
      // (presumably returned when the znode does not exist); treat it as "no peers".
      return peers;
    }
    for (String id : ids) {
      byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
      if (bytes == null) {
        // getData may return null (getPeerConf guards for it too), e.g. if the peer was
        // removed after the children were listed.
        LOG.warn("No znode content for clusterid=" + id + ", continuing.");
        continue;
      }
      try {
        peers.put(id, parsePeerFrom(bytes));
      } catch (DeserializationException de) {
        // Keep the exception in the log so the cause of the parse failure is not lost.
        LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.", de);
      }
    }
  } catch (KeeperException e) {
    this.abortable.abort("Cannot get the list of peers ", e);
  }
  return peers;
}
/**
 * Refreshes and returns the region servers of a connected peer. If listing them fails with a
 * KeeperException, a reconnect is attempted and the peer's server list is reset to empty.
 * @return the peer's region servers, or an empty list if the peer is not connected
 */
@Override
public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
  if (peerClusters.isEmpty()) {
    return Collections.emptyList();
  }
  final ReplicationPeer connectedPeer = peerClusters.get(peerId);
  if (connectedPeer == null) {
    return Collections.emptyList();
  }
  // Stays empty unless the lookup below succeeds.
  List<ServerName> servers = Collections.emptyList();
  try {
    servers = fetchSlavesAddresses(connectedPeer.getZkw());
  } catch (KeeperException ke) {
    reconnectPeer(ke, connectedPeer);
  }
  connectedPeer.setRegionServers(servers);
  return connectedPeer.getRegionServers();
}
/**
 * Looks up the cluster UUID of a connected peer through the peer's ZooKeeper watcher.
 * @return the UUID, or null if the peer is not connected or the lookup failed with a
 *         KeeperException (in which case a reconnect is attempted)
 */
@Override
public UUID getPeerUUID(String peerId) {
  final ReplicationPeer connectedPeer = peerClusters.get(peerId);
  if (connectedPeer != null) {
    try {
      return ZKClusterId.getUUIDForCluster(connectedPeer.getZkw());
    } catch (KeeperException ke) {
      reconnectPeer(ke, connectedPeer);
    }
  }
  return null;
}
/**
 * Returns the ids of the connected peers. Note that this is the key set of the internal peer
 * map itself, not a copy.
 */
@Override
public Set<String> getConnectedPeers() {
  return peerClusters.keySet();
}
/**
 * Builds the configuration needed to talk to a peer: a copy of this cluster's configuration
 * with the peer's cluster key (read from the peer znode) applied on top.
 * @param peerId id of the peer whose znode is read
 * @return the peer configuration, or null if the peer znode has no data, its content cannot be
 *         parsed, or the cluster key cannot be applied to the configuration
 * @throws KeeperException if reading the peer znode fails
 */
@Override
public Configuration getPeerConf(String peerId) throws KeeperException {
  String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
  byte[] data = ZKUtil.getData(this.zookeeper, znode);
  if (data == null) {
    LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
    return null;
  }
  String otherClusterKey;
  try {
    otherClusterKey = parsePeerFrom(data);
  } catch (DeserializationException e) {
    // Pass the exception to the logger so the reason for the parse failure is preserved.
    LOG.warn("Failed to parse cluster key from peerId=" + peerId
        + ", specifically the content from the following znode: " + znode, e);
    return null;
  }
  // Start from our own settings; the cluster key overrides what is peer specific.
  Configuration otherConf = new Configuration(this.conf);
  try {
    ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
  } catch (IOException e) {
    LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
    return null;
  }
  return otherConf;
}
/**
 * List all registered peer clusters and set a watch on their znodes.
 * @return the children of the peers znode as returned by ZKUtil; null if listing them failed
 *         with a KeeperException (the abortable is aborted in that case)
 */
@Override
public List<String> getAllPeerIds() {
  try {
    return ZKUtil.listChildrenAndWatchThem(zookeeper, peersZNode);
  } catch (KeeperException e) {
    abortable.abort("Cannot get the list of peers ", e);
    return null;
  }
}
/**
 * Initialization helper that attempts to connect to every peer registered under the peers
 * znode. The children are listed without setting a watch on the peer znodes.
 * @throws IOException
 * @throws KeeperException
 */
private void connectExistingPeers() throws IOException, KeeperException {
  final List<String> peerIds = ZKUtil.listChildrenNoWatch(zookeeper, peersZNode);
  if (peerIds == null) {
    return;
  }
  for (String peerId : peerIds) {
    connectToPeer(peerId);
  }
}
/**
 * Re-creates the ZooKeeper watcher of a peer, but only when the given exception signals a lost
 * connection or an expired session; any other KeeperException is ignored here.
 * @param ke the exception that was caught while talking to the peer
 * @param peer the peer whose watcher should be reloaded
 */
private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
  final boolean connectionGone =
      ke instanceof ConnectionLossException || ke instanceof SessionExpiredException;
  if (!connectionGone) {
    return;
  }
  LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
  try {
    peer.reloadZkWatcher();
  } catch (IOException io) {
    LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
  }
}
/**
 * Lists the children of the peer's region server znode and parses each child name into a
 * ServerName.
 * @param zkw zk connection to use
 * @return list of region server addresses, or an empty list if the children listing is null
 * @throws KeeperException
 */
private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
    throws KeeperException {
  final List<String> serverZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
  if (serverZNodes == null) {
    return Collections.emptyList();
  }
  final List<ServerName> servers = new ArrayList<ServerName>(serverZNodes.size());
  for (String serverZNode : serverZNodes) {
    servers.add(ServerName.parseServerName(serverZNode));
  }
  return servers;
}
/**
 * Builds the path of the peer-state znode of a peer: peers znode / id / peer-state node name.
 */
private String getPeerStateNode(String id) {
  final String stateNodeUnderPeer = ZKUtil.joinZNode(id, peerStateNodeName);
  return ZKUtil.joinZNode(peersZNode, stateNodeUnderPeer);
}
/**
 * Update the state znode of a peer cluster, creating the znode if it does not exist yet.
 * @param id id of a registered peer
 * @param state the state to store
 * @throws IllegalArgumentException if the peer is not registered
 * @throws IOException wrapping any KeeperException raised while talking to ZooKeeper
 */
private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
    throws IOException {
  try {
    if (!peerExists(id)) {
      throw new IllegalArgumentException("peer " + id + " is not registered");
    }
    final String stateZNode = getPeerStateNode(id);
    final byte[] serializedState;
    if (state == ZooKeeperProtos.ReplicationState.State.ENABLED) {
      serializedState = ENABLED_ZNODE_BYTES;
    } else {
      serializedState = DISABLED_ZNODE_BYTES;
    }
    if (ZKUtil.checkExists(zookeeper, stateZNode) == -1) {
      ZKUtil.createAndWatch(zookeeper, stateZNode, serializedState);
    } else {
      ZKUtil.setData(zookeeper, stateZNode, serializedState);
    }
    LOG.info("state of the peer " + id + " changed to " + state.name());
  } catch (KeeperException e) {
    throw new IOException("Unable to change state of the peer " + id, e);
  }
}
/**
 * Helper method to connect to a peer: builds the peer object from the peer's configuration and
 * starts its state tracker.
 * @param peerId peer's identifier
 * @return object representing the peer, or null if no configuration could be obtained for it or
 *         if its cluster key equals our own cluster key
 * @throws IOException
 * @throws KeeperException
 */
private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException {
  final Configuration peerConf = getPeerConf(peerId);
  if (peerConf == null) {
    return null;
  }
  final boolean peerIsThisCluster =
      ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf));
  if (peerIsThisCluster) {
    LOG.debug("Not connecting to " + peerId + " because it's us");
    return null;
  }
  final String peerClusterKey = ZKUtil.getZooKeeperClusterKey(peerConf);
  final ReplicationPeer peer = new ReplicationPeer(peerConf, peerId, peerClusterKey);
  peer.startStateTracker(zookeeper, getPeerStateNode(peerId));
  return peer;
}
/**
 * Extracts the cluster key from the content of a peer znode. Content carrying the pb magic
 * prefix is parsed as a protobuf ReplicationPeer message; any other content is interpreted as
 * the cluster key string itself (presumably a format written before protobuf was used).
 * @param bytes Content of a peer znode.
 * @return ClusterKey parsed from the passed bytes; the empty string for empty non-pb content.
 * @throws DeserializationException if <code>bytes</code> is null or the protobuf payload is
 *           malformed
 */
private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
  if (bytes == null) {
    // Reject null explicitly instead of failing on bytes.length below.
    throw new DeserializationException("Unable to parse null peer cluster key.");
  }
  if (!ProtobufUtil.isPBMagicPrefix(bytes)) {
    return bytes.length > 0 ? Bytes.toString(bytes) : "";
  }
  // The serialized message begins right after the magic prefix.
  int pblen = ProtobufUtil.lengthOfPBMagic();
  ZooKeeperProtos.ReplicationPeer.Builder builder =
      ZooKeeperProtos.ReplicationPeer.newBuilder();
  ZooKeeperProtos.ReplicationPeer peer;
  try {
    peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
  } catch (InvalidProtocolBufferException e) {
    throw new DeserializationException(e);
  }
  return peer.getClusterkey();
}
/**
 * Serializes a cluster key for storage in a peer znode.
 * @param clusterKey the cluster key to store
 * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix prepended suitable
 *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
 *         /hbase/replication/peers/PEER_ID
 */
private static byte[] toByteArray(final String clusterKey) {
  final ZooKeeperProtos.ReplicationPeer peerMessage =
      ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build();
  return ProtobufUtil.prependPBMagic(peerMessage.toByteArray());
}
}

View File

@ -22,12 +22,14 @@ import java.util.List;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.SortedSet; import java.util.SortedSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
/** /**
* This provides an interface for maintaining a region server's replication queues. These queues * This provides an interface for maintaining a region server's replication queues. These queues
* keep track of the HLogs that still need to be replicated to remote clusters. * keep track of the HLogs that still need to be replicated to remote clusters.
*/ */
@InterfaceAudience.Private
public interface ReplicationQueues { public interface ReplicationQueues {
/** /**
@ -35,7 +37,7 @@ public interface ReplicationQueues {
* @param serverName The server name of the region server that owns the replication queues this * @param serverName The server name of the region server that owns the replication queues this
* interface manages. * interface manages.
*/ */
public void init(String serverName); public void init(String serverName) throws KeeperException;
/** /**
* Remove a replication queue. * Remove a replication queue.

View File

@ -39,8 +39,27 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.protobuf.InvalidProtocolBufferException; /**
* This class provides an implementation of the ReplicationQueues interface using Zookeeper. The
* base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
* all outstanding HLog files on this region server that need to be replicated. The myQueuesZnode is
* the regionserver name (a concatenation of the region servers hostname, client port and start
* code). For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234
*
* Within this znode, the region server maintains a set of HLog replication queues. These queues are
* represented by child znodes named using their given queue id. For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234/1
* /hbase/replication/rs/hostname.example.org,6020,1234/2
*
* Each queue has one child znode for every HLog that still needs to be replicated. The value of
* these HLog child znodes is the latest position that has been replicated. This position is updated
* every time a HLog entry is replicated. For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
*/
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues { public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
/** Znode containing all replication queues for this region server. */ /** Znode containing all replication queues for this region server. */
@ -50,14 +69,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
throws KeeperException { Abortable abortable) {
super(zk, conf, abortable); super(zk, conf, abortable);
} }
@Override @Override
public void init(String serverName) { public void init(String serverName) throws KeeperException {
this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName); this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
} }
@Override @Override
@ -94,7 +114,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename); znode = ZKUtil.joinZNode(znode, filename);
// Why serialize String of Long and not Long as bytes? // Why serialize String of Long and not Long as bytes?
ZKUtil.setData(this.zookeeper, znode, toByteArray(position)); ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
} catch (KeeperException e) { } catch (KeeperException e) {
this.abortable.abort("Failed to write replication hlog position (filename=" + filename this.abortable.abort("Failed to write replication hlog position (filename=" + filename
+ ", position=" + position + ")", e); + ", position=" + position + ")", e);
@ -107,7 +127,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
String znode = ZKUtil.joinZNode(clusterZnode, filename); String znode = ZKUtil.joinZNode(clusterZnode, filename);
byte[] bytes = ZKUtil.getData(this.zookeeper, znode); byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
try { try {
return parseHLogPositionFrom(bytes); return ZKUtil.parseHLogPositionFrom(bytes);
} catch (DeserializationException de) { } catch (DeserializationException de) {
LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
+ "znode content, continuing."); + "znode content, continuing.");
@ -351,7 +371,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
byte[] positionBytes = ZKUtil.getData(this.zookeeper, z); byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
long position = 0; long position = 0;
try { try {
position = parseHLogPositionFrom(positionBytes); position = ZKUtil.parseHLogPositionFrom(positionBytes);
} catch (DeserializationException e) { } catch (DeserializationException e) {
LOG.warn("Failed parse of hlog position from the following znode: " + z LOG.warn("Failed parse of hlog position from the following znode: " + z
+ ", Exception: " + e); + ", Exception: " + e);
@ -380,44 +400,4 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray(); ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
return ProtobufUtil.prependPBMagic(bytes); return ProtobufUtil.prependPBMagic(bytes);
} }
/**
 * Serializes an hlog position for storage in a replication queue znode.
 * @param position the position to store
 * @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable
 *         for use as content of an hlog position in a replication queue.
 */
static byte[] toByteArray(final long position) {
  final ZooKeeperProtos.ReplicationHLogPosition positionMessage =
      ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build();
  return ProtobufUtil.prependPBMagic(positionMessage.toByteArray());
}
/**
 * Extracts the HLog position from the content of a position znode. Content carrying the pb
 * magic prefix is parsed as a protobuf ReplicationHLogPosition message; other non-empty content
 * is decoded with Bytes.toLong, and empty content yields 0.
 * @param bytes - Content of a HLog position znode.
 * @return long - The current HLog position.
 * @throws DeserializationException if <code>bytes</code> is null or the protobuf payload is
 *           malformed
 */
private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
  if (bytes == null) {
    throw new DeserializationException("Unable to parse null HLog position.");
  }
  if (!ProtobufUtil.isPBMagicPrefix(bytes)) {
    return bytes.length > 0 ? Bytes.toLong(bytes) : 0;
  }
  // The serialized message begins right after the magic prefix.
  final int offset = ProtobufUtil.lengthOfPBMagic();
  final ZooKeeperProtos.ReplicationHLogPosition.Builder positionBuilder =
      ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
  final ZooKeeperProtos.ReplicationHLogPosition parsed;
  try {
    parsed = positionBuilder.mergeFrom(bytes, offset, bytes.length - offset).build();
  } catch (InvalidProtocolBufferException e) {
    throw new DeserializationException(e);
  }
  return parsed.getPosition();
}
} }

View File

@ -54,8 +54,6 @@ public class ReplicationStateImpl extends ReplicationStateZKBase implements
// Set a tracker on replicationStateNode // Set a tracker on replicationStateNode
this.stateTracker = this.stateTracker =
new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable); new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
stateTracker.start();
readReplicationStateZnode();
} }
public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
@ -63,6 +61,13 @@ public class ReplicationStateImpl extends ReplicationStateZKBase implements
this(zk, conf, abortable, new AtomicBoolean()); this(zk, conf, abortable, new AtomicBoolean());
} }
/**
 * Creates the state znode (with its parents), starts the tracker on it and then reads the
 * replication state from the znode.
 */
@Override
public void init() throws KeeperException {
  ZKUtil.createWithParents(zookeeper, stateZNode);
  this.stateTracker.start();
  this.readReplicationStateZnode();
}
@Override @Override
public boolean getState() throws KeeperException { public boolean getState() throws KeeperException {
return getReplication(); return getReplication();
@ -115,8 +120,7 @@ public class ReplicationStateImpl extends ReplicationStateZKBase implements
*/ */
private void setReplicating(boolean newState) throws KeeperException { private void setReplicating(boolean newState) throws KeeperException {
ZKUtil.createWithParents(this.zookeeper, this.stateZNode); ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
: ReplicationZookeeper.DISABLED_ZNODE_BYTES;
ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes); ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
} }

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import java.io.Closeable; import java.io.Closeable;
@ -27,11 +28,16 @@ import java.io.Closeable;
* cluster. This state is used to indicate whether replication is enabled or * cluster. This state is used to indicate whether replication is enabled or
* disabled on a cluster. * disabled on a cluster.
*/ */
@InterfaceAudience.Private
public interface ReplicationStateInterface extends Closeable { public interface ReplicationStateInterface extends Closeable {
/**
* Initialize the replication state interface.
*/
public void init() throws KeeperException;
/** /**
* Get the current state of replication (i.e. ENABLED or DISABLED). * Get the current state of replication (i.e. ENABLED or DISABLED).
*
* @return true if replication is enabled, false otherwise * @return true if replication is enabled, false otherwise
* @throws KeeperException * @throws KeeperException
*/ */
@ -39,7 +45,6 @@ public interface ReplicationStateInterface extends Closeable {
/** /**
* Set the state of replication. * Set the state of replication.
*
* @param newState * @param newState
* @throws KeeperException * @throws KeeperException
*/ */

View File

@ -22,6 +22,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -51,6 +53,12 @@ public abstract class ReplicationStateZKBase {
protected final Configuration conf; protected final Configuration conf;
protected final Abortable abortable; protected final Abortable abortable;
// Public for testing
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
protected static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
Abortable abortable) { Abortable abortable) {
this.zookeeper = zookeeper; this.zookeeper = zookeeper;
@ -79,8 +87,20 @@ public abstract class ReplicationStateZKBase {
return result; return result;
} }
/**
 * Serializes a replication state for storage in a znode.
 * @param state the replication state to serialize
 * @return the protobuf encoding of <code>state</code> with the pb magic prefix prepended,
 *         suitable for use as content of either the cluster state znode -- whether or not we
 *         should be replicating kept in /hbase/replication/state -- or as content of a
 *         peer-state znode under a peer cluster id as in
 *         /hbase/replication/peers/PEER_ID/peer-state.
 */
protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
  final ZooKeeperProtos.ReplicationState message =
      ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build();
  return ProtobufUtil.prependPBMagic(message.toByteArray());
}
public boolean peerExists(String id) throws KeeperException { public boolean peerExists(String id) throws KeeperException {
return ZKUtil.checkExists(this.zookeeper, return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
} }
} }

View File

@ -18,7 +18,6 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -26,29 +25,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeMap;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -83,47 +71,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable { public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
private static final Log LOG = private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class);
LogFactory.getLog(ReplicationZookeeper.class);
// Our handle on zookeeper // Our handle on zookeeper
private final ZooKeeperWatcher zookeeper; private final ZooKeeperWatcher zookeeper;
// Map of peer clusters keyed by their id
private Map<String, ReplicationPeer> peerClusters;
// Path to the root replication znode
private String replicationZNode;
// Path to the peer clusters znode
private String peersZNode; private String peersZNode;
// Path to the znode that contains all RS that replicates
private String rsZNode;
// Path to this region server's name under rsZNode
private String rsServerNameZnode;
// Name node if the replicationState znode
private String replicationStateNodeName;
// Name of zk node which stores peer state. The peer-state znode is under a
// peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
private String peerStateNodeName;
private final Configuration conf; private final Configuration conf;
// The key to our own cluster
private String ourClusterKey;
// Abortable // Abortable
private Abortable abortable; private Abortable abortable;
private final ReplicationStateInterface replicationState; private final ReplicationStateInterface replicationState;
private final ReplicationPeers replicationPeers;
private final ReplicationQueues replicationQueues; private final ReplicationQueues replicationQueues;
/**
* ZNode content if enabled state.
*/
// Public so it can be seen by test code.
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
/**
* ZNode content if disabled state.
*/
static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
/** /**
* Constructor used by clients of replication (like master and HBase clients) * Constructor used by clients of replication (like master and HBase clients)
* @param conf conf to use * @param conf conf to use
@ -131,15 +90,18 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @throws IOException * @throws IOException
*/ */
public ReplicationZookeeper(final Abortable abortable, final Configuration conf, public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
final ZooKeeperWatcher zk) throws KeeperException { final ZooKeeperWatcher zk) throws KeeperException, IOException {
super(zk, conf, abortable); super(zk, conf, abortable);
this.conf = conf; this.conf = conf;
this.zookeeper = zk; this.zookeeper = zk;
setZNodes(abortable); setZNodes(abortable);
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable); this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
this.replicationState.init();
// TODO This interface is no longer used by anyone using this constructor. When this class goes // TODO This interface is no longer used by anyone using this constructor. When this class goes
// away, we will no longer have this null initialization business // away, we will no longer have this null initialization business
this.replicationQueues = null; this.replicationQueues = null;
this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, abortable);
this.replicationPeers.init();
} }
/** /**
@ -157,39 +119,19 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
this.zookeeper = server.getZooKeeper(); this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration(); this.conf = server.getConfiguration();
setZNodes(server); setZNodes(server);
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating); this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
this.peerClusters = new HashMap<String, ReplicationPeer>(); this.replicationState.init();
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server); this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
this.replicationQueues.init(server.getServerName().toString()); this.replicationQueues.init(server.getServerName().toString());
connectExistingPeers(); this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server);
this.replicationPeers.init();
} }
private void setZNodes(Abortable abortable) throws KeeperException { private void setZNodes(Abortable abortable) throws KeeperException {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); String replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
}
private void connectExistingPeers() throws IOException, KeeperException {
List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
if (znodes != null) {
for (String z : znodes) {
connectToPeer(z);
}
}
} }
/** /**
@ -197,39 +139,15 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @return list of all peers' identifiers * @return list of all peers' identifiers
*/ */
public List<String> listPeersIdsAndWatch() { public List<String> listPeersIdsAndWatch() {
List<String> ids = null; return this.replicationPeers.getAllPeerIds();
try {
ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return ids;
} }
/** /**
* Map of this cluster's peers for display. * Map of this cluster's peers for display.
* @return A map of peer ids to peer cluster keys * @return A map of peer ids to peer cluster keys
*/ */
public Map<String,String> listPeers() { public Map<String, String> listPeers() {
Map<String,String> peers = new TreeMap<String,String>(); return this.replicationPeers.getAllPeerClusterKeys();
List<String> ids = null;
try {
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
for (String id : ids) {
byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
String clusterKey = null;
try {
clusterKey = parsePeerFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
continue;
}
peers.put(id, clusterKey);
}
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return peers;
} }
/** /**
@ -239,65 +157,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @return addresses of all region servers * @return addresses of all region servers
*/ */
public List<ServerName> getSlavesAddresses(String peerClusterId) { public List<ServerName> getSlavesAddresses(String peerClusterId) {
if (this.peerClusters.size() == 0) { return this.replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
return Collections.emptyList();
}
ReplicationPeer peer = this.peerClusters.get(peerClusterId);
if (peer == null) {
return Collections.emptyList();
}
List<ServerName> addresses;
try {
addresses = fetchSlavesAddresses(peer.getZkw());
} catch (KeeperException ke) {
reconnectPeer(ke, peer);
addresses = Collections.emptyList();
}
peer.setRegionServers(addresses);
return peer.getRegionServers();
}
/**
* Get the list of all the region servers from the specified peer
* @param zkw zk connection to use
* @return list of region server addresses or an empty list if the slave
* is unavailable
*/
private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
throws KeeperException {
return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
}
/**
* Lists the children of the specified znode, retrieving the data of each
* child as a server address.
*
* Used to list the currently online regionservers and their addresses.
*
* Sets no watches at all, this method is best effort.
*
* Returns an empty list if the node has no children. Returns null if the
* parent node itself does not exist.
*
* @param zkw zookeeper reference
* @param znode node to get children of as addresses
* @return list of data of children of specified znode, empty if no children,
* null if parent does not exist
* @throws KeeperException if unexpected zookeeper exception
*/
public static List<ServerName> listChildrenAndGetAsServerNames(
ZooKeeperWatcher zkw, String znode)
throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
if(children == null) {
return Collections.emptyList();
}
List<ServerName> addresses = new ArrayList<ServerName>(children.size());
for (String child : children) {
addresses.add(ServerName.parseServerName(child));
}
return addresses;
} }
/** /**
@ -306,59 +166,8 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @param peerId id of the peer cluster * @param peerId id of the peer cluster
* @throws KeeperException * @throws KeeperException
*/ */
public boolean connectToPeer(String peerId) public boolean connectToPeer(String peerId) throws IOException, KeeperException {
throws IOException, KeeperException { return this.replicationPeers.connectToPeer(peerId);
if (peerClusters == null) {
return false;
}
if (this.peerClusters.containsKey(peerId)) {
return false;
}
ReplicationPeer peer = getPeer(peerId);
if (peer == null) {
return false;
}
this.peerClusters.put(peerId, peer);
ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
this.rsServerNameZnode, peerId));
LOG.info("Added new peer cluster " + peer.getClusterKey());
return true;
}
/**
* Helper method to connect to a peer
* @param peerId peer's identifier
* @return object representing the peer
* @throws IOException
* @throws KeeperException
*/
public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String otherClusterKey = "";
try {
otherClusterKey = parsePeerFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed parse of cluster key from peerId=" + peerId
+ ", specifically the content from the following znode: " + znode);
}
if (this.ourClusterKey.equals(otherClusterKey)) {
LOG.debug("Not connecting to " + peerId + " because it's us");
return null;
}
// Construct the connection to the new peer
Configuration otherConf = new Configuration(this.conf);
try {
ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
} catch (IOException e) {
LOG.error("Can't get peer because:", e);
return null;
}
ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
otherClusterKey);
peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
return peer;
} }
/** /**
@ -368,15 +177,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @throws IllegalArgumentException Thrown when the peer doesn't exist * @throws IllegalArgumentException Thrown when the peer doesn't exist
*/ */
public void removePeer(String id) throws IOException { public void removePeer(String id) throws IOException {
try { this.replicationPeers.removePeer(id);
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot remove inexisting peer");
}
ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id));
} catch (KeeperException e) {
throw new IOException("Unable to remove a peer", e);
}
} }
/** /**
@ -388,154 +189,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* multi-slave isn't supported yet. * multi-slave isn't supported yet.
*/ */
public void addPeer(String id, String clusterKey) throws IOException { public void addPeer(String id, String clusterKey) throws IOException {
try { this.replicationPeers.addPeer(id, clusterKey);
if (peerExists(id)) {
throw new IllegalArgumentException("Cannot add existing peer");
}
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
toByteArray(clusterKey));
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer.
// The peer state data is set as "ENABLED" by default.
ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
ENABLED_ZNODE_BYTES);
// A peer is enabled by default
} catch (KeeperException e) {
throw new IOException("Unable to add peer", e);
}
}
/**
* @param clusterKey
* @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix
* prepended suitable for use as content of a this.peersZNode; i.e.
* the content of PEER_ID znode under /hbase/replication/peers/PEER_ID
*/
static byte[] toByteArray(final String clusterKey) {
byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
.toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
/**
* @param state
* @return Serialized protobuf of <code>state</code> with pb magic prefix
* prepended suitable for use as content of either the cluster state
* znode -- whether or not we should be replicating kept in
* /hbase/replication/state -- or as content of a peer-state znode
* under a peer cluster id as in
* /hbase/replication/peers/PEER_ID/peer-state.
*/
static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
.toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
/**
* @param position
* @return Serialized protobuf of <code>position</code> with pb magic prefix
* prepended suitable for use as content of an hlog position in a
* replication queue.
*/
public static byte[] positionToByteArray(
final long position) {
return ZKUtil.positionToByteArray(position);
}
/**
* @param lockOwner
* @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
* prepended suitable for use as content of an replication lock during
* region server fail over.
*/
static byte[] lockToByteArray(
final String lockOwner) {
byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
.toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
/**
* @param bytes Content of a peer znode.
* @return ClusterKey parsed from the passed bytes.
* @throws DeserializationException
*/
static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
.newBuilder();
ZooKeeperProtos.ReplicationPeer peer;
try {
peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return peer.getClusterkey();
} else {
if (bytes.length > 0) {
return Bytes.toString(bytes);
}
return "";
}
}
/**
* @param bytes Content of a state znode.
* @return State parsed from the passed bytes.
* @throws DeserializationException
*/
static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
.newBuilder();
ZooKeeperProtos.ReplicationState state;
try {
state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
return state.getState();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
}
/**
* @param bytes - Content of a HLog position znode.
* @return long - The current HLog position.
* @throws DeserializationException
*/
public static long parseHLogPositionFrom(
final byte[] bytes) throws DeserializationException {
return ZKUtil.parseHLogPositionFrom(bytes);
}
/**
* @param bytes - Content of a lock znode.
* @return String - The owner of the lock.
* @throws DeserializationException
*/
static String parseLockOwnerFrom(
final byte[] bytes) throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
.newBuilder();
ZooKeeperProtos.ReplicationLock lock;
try {
lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return lock.getLockOwner();
} else {
if (bytes.length > 0) {
return Bytes.toString(bytes);
}
return "";
}
} }
/** /**
@ -546,8 +200,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Thrown when the peer doesn't exist * Thrown when the peer doesn't exist
*/ */
public void enablePeer(String id) throws IOException { public void enablePeer(String id) throws IOException {
changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED); this.replicationPeers.enablePeer(id);
LOG.info("peer " + id + " is enabled");
} }
/** /**
@ -558,28 +211,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Thrown when the peer doesn't exist * Thrown when the peer doesn't exist
*/ */
public void disablePeer(String id) throws IOException { public void disablePeer(String id) throws IOException {
changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED); this.replicationPeers.disablePeer(id);
LOG.info("peer " + id + " is disabled");
}
private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
throws IOException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("peer " + id + " is not registered");
}
String peerStateZNode = getPeerStateNode(id);
byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
: DISABLED_ZNODE_BYTES;
if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
} else {
ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
}
LOG.info("state of the peer " + id + " changed to " + state.name());
} catch (KeeperException e) {
throw new IOException("Unable to change state of the peer " + id, e);
}
} }
/** /**
@ -592,14 +224,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Thrown when the peer doesn't exist * Thrown when the peer doesn't exist
*/ */
public boolean getPeerEnabled(String id) { public boolean getPeerEnabled(String id) {
if (!this.peerClusters.containsKey(id)) { return this.replicationPeers.getStatusOfConnectedPeer(id);
throw new IllegalArgumentException("peer " + id + " is not registered");
}
return this.peerClusters.get(id).getPeerEnabled().get();
}
private String getPeerStateNode(String id) {
return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
} }
/** /**
@ -683,8 +308,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
public void deleteSource(String peerZnode, boolean closeConnection) { public void deleteSource(String peerZnode, boolean closeConnection) {
this.replicationQueues.removeQueue(peerZnode); this.replicationQueues.removeQueue(peerZnode);
if (closeConnection) { if (closeConnection) {
this.peerClusters.get(peerZnode).getZkw().close(); this.replicationPeers.disconnectFromPeer(peerZnode);
this.peerClusters.remove(peerZnode);
} }
} }
@ -714,40 +338,7 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* @return a UUID or null if there's a ZK connection issue * @return a UUID or null if there's a ZK connection issue
*/ */
public UUID getPeerUUID(String peerId) { public UUID getPeerUUID(String peerId) {
ReplicationPeer peer = getPeerClusters().get(peerId); return this.replicationPeers.getPeerUUID(peerId);
UUID peerUUID = null;
try {
peerUUID = getUUIDForCluster(peer.getZkw());
} catch (KeeperException ke) {
reconnectPeer(ke, peer);
}
return peerUUID;
}
/**
* Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
* @param zkw watcher connected to an ensemble
* @return the UUID read from zookeeper
* @throws KeeperException
*/
public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
return UUID.fromString(ZKClusterId.readClusterIdZNode(zkw));
}
private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
if (ke instanceof ConnectionLossException
|| ke instanceof SessionExpiredException) {
LOG.warn(
"Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
ke);
try {
peer.reloadZkWatcher();
} catch(IOException io) {
LOG.warn(
"Creation of ZookeeperWatcher failed for peer "
+ peer.getClusterKey(), io);
}
}
} }
public void registerRegionServerListener(ZooKeeperListener listener) { public void registerRegionServerListener(ZooKeeperListener listener) {
@ -758,8 +349,8 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Get a map of all peer clusters * Get a map of all peer clusters
* @return map of peer cluster keyed by id * @return map of peer cluster keyed by id
*/ */
public Map<String, ReplicationPeer> getPeerClusters() { public Set<String> getPeerClusters() {
return this.peerClusters; return this.replicationPeers.getConnectedPeers();
} }
/** /**
@ -802,36 +393,4 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
public void close() throws IOException { public void close() throws IOException {
if (replicationState != null) replicationState.close(); if (replicationState != null) replicationState.close();
} }
/**
* Utility method to ensure an ENABLED znode is in place; if not present, we
* create it.
* @param zookeeper
* @param path Path to znode to check
* @return True if we created the znode.
* @throws NodeExistsException
* @throws KeeperException
*/
static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
throws NodeExistsException, KeeperException {
if (ZKUtil.checkExists(zookeeper, path) == -1) {
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer.
// The peer state data is set as "ENABLED" by default.
ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
return true;
}
return false;
}
/**
* @param bytes
* @return True if the passed in <code>bytes</code> are those of a pb
* serialized ENABLED state.
* @throws DeserializationException
*/
static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
}
} }

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.zookeeper; package org.apache.hadoop.hbase.zookeeper;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterId;
@ -77,4 +79,14 @@ public class ZKClusterId {
throws KeeperException { throws KeeperException {
ZKUtil.createSetData(watcher, watcher.clusterIdZNode, id.toByteArray()); ZKUtil.createSetData(watcher, watcher.clusterIdZNode, id.toByteArray());
} }
/**
 * Reads the cluster id znode through the given watcher and converts it to a UUID.
 * No ZooKeeper exception is handled here; any failure is propagated to the caller.
 * @param zkw watcher connected to an ensemble
 * @return the UUID parsed from the cluster id read from zookeeper
 * @throws KeeperException
 */
public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
  final String clusterId = readClusterIdZNode(zkw);
  return UUID.fromString(clusterId);
}
} }

View File

@ -1836,6 +1836,9 @@ public class ZKUtil {
* @throws DeserializationException * @throws DeserializationException
*/ */
public static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException { public static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
if (bytes == null) {
throw new DeserializationException("Unable to parse null HLog position.");
}
if (ProtobufUtil.isPBMagicPrefix(bytes)) { if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic(); int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition.Builder builder =

View File

@ -45,8 +45,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@ -176,17 +178,10 @@ public class Import {
cfRenameMap = createCfRenameMap(conf); cfRenameMap = createCfRenameMap(conf);
filter = instantiateFilter(conf); filter = instantiateFilter(conf);
// TODO: This is kind of ugly doing setup of ZKW just to read the clusterid. // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
ReplicationZookeeper zkHelper = null;
ZooKeeperWatcher zkw = null; ZooKeeperWatcher zkw = null;
try { try {
HConnection connection = HConnectionManager.getConnection(conf);
zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null); zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
zkHelper = new ReplicationZookeeper(connection, conf, zkw); clusterId = ZKClusterId.getUUIDForCluster(zkw);
try {
this.clusterId = zkHelper.getUUIDForCluster(zkw);
} finally {
if (zkHelper != null) zkHelper.close();
}
} catch (ZooKeeperConnectionException e) { } catch (ZooKeeperConnectionException e) {
LOG.error("Problem connecting to ZooKeper during task setup", e); LOG.error("Problem connecting to ZooKeper during task setup", e);
} catch (KeeperException e) { } catch (KeeperException e) {

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -117,11 +119,13 @@ public class VerifyReplication {
@Override public void abort(String why, Throwable e) {} @Override public void abort(String why, Throwable e) {}
@Override public boolean isAborted() {return false;} @Override public boolean isAborted() {return false;}
}); });
zk = new ReplicationZookeeper(conn, conf, localZKW); ReplicationPeers rp = new ReplicationPeersZKImpl(localZKW, conf, localZKW);
// Just verifying it we can connect rp.init();
peer = zk.getPeer(peerId); Configuration peerConf = rp.getPeerConf(peerId);
HTable replicatedTable = new HTable(peer.getConfiguration(), if (peerConf == null) {
conf.get(NAME+".tableName")); throw new IOException("Couldn't get peer conf!");
}
HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
scan.setStartRow(value.getRow()); scan.setStartRow(value.getRow());
replicatedScanner = replicatedTable.getScanner(scan); replicatedScanner = replicatedTable.getScanner(scan);
} catch (KeeperException e) { } catch (KeeperException e) {
@ -175,42 +179,6 @@ public class VerifyReplication {
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) { if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
throw new IOException("Replication needs to be enabled to verify it."); throw new IOException("Replication needs to be enabled to verify it.");
} }
HConnectionManager.execute(new HConnectable<Void>(conf) {
@Override
public Void connect(HConnection conn) throws IOException {
ZooKeeperWatcher localZKW = null;
ReplicationZookeeper zk = null;
ReplicationPeer peer = null;
try {
localZKW = new ZooKeeperWatcher(
conf, "VerifyReplication", new Abortable() {
@Override public void abort(String why, Throwable e) {}
@Override public boolean isAborted() {return false;}
});
zk = new ReplicationZookeeper(conn, conf, localZKW);
// Just verifying it we can connect
peer = zk.getPeer(peerId);
if (peer == null) {
throw new IOException("Couldn't get access to the slave cluster," +
"please see the log");
}
} catch (KeeperException ex) {
throw new IOException("Couldn't get access to the slave cluster" +
" because: ", ex);
} finally {
if (peer != null){
peer.close();
}
if (zk != null){
zk.close();
}
if (localZKW != null){
localZKW.close();
}
}
return null;
}
});
conf.set(NAME+".peerId", peerId); conf.set(NAME+".peerId", peerId);
conf.set(NAME+".tableName", tableName); conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime); conf.setLong(NAME+".startTime", startTime);

View File

@ -136,6 +136,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this); this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
this.replicationState = new ReplicationStateImpl(zkw, conf, this); this.replicationState = new ReplicationStateImpl(zkw, conf, this);
this.replicationState.init();
} catch (KeeperException e) { } catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e); LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) { } catch (IOException e) {

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -189,7 +190,7 @@ public class ReplicationSource extends Thread
this.metrics = new MetricsSource(peerClusterZnode); this.metrics = new MetricsSource(peerClusterZnode);
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
try { try {
this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher()); this.clusterId = ZKClusterId.getUUIDForCluster(zkHelper.getZookeeperWatcher());
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke); throw new IOException("Could not read cluster id", ke);
} }

View File

@ -177,7 +177,7 @@ public class ReplicationSourceManager {
* old region server hlog queues * old region server hlog queues
*/ */
public void init() throws IOException { public void init() throws IOException {
for (String id : this.zkHelper.getPeerClusters().keySet()) { for (String id : this.zkHelper.getPeerClusters()) {
addSource(id); addSource(id);
} }
List<String> currentReplicators = this.replicationQueues.getListOfReplicators(); List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
@ -601,7 +601,7 @@ public class ReplicationSourceManager {
try { try {
ReplicationSourceInterface src = getReplicationSource(conf, ReplicationSourceInterface src = getReplicationSource(conf,
fs, ReplicationSourceManager.this, stopper, replicating, peerId); fs, ReplicationSourceManager.this, stopper, replicating, peerId);
if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) { if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer"); src.terminate("Recovered queue doesn't belong to any current peer");
break; break;
} }

View File

@ -20,12 +20,17 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.SortedSet; import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** /**
@ -41,6 +46,26 @@ public abstract class TestReplicationStateBasic {
protected String server1 = new ServerName("hostname1.example.org", 1234, -1L).toString(); protected String server1 = new ServerName("hostname1.example.org", 1234, -1L).toString();
protected String server2 = new ServerName("hostname2.example.org", 1234, -1L).toString(); protected String server2 = new ServerName("hostname2.example.org", 1234, -1L).toString();
protected String server3 = new ServerName("hostname3.example.org", 1234, -1L).toString(); protected String server3 = new ServerName("hostname3.example.org", 1234, -1L).toString();
protected ReplicationPeers rp;
protected static final String ID_ONE = "1";
protected static final String ID_TWO = "2";
protected static String KEY_ONE;
protected static String KEY_TWO;
// For testing when we try to replicate to ourselves
protected String OUR_ID = "3";
protected String OUR_KEY;
protected static int zkTimeoutCount;
protected static final int ZK_MAX_COUNT = 300;
protected static final int ZK_SLEEP_INTERVAL = 100; // millis
private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
// Runs before every test: resets the static zkTimeoutCount counter to zero.
@Before
public void setUp() {
zkTimeoutCount = 0;
}
@Test @Test
public void testReplicationQueuesClient() throws KeeperException { public void testReplicationQueuesClient() throws KeeperException {
@ -83,13 +108,15 @@ public abstract class TestReplicationStateBasic {
} }
@Test @Test
public void testReplicationQueues() throws KeeperException { public void testReplicationQueues() throws KeeperException, IOException {
rq1.init(server1); rq1.init(server1);
rq2.init(server2); rq2.init(server2);
rq3.init(server3); rq3.init(server3);
//Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
rp.init();
// Zero queues or replicators exist // 3 replicators should exist
assertEquals(0, rq1.getListOfReplicators().size()); assertEquals(3, rq1.getListOfReplicators().size());
rq1.removeQueue("bogus"); rq1.removeQueue("bogus");
rq1.removeLog("bogus", "bogus"); rq1.removeLog("bogus", "bogus");
rq1.removeAllQueues(); rq1.removeAllQueues();
@ -132,11 +159,102 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rq2.getListOfReplicators().size()); assertEquals(0, rq2.getListOfReplicators().size());
} }
/**
 * Exercises the ReplicationPeers API in three stages: with unknown peer ids, with peers
 * that are added but not connected, and with one connected peer that is disabled,
 * enabled and finally disconnected.
 */
@Test
public void testReplicationPeers() throws Exception {
  rp.init();

  // Test methods with non-existent peer ids
  try {
    rp.removePeer("bogus");
    fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
  } catch (IllegalArgumentException expected) {
    // expected: the peer id is unknown
  }
  try {
    rp.enablePeer("bogus");
    fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
  } catch (IllegalArgumentException expected) {
    // expected: the peer id is unknown
  }
  try {
    rp.disablePeer("bogus");
    fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
  } catch (IllegalArgumentException expected) {
    // expected: the peer id is unknown
  }
  try {
    rp.getStatusOfConnectedPeer("bogus");
    fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
  } catch (IllegalArgumentException expected) {
    // expected: the peer id is unknown
  }
  assertFalse(rp.connectToPeer("bogus"));
  rp.disconnectFromPeer("bogus");
  assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
  assertNull(rp.getPeerUUID("bogus"));
  assertNull(rp.getPeerConf("bogus"));
  assertNumberOfPeers(0, 0);

  // Add some peers
  rp.addPeer(ID_ONE, KEY_ONE);
  assertNumberOfPeers(0, 1);
  rp.addPeer(ID_TWO, KEY_TWO);
  assertNumberOfPeers(0, 2);

  // Test methods with a peer that is added but not connected
  try {
    rp.getStatusOfConnectedPeer(ID_ONE);
    fail("There are no connected peers, should have thrown an IllegalArgumentException");
  } catch (IllegalArgumentException expected) {
    // expected: the peer has been added but is not connected yet
  }
  assertNull(rp.getPeerUUID(ID_ONE));
  assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
  rp.disconnectFromPeer(ID_ONE);
  assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());

  // Connect to one peer
  rp.connectToPeer(ID_ONE);
  assertNumberOfPeers(1, 2);
  assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
  rp.disablePeer(ID_ONE);
  assertConnectedPeerStatus(false, ID_ONE);
  rp.enablePeer(ID_ONE);
  assertConnectedPeerStatus(true, ID_ONE);
  assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
  // Assert on the UUID itself: calling toString() first would throw a
  // NullPointerException on a null UUID instead of failing the assertion.
  assertNotNull(rp.getPeerUUID(ID_ONE));

  // Disconnect peer
  rp.disconnectFromPeer(ID_ONE);
  assertNumberOfPeers(0, 2);
  try {
    rp.getStatusOfConnectedPeer(ID_ONE);
    fail("There are no connected peers, should have thrown an IllegalArgumentException");
  } catch (IllegalArgumentException expected) {
    // expected: the peer is no longer connected
  }
}
/**
 * Polls the status of a connected peer until it equals the expected value, sleeping
 * ZK_SLEEP_INTERVAL milliseconds between attempts. Fails the test once zkTimeoutCount
 * reaches ZK_MAX_COUNT. The counter is static and reset in setUp(), so the retry budget
 * is shared by all calls made within one test.
 * @param status the status the peer is expected to reach
 * @param peerId id of the connected peer to poll
 * @throws Exception if reading the status fails or the sleep is interrupted
 */
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
  while (status != rp.getStatusOfConnectedPeer(peerId)) {
    if (zkTimeoutCount >= ZK_MAX_COUNT) {
      fail("Timed out waiting for ConnectedPeerStatus to be " + status);
    }
    LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
        + ", sleeping and trying again.");
    Thread.sleep(ZK_SLEEP_INTERVAL);
    // Count the attempt; without this the timeout branch is unreachable and the loop
    // never ends when the status does not change.
    zkTimeoutCount++;
  }
}
/**
 * Asserts how many peers are known and how many of them are connected.
 * @param connected expected size of rp.getConnectedPeers()
 * @param total expected size of both rp.getAllPeerClusterKeys() and rp.getAllPeerIds()
 */
protected void assertNumberOfPeers(int connected, int total) {
  final int knownClusterKeys = rp.getAllPeerClusterKeys().size();
  assertEquals(total, knownClusterKeys);

  final int connectedPeers = rp.getConnectedPeers().size();
  assertEquals(connected, connectedPeers);

  final int knownPeerIds = rp.getAllPeerIds().size();
  assertEquals(total, knownPeerIds);
}
/* /*
* three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
* 3, 4, 5 log files respectively * 3, 4, 5 log files respectively
*/ */
protected void populateQueues() throws KeeperException { protected void populateQueues() throws KeeperException, IOException {
rq1.addLog("trash", "trash"); rq1.addLog("trash", "trash");
rq1.removeQueue("trash"); rq1.removeQueue("trash");
@ -147,6 +265,8 @@ public abstract class TestReplicationStateBasic {
for (int j = 0; j < i; j++) { for (int j = 0; j < i; j++) {
rq3.addLog("qId" + i, "filename" + j); rq3.addLog("qId" + i, "filename" + j);
} }
//Add peers for the corresponding queues so they are not orphans
rp.addPeer("qId" + i, "bogus" + i);
} }
} }
} }

View File

@ -23,16 +23,20 @@ import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import static org.junit.Assert.*;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -55,20 +59,42 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
KEY_ONE = initPeerClusterState("/hbase1");
KEY_TWO = initPeerClusterState("/hbase2");
}
// Prepares the ZooKeeper state of a fake peer cluster rooted at baseZKNode and returns
// the cluster key computed from that peer's configuration.
private static String initPeerClusterState(String baseZKNode)
throws IOException, KeeperException {
// Set up state nodes of peer clusters
// Work on a copy of the shared conf so only this peer uses baseZKNode as its parent znode.
Configuration testConf = new Configuration(conf);
testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
// NOTE(review): zkw1 is never closed in this method -- confirm whether that is intentional.
ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null);
ReplicationStateInterface rsi = new ReplicationStateImpl(zkw1, testConf, zkw1);
rsi.init();
rsi.setState(true);
rsi.close();
// Create a znode for one fake region server under the peer's rs znode.
String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
ZKUtil.createWithParents(zkw1, fakeRs);
// Store a newly constructed ClusterId for the peer cluster.
ZKClusterId.setClusterId(zkw1, new ClusterId());
return ZKUtil.getZooKeeperClusterKey(testConf);
} }
@Before @Before
public void setUp() throws KeeperException { @Override
public void setUp() {
super.setUp();
DummyServer ds1 = new DummyServer(server1); DummyServer ds1 = new DummyServer(server1);
DummyServer ds2 = new DummyServer(server2); DummyServer ds2 = new DummyServer(server2);
DummyServer ds3 = new DummyServer(server3); DummyServer ds3 = new DummyServer(server3);
rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1); try {
rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2); rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1);
rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3); rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2);
rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1); rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3);
String peersZnode = ZKUtil.joinZNode(replicationZNode, "peers"); rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1);
for (int i = 1; i < 6; i++) { rp = new ReplicationPeersZKImpl(zkw, conf, zkw);
ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(peersZnode, "qId"+i)); OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
} catch (KeeperException e) {
fail("Exception thrown: " + e);
} }
} }

View File

@ -53,6 +53,8 @@ public class TestReplicationZookeeper {
private static String slaveClusterKey; private static String slaveClusterKey;
private static String peersZNode;
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility(); utility = new HBaseTestingUtility();
@ -63,6 +65,10 @@ public class TestReplicationZookeeper {
repZk = new ReplicationZookeeper(server, new AtomicBoolean()); repZk = new ReplicationZookeeper(server, new AtomicBoolean());
slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
conf.get("hbase.zookeeper.property.clientPort") + ":/1"; conf.get("hbase.zookeeper.property.clientPort") + ":/1";
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
} }
@AfterClass @AfterClass
@ -80,19 +86,19 @@ public class TestReplicationZookeeper {
@Test @Test
public void testIsPeerPath_PathToParentOfPeerNode() { public void testIsPeerPath_PathToParentOfPeerNode() {
String peerParentNode = repZk.getPeersZNode(); String peerParentNode = peersZNode;
assertFalse(repZk.isPeerPath(peerParentNode)); assertFalse(repZk.isPeerPath(peerParentNode));
} }
@Test @Test
public void testIsPeerPath_PathToChildOfPeerNode() { public void testIsPeerPath_PathToChildOfPeerNode() {
String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(repZk.getPeersZNode(), "1"), "child"); String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(peersZNode, "1"), "child");
assertFalse(repZk.isPeerPath(peerChild)); assertFalse(repZk.isPeerPath(peerChild));
} }
@Test @Test
public void testIsPeerPath_ActualPeerPath() { public void testIsPeerPath_ActualPeerPath() {
String peerPath = ZKUtil.joinZNode(repZk.getPeersZNode(), "1"); String peerPath = ZKUtil.joinZNode(peersZNode, "1");
assertTrue(repZk.isPeerPath(peerPath)); assertTrue(repZk.isPeerPath(peerPath));
} }

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -124,9 +125,9 @@ public class TestReplicationSourceManager {
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
ReplicationZookeeper.ENABLED_ZNODE_BYTES); ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationZookeeper.ENABLED_ZNODE_BYTES); ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager(); manager = replication.getReplicationManager();
@ -277,6 +278,7 @@ public class TestReplicationSourceManager {
for (String file : files) { for (String file : files) {
rz.addLogToList(file, "1"); rz.addLogToList(file, "1");
} }
// create 3 DummyServers // create 3 DummyServers
Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");