HBASE-8861 Remove ReplicationState completely (Chris Trezzo via JD)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1501503 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2013-07-09 19:59:39 +00:00
parent 1cf2511b30
commit ed997ccdeb
24 changed files with 31 additions and 591 deletions

View File

@ -52,16 +52,6 @@ import java.util.Map;
* used to keep track of the replication state. * used to keep track of the replication state.
* </p> * </p>
* <p> * <p>
* Enabling and disabling peers is currently not supported.
* </p>
* <p>
* As cluster replication is still experimental, a kill switch is provided
* in order to stop all replication-related operations, see
* {@link #setReplicating(boolean)}. When setting it back to true, the new
* state of all the replication streams will be unknown and may have holes.
* Use at your own risk.
* </p>
* <p>
* To see which commands are available in the shell, type * To see which commands are available in the shell, type
* <code>replication</code>. * <code>replication</code>.
* </p> * </p>
@ -162,36 +152,6 @@ public class ReplicationAdmin implements Closeable {
return this.replicationZk.listPeers(); return this.replicationZk.listPeers();
} }
/**
* Get the current status of the kill switch, if the cluster is replicating
* or not.
* @return true if the cluster is replicated, otherwise false
*/
public boolean getReplicating() throws IOException {
try {
return this.replicationZk.getReplication();
} catch (KeeperException e) {
throw new IOException("Couldn't get the replication status");
}
}
/**
* Kill switch for all replication-related features
* @param newState true to start replication, false to stop it.
* completely
* @return the previous state
*/
public boolean setReplicating(boolean newState) throws IOException {
boolean prev = true;
try {
prev = getReplicating();
this.replicationZk.setReplication(newState);
} catch (KeeperException e) {
throw new IOException("Unable to set the replication state", e);
}
return prev;
}
/** /**
* Get the ZK-support tool created and used by this object for replication. * Get the ZK-support tool created and used by this object for replication.
* @return the ZK-support tool * @return the ZK-support tool

View File

@ -1,174 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* ReplicationStateImpl is responsible for maintaining the replication state
* znode.
*/
public class ReplicationStateImpl extends ReplicationStateZKBase implements
ReplicationStateInterface {
private final ReplicationStateTracker stateTracker;
private final AtomicBoolean replicating;
private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
final Abortable abortable, final AtomicBoolean replicating) {
super(zk, conf, abortable);
this.replicating = replicating;
// Set a tracker on replicationStateNode
this.stateTracker =
new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
}
public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
final Abortable abortable) {
this(zk, conf, abortable, new AtomicBoolean());
}
@Override
public void init() throws KeeperException {
ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
stateTracker.start();
readReplicationStateZnode();
}
@Override
public boolean getState() throws KeeperException {
return getReplication();
}
@Override
public void setState(boolean newState) throws KeeperException {
setReplicating(newState);
}
@Override
public void close() throws IOException {
if (stateTracker != null) stateTracker.stop();
}
/**
* @param bytes
* @return True if the passed in <code>bytes</code> are those of a pb
* serialized ENABLED state.
* @throws DeserializationException
*/
private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
}
/**
* @param bytes Content of a state znode.
* @return State parsed from the passed bytes.
* @throws DeserializationException
*/
private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
.newBuilder();
ZooKeeperProtos.ReplicationState state;
try {
state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
return state.getState();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
}
/**
* Set the new replication state for this cluster
* @param newState
*/
private void setReplicating(boolean newState) throws KeeperException {
ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
}
/**
* Get the replication status of this cluster. If the state znode doesn't
* exist it will also create it and set it true.
* @return returns true when it's enabled, else false
* @throws KeeperException
*/
private boolean getReplication() throws KeeperException {
byte[] data = this.stateTracker.getData(false);
if (data == null || data.length == 0) {
setReplicating(true);
return true;
}
try {
return isStateEnabled(data);
} catch (DeserializationException e) {
throw ZKUtil.convert(e);
}
}
/**
* This reads the state znode for replication and sets the atomic boolean
*/
private void readReplicationStateZnode() {
try {
this.replicating.set(getReplication());
LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
} catch (KeeperException e) {
this.abortable.abort("Failed getting data on from " + this.stateZNode, e);
}
}
/**
* Tracker for status of the replication
*/
private class ReplicationStateTracker extends ZooKeeperNodeTracker {
public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
super(watcher, stateZnode, abortable);
}
@Override
public synchronized void nodeDataChanged(String path) {
if (path.equals(node)) {
super.nodeDataChanged(path);
readReplicationStateZnode();
}
}
}
}

View File

@ -1,52 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import java.io.Closeable;
/**
* This provides an interface for getting and setting the replication state of a
* cluster. This state is used to indicate whether replication is enabled or
* disabled on a cluster.
*/
@InterfaceAudience.Private
public interface ReplicationStateInterface extends Closeable {
/**
* Initialize the replication state interface.
*/
public void init() throws KeeperException;
/**
* Get the current state of replication (i.e. ENABLED or DISABLED).
* @return true if replication is enabled, false otherwise
* @throws KeeperException
*/
public boolean getState() throws KeeperException;
/**
* Set the state of replication.
* @param newState
* @throws KeeperException
*/
public void setState(boolean newState) throws KeeperException;
}

View File

@ -39,8 +39,6 @@ public abstract class ReplicationStateZKBase {
* cluster. * cluster.
*/ */
protected final String peerStateNodeName; protected final String peerStateNodeName;
/** The name of the znode that contains the replication status of the local cluster. */
protected final String stateZNode;
/** The name of the base znode that contains all replication state. */ /** The name of the base znode that contains all replication state. */
protected final String replicationZNode; protected final String replicationZNode;
/** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */ /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
@ -68,11 +66,9 @@ public abstract class ReplicationStateZKBase {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
String stateZNodeName = conf.get("zookeeper.znode.replication.state", "state");
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.stateZNode = ZKUtil.joinZNode(replicationZNode, stateZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
} }
@ -90,9 +86,8 @@ public abstract class ReplicationStateZKBase {
/** /**
* @param state * @param state
* @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
* use as content of either the cluster state znode -- whether or not we should be * use as content of a peer-state znode under a peer cluster id as in
* replicating kept in /hbase/replication/state -- or as content of a peer-state znode * /hbase/replication/peers/PEER_ID/peer-state.
* under a peer cluster id as in /hbase/replication/peers/PEER_ID/peer-state.
*/ */
protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) { protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
byte[] bytes = byte[] bytes =

View File

@ -70,7 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* </pre> * </pre>
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable { public class ReplicationZookeeper extends ReplicationStateZKBase {
private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class); private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class);
// Our handle on zookeeper // Our handle on zookeeper
@ -79,7 +79,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
private final Configuration conf; private final Configuration conf;
// Abortable // Abortable
private Abortable abortable; private Abortable abortable;
private final ReplicationStateInterface replicationState;
private final ReplicationPeers replicationPeers; private final ReplicationPeers replicationPeers;
private final ReplicationQueues replicationQueues; private final ReplicationQueues replicationQueues;
@ -95,8 +94,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
this.conf = conf; this.conf = conf;
this.zookeeper = zk; this.zookeeper = zk;
setZNodes(abortable); setZNodes(abortable);
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
this.replicationState.init();
// TODO This interface is no longer used by anyone using this constructor. When this class goes // TODO This interface is no longer used by anyone using this constructor. When this class goes
// away, we will no longer have this null initialization business // away, we will no longer have this null initialization business
this.replicationQueues = null; this.replicationQueues = null;
@ -108,19 +105,16 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Constructor used by region servers, connects to the peer cluster right away. * Constructor used by region servers, connects to the peer cluster right away.
* *
* @param server * @param server
* @param replicating atomic boolean to start/stop replication
* @throws IOException * @throws IOException
* @throws KeeperException * @throws KeeperException
*/ */
public ReplicationZookeeper(final Server server, final AtomicBoolean replicating) public ReplicationZookeeper(final Server server)
throws IOException, KeeperException { throws IOException, KeeperException {
super(server.getZooKeeper(), server.getConfiguration(), server); super(server.getZooKeeper(), server.getConfiguration(), server);
this.abortable = server; this.abortable = server;
this.zookeeper = server.getZooKeeper(); this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration(); this.conf = server.getConfiguration();
setZNodes(server); setZNodes(server);
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
this.replicationState.init();
this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server); this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
this.replicationQueues.init(server.getServerName().toString()); this.replicationQueues.init(server.getServerName().toString());
this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server); this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server);
@ -227,25 +221,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
return this.replicationPeers.getStatusOfConnectedPeer(id); return this.replicationPeers.getStatusOfConnectedPeer(id);
} }
/**
* Get the replication status of this cluster. If the state znode doesn't exist it will also
* create it and set it true.
* @return returns true when it's enabled, else false
* @throws KeeperException
*/
public boolean getReplication() throws KeeperException {
return this.replicationState.getState();
}
/**
* Set the new replication state for this cluster
* @param newState
* @throws KeeperException
*/
public void setReplication(boolean newState) throws KeeperException {
this.replicationState.setState(newState);
}
/** /**
* Add a new log to the list of hlogs in zookeeper * Add a new log to the list of hlogs in zookeeper
* @param filename name of the hlog's znode * @param filename name of the hlog's znode
@ -388,9 +363,4 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
public String getPeersZNode() { public String getPeersZNode() {
return peersZNode; return peersZNode;
} }
@Override
public void close() throws IOException {
if (replicationState != null) replicationState.close();
}
} }

View File

@ -111,7 +111,6 @@ public class VerifyReplication {
@Override @Override
public Void connect(HConnection conn) throws IOException { public Void connect(HConnection conn) throws IOException {
ZooKeeperWatcher localZKW = null; ZooKeeperWatcher localZKW = null;
ReplicationZookeeper zk = null;
ReplicationPeer peer = null; ReplicationPeer peer = null;
try { try {
localZKW = new ZooKeeperWatcher( localZKW = new ZooKeeperWatcher(
@ -134,9 +133,6 @@ public class VerifyReplication {
if (peer != null) { if (peer != null) {
peer.close(); peer.close();
} }
if (zk != null) {
zk.close();
}
if (localZKW != null) { if (localZKW != null) {
localZKW.close(); localZKW.close();
} }

View File

@ -23,15 +23,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationStateImpl;
import org.apache.hadoop.hbase.replication.ReplicationStateInterface;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -49,7 +46,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
private ZooKeeperWatcher zkw; private ZooKeeperWatcher zkw;
private ReplicationQueuesClient replicationQueues; private ReplicationQueuesClient replicationQueues;
private ReplicationStateInterface replicationState;
private final Set<String> hlogs = new HashSet<String>(); private final Set<String> hlogs = new HashSet<String>();
private boolean stopped = false; private boolean stopped = false;
private boolean aborted; private boolean aborted;
@ -58,15 +54,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
@Override @Override
public boolean isLogDeletable(FileStatus fStat) { public boolean isLogDeletable(FileStatus fStat) {
try {
if (!replicationState.getState()) {
return false;
}
} catch (KeeperException e) {
abort("Cannot get the state of replication", e);
return false;
}
// all members of this class are null if replication is disabled, and we // all members of this class are null if replication is disabled, and we
// return true since false would render the LogsCleaner useless // return true since false would render the LogsCleaner useless
if (this.getConf() == null) { if (this.getConf() == null) {
@ -136,8 +123,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
try { try {
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this); this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
this.replicationState = new ReplicationStateImpl(zkw, conf, this);
this.replicationState.init();
} catch (KeeperException e) { } catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e); LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) { } catch (IOException e) {
@ -155,14 +140,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
LOG.info("Stopping " + this.zkw); LOG.info("Stopping " + this.zkw);
this.zkw.close(); this.zkw.close();
} }
if (this.replicationState != null) {
LOG.info("Stopping " + this.replicationState);
try {
this.replicationState.close();
} catch (IOException e) {
LOG.error("Error while stopping " + this.replicationState, e);
}
}
// Not sure why we're deleting a connection that we never acquired or used // Not sure why we're deleting a connection that we never acquired or used
HConnectionManager.deleteConnection(this.getConf()); HConnectionManager.deleteConnection(this.getConf());
} }

View File

@ -66,7 +66,6 @@ public class Replication implements WALActionsListener,
LogFactory.getLog(Replication.class); LogFactory.getLog(Replication.class);
private boolean replication; private boolean replication;
private ReplicationSourceManager replicationManager; private ReplicationSourceManager replicationManager;
private final AtomicBoolean replicating = new AtomicBoolean(true);
private ReplicationZookeeper zkHelper; private ReplicationZookeeper zkHelper;
private ReplicationQueues replicationQueues; private ReplicationQueues replicationQueues;
private Configuration conf; private Configuration conf;
@ -108,17 +107,16 @@ public class Replication implements WALActionsListener,
.build()); .build());
if (replication) { if (replication) {
try { try {
this.zkHelper = new ReplicationZookeeper(server, this.replicating); this.zkHelper = new ReplicationZookeeper(server);
this.replicationQueues = this.replicationQueues =
new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server); new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server);
this.replicationQueues.init(this.server.getServerName().toString()); this.replicationQueues.init(this.server.getServerName().toString());
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException("Failed replication handler create " + throw new IOException("Failed replication handler create", ke);
"(replicating=" + this.replicating, ke);
} }
this.replicationManager = this.replicationManager =
new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, logDir,
this.replicating, logDir, oldLogDir); oldLogDir);
this.statsThreadPeriod = this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);

View File

@ -112,9 +112,8 @@ public class ReplicationSink {
} }
/** /**
* Replicate this array of entries directly into the local cluster using the native client. * Replicate this array of entries directly into the local cluster using the native client. Only
* Only operates against raw protobuf type saving on a convertion from pb to pojo. * operates against raw protobuf type saving on a conversion from pb to pojo.
*
* @param entries * @param entries
* @param cells * @param cells
* @throws IOException * @throws IOException

View File

@ -93,8 +93,6 @@ public class ReplicationSource extends Thread
// ratio of region servers to chose from a slave cluster // ratio of region servers to chose from a slave cluster
private float ratio; private float ratio;
private Random random; private Random random;
// should we replicate or not?
private AtomicBoolean replicating;
private ReplicationQueueInfo replicationQueueInfo; private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to // id of the peer cluster this source replicates to
private String peerId; private String peerId;
@ -149,7 +147,6 @@ public class ReplicationSource extends Thread
* @param fs file system to use * @param fs file system to use
* @param manager replication manager to ping to * @param manager replication manager to ping to
* @param stopper the atomic boolean to use to stop the regionserver * @param stopper the atomic boolean to use to stop the regionserver
* @param replicating the atomic boolean that starts/stops replication
* @param peerClusterZnode the name of our znode * @param peerClusterZnode the name of our znode
* @throws IOException * @throws IOException
*/ */
@ -157,7 +154,6 @@ public class ReplicationSource extends Thread
final FileSystem fs, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationSourceManager manager,
final Stoppable stopper, final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterZnode) final String peerClusterZnode)
throws IOException { throws IOException {
this.stopper = stopper; this.stopper = stopper;
@ -185,7 +181,6 @@ public class ReplicationSource extends Thread
this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
this.currentPeers = new ArrayList<ServerName>(); this.currentPeers = new ArrayList<ServerName>();
this.random = new Random(); this.random = new Random();
this.replicating = replicating;
this.manager = manager; this.manager = manager;
this.sleepForRetries = this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); this.conf.getLong("replication.source.sleepforretries", 1000);
@ -417,9 +412,8 @@ public class ReplicationSource extends Thread
removeNonReplicableEdits(entry); removeNonReplicableEdits(entry);
// Don't replicate catalog entries, if the WALEdit wasn't // Don't replicate catalog entries, if the WALEdit wasn't
// containing anything to replicate and if we're currently not set to replicate // containing anything to replicate and if we're currently not set to replicate
if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || Bytes.equals(
Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) && logKey.getTablename(), HConstants.META_TABLE_NAME)) && edit.size() != 0) {
edit.size() != 0 && replicating.get()) {
// Only set the clusterId if is a local key. // Only set the clusterId if is a local key.
// This ensures that the originator sets the cluster id // This ensures that the originator sets the cluster id
// and all replicas retain the initial cluster id. // and all replicas retain the initial cluster id.
@ -714,8 +708,7 @@ public class ReplicationSource extends Thread
* @return true if the peer is enabled, otherwise false * @return true if the peer is enabled, otherwise false
*/ */
protected boolean isPeerEnabled() { protected boolean isPeerEnabled() {
return this.replicating.get() && return this.zkHelper.getPeerEnabled(this.peerId);
this.zkHelper.getPeerEnabled(this.peerId);
} }
/** /**

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -39,7 +38,6 @@ public interface ReplicationSourceInterface {
* @param fs the file system to use * @param fs the file system to use
* @param manager the manager to use * @param manager the manager to use
* @param stopper the stopper object for this region server * @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerClusterId the id of the peer cluster * @param peerClusterId the id of the peer cluster
* @throws IOException * @throws IOException
*/ */
@ -47,7 +45,6 @@ public interface ReplicationSourceInterface {
final FileSystem fs, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationSourceManager manager,
final Stoppable stopper, final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterId) throws IOException; final String peerClusterId) throws IOException;
/** /**

View File

@ -69,8 +69,6 @@ public class ReplicationSourceManager {
private final List<ReplicationSourceInterface> sources; private final List<ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs // List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources; private final List<ReplicationSourceInterface> oldsources;
// Indicates if we are currently replicating
private final AtomicBoolean replicating;
// Helper for zookeeper // Helper for zookeeper
private final ReplicationZookeeper zkHelper; private final ReplicationZookeeper zkHelper;
private final ReplicationQueues replicationQueues; private final ReplicationQueues replicationQueues;
@ -103,16 +101,13 @@ public class ReplicationSourceManager {
* @param conf the configuration to use * @param conf the configuration to use
* @param stopper the stopper object for this region server * @param stopper the stopper object for this region server
* @param fs the file system to use * @param fs the file system to use
* @param replicating the status of the replication on this cluster
* @param logDir the directory that contains all hlog directories of live RSs * @param logDir the directory that contains all hlog directories of live RSs
* @param oldLogDir the directory where old logs are archived * @param oldLogDir the directory where old logs are archived
*/ */
public ReplicationSourceManager(final ReplicationZookeeper zkHelper, public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper, final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper,
final FileSystem fs, final AtomicBoolean replicating, final Path logDir, final FileSystem fs, final Path logDir, final Path oldLogDir) {
final Path oldLogDir) {
this.sources = new ArrayList<ReplicationSourceInterface>(); this.sources = new ArrayList<ReplicationSourceInterface>();
this.replicating = replicating;
this.zkHelper = zkHelper; this.zkHelper = zkHelper;
this.replicationQueues = replicationQueues; this.replicationQueues = replicationQueues;
this.stopper = stopper; this.stopper = stopper;
@ -206,8 +201,7 @@ public class ReplicationSourceManager {
* @throws IOException * @throws IOException
*/ */
public ReplicationSourceInterface addSource(String id) throws IOException { public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src = ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, stopper, id);
getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
synchronized (this.hlogsById) { synchronized (this.hlogsById) {
this.sources.add(src); this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>()); this.hlogsById.put(id, new TreeSet<String>());
@ -260,10 +254,6 @@ public class ReplicationSourceManager {
} }
void preLogRoll(Path newLog) throws IOException { void preLogRoll(Path newLog) throws IOException {
if (!this.replicating.get()) {
LOG.warn("Replication stopped, won't add new log");
return;
}
synchronized (this.hlogsById) { synchronized (this.hlogsById) {
String name = newLog.getName(); String name = newLog.getName();
@ -288,14 +278,9 @@ public class ReplicationSourceManager {
} }
void postLogRoll(Path newLog) throws IOException { void postLogRoll(Path newLog) throws IOException {
if (!this.replicating.get()) {
LOG.warn("Replication stopped, won't add new log");
return;
}
// This only updates the sources we own, not the recovered ones // This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources) { for (ReplicationSourceInterface source : this.sources) {
source.enqueueLog(newLog); source.enqueueLog(newLog);
} }
} }
@ -313,7 +298,6 @@ public class ReplicationSourceManager {
* @param fs the file system to use * @param fs the file system to use
* @param manager the manager to use * @param manager the manager to use
* @param stopper the stopper object for this region server * @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerId the id of the peer cluster * @param peerId the id of the peer cluster
* @return the created source * @return the created source
* @throws IOException * @throws IOException
@ -323,7 +307,6 @@ public class ReplicationSourceManager {
final FileSystem fs, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationSourceManager manager,
final Stoppable stopper, final Stoppable stopper,
final AtomicBoolean replicating,
final String peerId) throws IOException { final String peerId) throws IOException {
ReplicationSourceInterface src; ReplicationSourceInterface src;
try { try {
@ -337,7 +320,7 @@ public class ReplicationSourceManager {
src = new ReplicationSource(); src = new ReplicationSource();
} }
src.init(conf, fs, manager, stopper, replicating, peerId); src.init(conf, fs, manager, stopper, peerId);
return src; return src;
} }
@ -599,8 +582,8 @@ public class ReplicationSourceManager {
for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) { for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
String peerId = entry.getKey(); String peerId = entry.getKey();
try { try {
ReplicationSourceInterface src = getReplicationSource(conf, ReplicationSourceInterface src =
fs, ReplicationSourceManager.this, stopper, replicating, peerId); getReplicationSource(conf, fs, ReplicationSourceManager.this, stopper, peerId);
if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) { if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer"); src.terminate("Recovered queue doesn't belong to any current peer");
break; break;

View File

@ -132,16 +132,6 @@ Choosing peer 10.10.1.49:62020</pre>
In this case it indicates that 1 region server from the slave cluster In this case it indicates that 1 region server from the slave cluster
was chosen for replication.<br><br> was chosen for replication.<br><br>
Should you want to stop the replication while the clusters are running, open
the shell on the master cluster and issue this command:
<pre>
hbase(main):001:0> stop_replication</pre>
Replication of already queued edits will still happen after you
issued that command but new entries won't be. To start it back, simply replace
"false" with "true" in the command.
<p> <p>
<a name="verify"> <a name="verify">

View File

@ -65,17 +65,5 @@ module Hbase
def disable_peer(id) def disable_peer(id)
@replication_admin.disablePeer(id) @replication_admin.disablePeer(id)
end end
#----------------------------------------------------------------------------------------------
# Restart the replication, in an unknown state
def start_replication
@replication_admin.setReplicating(true)
end
#----------------------------------------------------------------------------------------------
# Kill switch for replication, stops all its features
def stop_replication
@replication_admin.setReplicating(false)
end
end end
end end

View File

@ -297,15 +297,13 @@ Shell.load_command_group(
Shell.load_command_group( Shell.load_command_group(
'replication', 'replication',
:full_name => 'CLUSTER REPLICATION TOOLS', :full_name => 'CLUSTER REPLICATION TOOLS',
:comment => "In order to use these tools, hbase.replication must be true. enabling/disabling is currently unsupported", :comment => "In order to use these tools, hbase.replication must be true.",
:commands => %w[ :commands => %w[
add_peer add_peer
remove_peer remove_peer
list_peers list_peers
enable_peer enable_peer
disable_peer disable_peer
start_replication
stop_replication
] ]
) )

View File

@ -1,42 +0,0 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class StartReplication < Command
def help
return <<-EOF
Restarts all the replication features. The state in which each
stream starts in is undetermined.
WARNING:
start/stop replication is only meant to be used in critical load situations.
Examples:
hbase> start_replication
EOF
end
def command
format_simple_command do
replication_admin.start_replication
end
end
end
end
end

View File

@ -1,42 +0,0 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class StopReplication < Command
def help
return <<-EOF
Stops all the replication features. The state in which each
stream stops in is undetermined.
WARNING:
start/stop replication is only meant to be used in critical load situations.
Examples:
hbase> stop_replication
EOF
end
def command
format_simple_command do
replication_admin.stop_replication
end
end
end
end
end

View File

@ -68,8 +68,7 @@ public class TestLogsCleaner {
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
Replication.decorateMasterConfiguration(conf); Replication.decorateMasterConfiguration(conf);
Server server = new DummyServer(); Server server = new DummyServer();
ReplicationZookeeper zkHelper = ReplicationZookeeper zkHelper = new ReplicationZookeeper(server);
new ReplicationZookeeper(server, new AtomicBoolean(true));
Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME); HConstants.HREGION_OLDLOGDIR_NAME);

View File

@ -38,10 +38,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
Path currentPath; Path currentPath;
@Override @Override
public void init(Configuration conf, FileSystem fs, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationSourceManager manager, Stoppable stopper, Stoppable stopper, String peerClusterId) throws IOException {
AtomicBoolean replicating, String peerClusterId)
throws IOException {
this.manager = manager; this.manager = manager;
this.peerClusterId = peerClusterId; this.peerClusterId = peerClusterId;
} }

View File

@ -111,7 +111,6 @@ public class TestReplicationBase {
zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true); zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
admin.addPeer("2", utility2.getClusterKey()); admin.addPeer("2", utility2.getClusterKey());
setIsReplication(true);
LOG.info("Setup second Zk"); LOG.info("Setup second Zk");
CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
@ -134,12 +133,6 @@ public class TestReplicationBase {
htable2 = new HTable(conf2, tableName); htable2 = new HTable(conf2, tableName);
} }
protected static void setIsReplication(boolean rep) throws Exception {
LOG.info("Set rep " + rep);
admin.setReplicating(rep);
Thread.sleep(SLEEP_TIME);
}
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
*/ */

View File

@ -255,75 +255,6 @@ public class TestReplicationSmallTests extends TestReplicationBase {
} }
} }
/**
* Test stopping replication, trying to insert, make sure nothing's
* replicated, enable it, try replicating and it should work
* @throws Exception
*/
@Test(timeout=300000)
public void testStartStop() throws Exception {
// Test stopping replication
setIsReplication(false);
Put put = new Put(Bytes.toBytes("stop start"));
put.add(famName, row, row);
htable1.put(put);
Get get = new Get(Bytes.toBytes("stop start"));
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
break;
}
Result res = htable2.get(get);
if(res.size() >= 1) {
fail("Replication wasn't stopped");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
// Test restart replication
setIsReplication(true);
htable1.put(put);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = htable2.get(get);
if(res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(res.value(), row);
break;
}
}
put = new Put(Bytes.toBytes("do not rep"));
put.add(noRepfamName, row, row);
htable1.put(put);
get = new Get(Bytes.toBytes("do not rep"));
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES-1) {
break;
}
Result res = htable2.get(get);
if (res.size() >= 1) {
fail("Not supposed to be replicated");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
}
/** /**
* Test disable/enable replication, trying to insert, make sure nothing's * Test disable/enable replication, trying to insert, make sure nothing's
* replicated, enable it, the insert should be replicated * replicated, enable it, the insert should be replicated

View File

@ -69,10 +69,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
Configuration testConf = new Configuration(conf); Configuration testConf = new Configuration(conf);
testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null); ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null);
ReplicationStateInterface rsi = new ReplicationStateImpl(zkw1, testConf, zkw1);
rsi.init();
rsi.setState(true);
rsi.close();
String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234"); String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
ZKUtil.createWithParents(zkw1, fakeRs); ZKUtil.createWithParents(zkw1, fakeRs);
ZKClusterId.setClusterId(zkw1, new ClusterId()); ZKClusterId.setClusterId(zkw1, new ClusterId());

View File

@ -62,7 +62,7 @@ public class TestReplicationZookeeper {
conf = utility.getConfiguration(); conf = utility.getConfiguration();
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
DummyServer server = new DummyServer(); DummyServer server = new DummyServer();
repZk = new ReplicationZookeeper(server, new AtomicBoolean()); repZk = new ReplicationZookeeper(server);
slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
conf.get("hbase.zookeeper.property.clientPort") + ":/1"; conf.get("hbase.zookeeper.property.clientPort") + ":/1";
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");

View File

@ -28,7 +28,6 @@ import java.util.List;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -227,8 +226,7 @@ public class TestReplicationSourceManager {
LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("hostname0.example.org"); final Server server = new DummyServer("hostname0.example.org");
AtomicBoolean replicating = new AtomicBoolean(true); ReplicationZookeeper rz = new ReplicationZookeeper(server);
ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
// populate some znodes in the peer znode // populate some znodes in the peer znode
files.add("log1"); files.add("log1");
files.add("log2"); files.add("log2");
@ -260,8 +258,6 @@ public class TestReplicationSourceManager {
populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
+ w3.isLogZnodesMapPopulated(); + w3.isLogZnodesMapPopulated();
assertEquals(1, populatedMap); assertEquals(1, populatedMap);
// close out the resources.
rz.close();
server.abort("", null); server.abort("", null);
} }
@ -270,8 +266,7 @@ public class TestReplicationSourceManager {
LOG.debug("testNodeFailoverDeadServerParsing"); LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
AtomicBoolean replicating = new AtomicBoolean(true); ReplicationZookeeper rz = new ReplicationZookeeper(server);
ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
// populate some znodes in the peer znode // populate some znodes in the peer znode
files.add("log1"); files.add("log1");
files.add("log2"); files.add("log2");
@ -285,16 +280,13 @@ public class TestReplicationSourceManager {
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
// simulate three servers fail sequentially // simulate three servers fail sequentially
ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true)); ReplicationZookeeper rz1 = new ReplicationZookeeper(s1);
SortedMap<String, SortedSet<String>> testMap = SortedMap<String, SortedSet<String>> testMap =
rz1.claimQueues(server.getServerName().getServerName()); rz1.claimQueues(server.getServerName().getServerName());
rz1.close(); ReplicationZookeeper rz2 = new ReplicationZookeeper(s2);
ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true));
testMap = rz2.claimQueues(s1.getServerName().getServerName()); testMap = rz2.claimQueues(s1.getServerName().getServerName());
rz2.close(); ReplicationZookeeper rz3 = new ReplicationZookeeper(s3);
ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true));
testMap = rz3.claimQueues(s2.getServerName().getServerName()); testMap = rz3.claimQueues(s2.getServerName().getServerName());
rz3.close();
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
List<String> result = replicationQueueInfo.getDeadRegionServers(); List<String> result = replicationQueueInfo.getDeadRegionServers();
@ -303,9 +295,7 @@ public class TestReplicationSourceManager {
assertTrue(result.contains(server.getServerName().getServerName())); assertTrue(result.contains(server.getServerName().getServerName()));
assertTrue(result.contains(s1.getServerName().getServerName())); assertTrue(result.contains(s1.getServerName().getServerName()));
assertTrue(result.contains(s2.getServerName().getServerName())); assertTrue(result.contains(s2.getServerName().getServerName()));
// close out the resources.
rz.close();
server.abort("", null); server.abort("", null);
} }
@ -319,14 +309,13 @@ public class TestReplicationSourceManager {
public DummyNodeFailoverWorker(String znode, Server s) throws Exception { public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
this.deadRsZnode = znode; this.deadRsZnode = znode;
this.server = s; this.server = s;
rz = new ReplicationZookeeper(server, new AtomicBoolean(true)); rz = new ReplicationZookeeper(server);
} }
@Override @Override
public void run() { public void run() {
try { try {
logZnodesMap = rz.claimQueues(deadRsZnode); logZnodesMap = rz.claimQueues(deadRsZnode);
rz.close();
server.abort("Done with testing", null); server.abort("Done with testing", null);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Got exception while running NodeFailoverWorker", e); LOG.error("Got exception while running NodeFailoverWorker", e);