HBASE-7565 [replication] Create an interface for the replication state node
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1435250 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
832b85e8e3
commit
0c9a81436e
|
@ -185,7 +185,7 @@ public class ReplicationAdmin implements Closeable {
|
|||
boolean prev = true;
|
||||
try {
|
||||
prev = getReplicating();
|
||||
this.replicationZk.setReplicating(newState);
|
||||
this.replicationZk.setReplication(newState);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Unable to set the replication state", e);
|
||||
}
|
||||
|
|
|
@ -90,7 +90,7 @@ public class ReplicationPeer implements Abortable, Closeable {
|
|||
}
|
||||
|
||||
private void readPeerStateZnode() throws DeserializationException {
|
||||
this.peerEnabled.set(ReplicationZookeeper.isPeerEnabled(this.peerStateTracker.getData(false)));
|
||||
this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,167 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.DeserializationException;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* ReplicationStateImpl is responsible for maintaining the replication state
|
||||
* znode.
|
||||
*/
|
||||
public class ReplicationStateImpl implements ReplicationStateInterface {
|
||||
|
||||
private ReplicationStateTracker stateTracker;
|
||||
private final String stateZnode;
|
||||
private final ZooKeeperWatcher zookeeper;
|
||||
private final Abortable abortable;
|
||||
private final AtomicBoolean replicating;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
|
||||
|
||||
public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode,
|
||||
final Abortable abortable, final AtomicBoolean replicating) {
|
||||
this.zookeeper = zk;
|
||||
this.stateZnode = stateZnode;
|
||||
this.abortable = abortable;
|
||||
this.replicating = replicating;
|
||||
|
||||
// Set a tracker on replicationStateNode
|
||||
this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode,
|
||||
this.abortable);
|
||||
stateTracker.start();
|
||||
readReplicationStateZnode();
|
||||
}
|
||||
|
||||
public boolean getState() throws KeeperException {
|
||||
return getReplication();
|
||||
}
|
||||
|
||||
public void setState(boolean newState) throws KeeperException {
|
||||
setReplicating(newState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (stateTracker != null) stateTracker.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bytes
|
||||
* @return True if the passed in <code>bytes</code> are those of a pb
|
||||
* serialized ENABLED state.
|
||||
* @throws DeserializationException
|
||||
*/
|
||||
private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
|
||||
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
|
||||
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bytes Content of a state znode.
|
||||
* @return State parsed from the passed bytes.
|
||||
* @throws DeserializationException
|
||||
*/
|
||||
private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
|
||||
throws DeserializationException {
|
||||
ProtobufUtil.expectPBMagicPrefix(bytes);
|
||||
int pblen = ProtobufUtil.lengthOfPBMagic();
|
||||
ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
|
||||
.newBuilder();
|
||||
ZooKeeperProtos.ReplicationState state;
|
||||
try {
|
||||
state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
|
||||
return state.getState();
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the new replication state for this cluster
|
||||
* @param newState
|
||||
*/
|
||||
private void setReplicating(boolean newState) throws KeeperException {
|
||||
ZKUtil.createWithParents(this.zookeeper, this.stateZnode);
|
||||
byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
|
||||
: ReplicationZookeeper.DISABLED_ZNODE_BYTES;
|
||||
ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the replication status of this cluster. If the state znode doesn't
|
||||
* exist it will also create it and set it true.
|
||||
* @return returns true when it's enabled, else false
|
||||
* @throws KeeperException
|
||||
*/
|
||||
private boolean getReplication() throws KeeperException {
|
||||
byte[] data = this.stateTracker.getData(false);
|
||||
if (data == null || data.length == 0) {
|
||||
setReplicating(true);
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
return isStateEnabled(data);
|
||||
} catch (DeserializationException e) {
|
||||
throw ZKUtil.convert(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This reads the state znode for replication and sets the atomic boolean
|
||||
*/
|
||||
private void readReplicationStateZnode() {
|
||||
try {
|
||||
this.replicating.set(getReplication());
|
||||
LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Failed getting data on from " + this.stateZnode, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tracker for status of the replication
|
||||
*/
|
||||
private class ReplicationStateTracker extends ZooKeeperNodeTracker {
|
||||
public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
|
||||
super(watcher, stateZnode, abortable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void nodeDataChanged(String path) {
|
||||
if (path.equals(node)) {
|
||||
super.nodeDataChanged(path);
|
||||
readReplicationStateZnode();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* This provides an interface for getting and setting the replication state of a
|
||||
* cluster. This state is used to indicate whether replication is enabled or
|
||||
* disabled on a cluster.
|
||||
*/
|
||||
public interface ReplicationStateInterface extends Closeable {
|
||||
|
||||
/**
|
||||
* Get the current state of replication (i.e. ENABLED or DISABLED).
|
||||
*
|
||||
* @return true if replication is enabled, false otherwise
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public boolean getState() throws KeeperException;
|
||||
|
||||
/**
|
||||
* Set the state of replication.
|
||||
*
|
||||
* @param newState
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public void setState(boolean newState) throws KeeperException;
|
||||
}
|
|
@ -43,12 +43,10 @@ import org.apache.hadoop.hbase.Server;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.ConnectionLossException;
|
||||
|
@ -87,7 +85,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
|||
* </pre>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationZookeeper implements Closeable{
|
||||
public class ReplicationZookeeper implements Closeable {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(ReplicationZookeeper.class);
|
||||
// Name of znode we use to lock when failover
|
||||
|
@ -111,24 +109,24 @@ public class ReplicationZookeeper implements Closeable{
|
|||
// peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
|
||||
private String peerStateNodeName;
|
||||
private final Configuration conf;
|
||||
// Is this cluster replicating at the moment?
|
||||
private AtomicBoolean replicating;
|
||||
// The key to our own cluster
|
||||
private String ourClusterKey;
|
||||
// Abortable
|
||||
private Abortable abortable;
|
||||
private ReplicationStatusTracker statusTracker;
|
||||
private final ReplicationStateInterface replicationState;
|
||||
|
||||
/**
|
||||
* ZNode content if enabled state.
|
||||
*/
|
||||
// Public so it can be seen by test code.
|
||||
public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
|
||||
public static final byte[] ENABLED_ZNODE_BYTES =
|
||||
toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
|
||||
|
||||
/**
|
||||
* ZNode content if disabled state.
|
||||
*/
|
||||
static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
|
||||
static final byte[] DISABLED_ZNODE_BYTES =
|
||||
toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
|
||||
|
||||
/**
|
||||
* Constructor used by clients of replication (like master and HBase clients)
|
||||
|
@ -140,8 +138,9 @@ public class ReplicationZookeeper implements Closeable{
|
|||
final ZooKeeperWatcher zk) throws KeeperException {
|
||||
this.conf = conf;
|
||||
this.zookeeper = zk;
|
||||
this.replicating = new AtomicBoolean();
|
||||
setZNodes(abortable);
|
||||
this.replicationState =
|
||||
new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,9 +156,10 @@ public class ReplicationZookeeper implements Closeable{
|
|||
this.abortable = server;
|
||||
this.zookeeper = server.getZooKeeper();
|
||||
this.conf = server.getConfiguration();
|
||||
this.replicating = replicating;
|
||||
setZNodes(server);
|
||||
|
||||
this.replicationState =
|
||||
new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating);
|
||||
this.peerClusters = new HashMap<String, ReplicationPeer>();
|
||||
ZKUtil.createWithParents(this.zookeeper,
|
||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
|
||||
|
@ -180,11 +180,6 @@ public class ReplicationZookeeper implements Closeable{
|
|||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
|
||||
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
|
||||
|
||||
// Set a tracker on replicationStateNodeNode
|
||||
this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable);
|
||||
statusTracker.start();
|
||||
readReplicationStateZnode();
|
||||
}
|
||||
|
||||
private void connectExistingPeers() throws IOException, KeeperException {
|
||||
|
@ -365,18 +360,6 @@ public class ReplicationZookeeper implements Closeable{
|
|||
return peer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the new replication state for this cluster
|
||||
* @param newState
|
||||
*/
|
||||
public void setReplicating(boolean newState) throws KeeperException {
|
||||
ZKUtil.createWithParents(this.zookeeper,
|
||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
|
||||
byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
|
||||
ZKUtil.setData(this.zookeeper,
|
||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), stateBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the peer from zookeeper. which will trigger the watchers on every
|
||||
* region server and close their sources
|
||||
|
@ -641,40 +624,27 @@ public class ReplicationZookeeper implements Closeable{
|
|||
return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
|
||||
}
|
||||
|
||||
/**
|
||||
* This reads the state znode for replication and sets the atomic boolean
|
||||
*/
|
||||
private void readReplicationStateZnode() {
|
||||
try {
|
||||
this.replicating.set(getReplication());
|
||||
LOG.info("Replication is now " + (this.replicating.get()?
|
||||
"started" : "stopped"));
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
|
||||
}
|
||||
private String getRepStateNode() {
|
||||
return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the replication status of this cluster. If the state znode doesn't
|
||||
* exist it will also create it and set it true.
|
||||
* Get the replication status of this cluster. If the state znode doesn't exist it will also
|
||||
* create it and set it true.
|
||||
* @return returns true when it's enabled, else false
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public boolean getReplication() throws KeeperException {
|
||||
byte [] data = this.statusTracker.getData(false);
|
||||
if (data == null || data.length == 0) {
|
||||
setReplicating(true);
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
return isPeerEnabled(data);
|
||||
} catch (DeserializationException e) {
|
||||
throw ZKUtil.convert(e);
|
||||
}
|
||||
return this.replicationState.getState();
|
||||
}
|
||||
|
||||
private String getRepStateNode() {
|
||||
return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
|
||||
/**
|
||||
* Set the new replication state for this cluster
|
||||
* @param newState
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public void setReplication(boolean newState) throws KeeperException {
|
||||
this.replicationState.setState(newState);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1055,8 +1025,7 @@ public class ReplicationZookeeper implements Closeable{
|
|||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (statusTracker != null)
|
||||
statusTracker.stop();
|
||||
if (replicationState != null) replicationState.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1086,26 +1055,8 @@ public class ReplicationZookeeper implements Closeable{
|
|||
* serialized ENABLED state.
|
||||
* @throws DeserializationException
|
||||
*/
|
||||
static boolean isPeerEnabled(final byte[] bytes) throws DeserializationException {
|
||||
static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
|
||||
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
|
||||
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tracker for status of the replication
|
||||
*/
|
||||
public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
|
||||
public ReplicationStatusTracker(ZooKeeperWatcher watcher,
|
||||
Abortable abortable) {
|
||||
super(watcher, getRepStateNode(), abortable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void nodeDataChanged(String path) {
|
||||
if (path.equals(node)) {
|
||||
super.nodeDataChanged(path);
|
||||
readReplicationStateZnode();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue