diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 8a81e8fb387..832c96cd2f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -52,16 +52,6 @@ import java.util.Map;
* used to keep track of the replication state.
*
*
- * Enabling and disabling peers is currently not supported.
- *
- *
- * As cluster replication is still experimental, a kill switch is provided
- * in order to stop all replication-related operations, see
- * {@link #setReplicating(boolean)}. When setting it back to true, the new
- * state of all the replication streams will be unknown and may have holes.
- * Use at your own risk.
- *
- *
* To see which commands are available in the shell, type
* replication.
*
@@ -162,36 +152,6 @@ public class ReplicationAdmin implements Closeable {
return this.replicationZk.listPeers();
}
- /**
- * Get the current status of the kill switch, if the cluster is replicating
- * or not.
- * @return true if the cluster is replicated, otherwise false
- */
- public boolean getReplicating() throws IOException {
- try {
- return this.replicationZk.getReplication();
- } catch (KeeperException e) {
- throw new IOException("Couldn't get the replication status");
- }
- }
-
- /**
- * Kill switch for all replication-related features
- * @param newState true to start replication, false to stop it.
- * completely
- * @return the previous state
- */
- public boolean setReplicating(boolean newState) throws IOException {
- boolean prev = true;
- try {
- prev = getReplicating();
- this.replicationZk.setReplication(newState);
- } catch (KeeperException e) {
- throw new IOException("Unable to set the replication state", e);
- }
- return prev;
- }
-
/**
* Get the ZK-support tool created and used by this object for replication.
* @return the ZK-support tool
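
A minimal usage sketch, not part of the patch: with the cluster-wide kill switch gone, replication is toggled per peer through ReplicationAdmin's enablePeer/disablePeer (the methods behind the enable_peer/disable_peer shell commands retained below). The class name PeerToggleExample and the peer id "1" are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class PeerToggleExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
    try {
      // Stop shipping new edits to peer "1"; edits already queued may still drain.
      replicationAdmin.disablePeer("1");
      // ... perform maintenance on the slave cluster ...
      // Resume shipping edits to that peer.
      replicationAdmin.enablePeer("1");
    } finally {
      replicationAdmin.close();
    }
  }
}
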
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
deleted file mode 100644
index fa880536299..00000000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * ReplicationStateImpl is responsible for maintaining the replication state
- * znode.
- */
-public class ReplicationStateImpl extends ReplicationStateZKBase implements
- ReplicationStateInterface {
-
- private final ReplicationStateTracker stateTracker;
- private final AtomicBoolean replicating;
-
- private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
-
- public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
- final Abortable abortable, final AtomicBoolean replicating) {
- super(zk, conf, abortable);
- this.replicating = replicating;
-
- // Set a tracker on replicationStateNode
- this.stateTracker =
- new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
- }
-
- public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
- final Abortable abortable) {
- this(zk, conf, abortable, new AtomicBoolean());
- }
-
- @Override
- public void init() throws KeeperException {
- ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
- stateTracker.start();
- readReplicationStateZnode();
- }
-
- @Override
- public boolean getState() throws KeeperException {
- return getReplication();
- }
-
- @Override
- public void setState(boolean newState) throws KeeperException {
- setReplicating(newState);
- }
-
- @Override
- public void close() throws IOException {
- if (stateTracker != null) stateTracker.stop();
- }
-
- /**
- * @param bytes
- * @return True if the passed in bytes are those of a pb
- * serialized ENABLED state.
- * @throws DeserializationException
- */
- private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
- ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
- return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
- }
-
- /**
- * @param bytes Content of a state znode.
- * @return State parsed from the passed bytes.
- * @throws DeserializationException
- */
- private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
- throws DeserializationException {
- ProtobufUtil.expectPBMagicPrefix(bytes);
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
- .newBuilder();
- ZooKeeperProtos.ReplicationState state;
- try {
- state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- return state.getState();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- }
-
- /**
- * Set the new replication state for this cluster
- * @param newState
- */
- private void setReplicating(boolean newState) throws KeeperException {
- ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
- byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
- ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
- }
-
- /**
- * Get the replication status of this cluster. If the state znode doesn't
- * exist it will also create it and set it true.
- * @return returns true when it's enabled, else false
- * @throws KeeperException
- */
- private boolean getReplication() throws KeeperException {
- byte[] data = this.stateTracker.getData(false);
- if (data == null || data.length == 0) {
- setReplicating(true);
- return true;
- }
- try {
- return isStateEnabled(data);
- } catch (DeserializationException e) {
- throw ZKUtil.convert(e);
- }
- }
-
- /**
- * This reads the state znode for replication and sets the atomic boolean
- */
- private void readReplicationStateZnode() {
- try {
- this.replicating.set(getReplication());
- LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
- } catch (KeeperException e) {
- this.abortable.abort("Failed getting data on from " + this.stateZNode, e);
- }
- }
-
- /**
- * Tracker for status of the replication
- */
- private class ReplicationStateTracker extends ZooKeeperNodeTracker {
- public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
- super(watcher, stateZnode, abortable);
- }
-
- @Override
- public synchronized void nodeDataChanged(String path) {
- if (path.equals(node)) {
- super.nodeDataChanged(path);
- readReplicationStateZnode();
- }
- }
- }
-}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
deleted file mode 100644
index 69dd5402a58..00000000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.Closeable;
-
-/**
- * This provides an interface for getting and setting the replication state of a
- * cluster. This state is used to indicate whether replication is enabled or
- * disabled on a cluster.
- */
-@InterfaceAudience.Private
-public interface ReplicationStateInterface extends Closeable {
-
- /**
- * Initialize the replication state interface.
- */
- public void init() throws KeeperException;
-
- /**
- * Get the current state of replication (i.e. ENABLED or DISABLED).
- * @return true if replication is enabled, false otherwise
- * @throws KeeperException
- */
- public boolean getState() throws KeeperException;
-
- /**
- * Set the state of replication.
- * @param newState
- * @throws KeeperException
- */
- public void setState(boolean newState) throws KeeperException;
-}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 78f5f385b95..f46dbb36bcc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -39,8 +39,6 @@ public abstract class ReplicationStateZKBase {
* cluster.
*/
protected final String peerStateNodeName;
- /** The name of the znode that contains the replication status of the local cluster. */
- protected final String stateZNode;
/** The name of the base znode that contains all replication state. */
protected final String replicationZNode;
/** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
@@ -68,11 +66,9 @@ public abstract class ReplicationStateZKBase {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
- String stateZNodeName = conf.get("zookeeper.znode.replication.state", "state");
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
- this.stateZNode = ZKUtil.joinZNode(replicationZNode, stateZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
}
@@ -90,9 +86,8 @@ public abstract class ReplicationStateZKBase {
/**
* @param state
* @return Serialized protobuf of state with pb magic prefix prepended suitable for
- * use as content of either the cluster state znode -- whether or not we should be
- * replicating kept in /hbase/replication/state -- or as content of a peer-state znode
- * under a peer cluster id as in /hbase/replication/peers/PEER_ID/peer-state.
+ * use as content of a peer-state znode under a peer cluster id as in
+ * /hbase/replication/peers/PEER_ID/peer-state.
*/
protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
byte[] bytes =
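
A decoding sketch, not part of the patch: peer-state bytes written in the pb-magic-prefixed form described above can be read back the same way the removed ReplicationStateImpl parsed its state znode. The class name PeerStateCodec and the method name isPeerStateEnabled are illustrative; the calls mirror the deleted parseStateFrom/isStateEnabled code.

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;

public class PeerStateCodec {
  // Returns true if the znode content is a pb-magic-prefixed ENABLED state.
  public static boolean isPeerStateEnabled(final byte[] bytes) throws DeserializationException {
    ProtobufUtil.expectPBMagicPrefix(bytes);
    int pblen = ProtobufUtil.lengthOfPBMagic();
    try {
      ZooKeeperProtos.ReplicationState state =
          ZooKeeperProtos.ReplicationState.newBuilder()
              .mergeFrom(bytes, pblen, bytes.length - pblen).build();
      return state.getState() == ZooKeeperProtos.ReplicationState.State.ENABLED;
    } catch (InvalidProtocolBufferException e) {
      throw new DeserializationException(e);
    }
  }
}
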
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
index cad7a245161..9b0b72ca1ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
@@ -70,7 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*/
@InterfaceAudience.Private
-public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
+public class ReplicationZookeeper extends ReplicationStateZKBase {
private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class);
// Our handle on zookeeper
@@ -79,7 +79,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
private final Configuration conf;
// Abortable
private Abortable abortable;
- private final ReplicationStateInterface replicationState;
private final ReplicationPeers replicationPeers;
private final ReplicationQueues replicationQueues;
@@ -95,8 +94,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
this.conf = conf;
this.zookeeper = zk;
setZNodes(abortable);
- this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
- this.replicationState.init();
// TODO This interface is no longer used by anyone using this constructor. When this class goes
// away, we will no longer have this null initialization business
this.replicationQueues = null;
@@ -108,19 +105,16 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
* Constructor used by region servers, connects to the peer cluster right away.
*
* @param server
- * @param replicating atomic boolean to start/stop replication
* @throws IOException
* @throws KeeperException
*/
- public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
+ public ReplicationZookeeper(final Server server)
throws IOException, KeeperException {
super(server.getZooKeeper(), server.getConfiguration(), server);
this.abortable = server;
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
setZNodes(server);
- this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
- this.replicationState.init();
this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
this.replicationQueues.init(server.getServerName().toString());
this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server);
@@ -227,25 +221,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
return this.replicationPeers.getStatusOfConnectedPeer(id);
}
- /**
- * Get the replication status of this cluster. If the state znode doesn't exist it will also
- * create it and set it true.
- * @return returns true when it's enabled, else false
- * @throws KeeperException
- */
- public boolean getReplication() throws KeeperException {
- return this.replicationState.getState();
- }
-
- /**
- * Set the new replication state for this cluster
- * @param newState
- * @throws KeeperException
- */
- public void setReplication(boolean newState) throws KeeperException {
- this.replicationState.setState(newState);
- }
-
/**
* Add a new log to the list of hlogs in zookeeper
* @param filename name of the hlog's znode
@@ -388,9 +363,4 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos
public String getPeersZNode() {
return peersZNode;
}
-
- @Override
- public void close() throws IOException {
- if (replicationState != null) replicationState.close();
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 0475be551d9..43945832991 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -111,7 +111,6 @@ public class VerifyReplication {
@Override
public Void connect(HConnection conn) throws IOException {
ZooKeeperWatcher localZKW = null;
- ReplicationZookeeper zk = null;
ReplicationPeer peer = null;
try {
localZKW = new ZooKeeperWatcher(
@@ -134,9 +133,6 @@ public class VerifyReplication {
if (peer != null) {
peer.close();
}
- if (zk != null) {
- zk.close();
- }
if (localZKW != null) {
localZKW.close();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 6739693e9fc..9d404a00077 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -23,15 +23,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationStateImpl;
-import org.apache.hadoop.hbase.replication.ReplicationStateInterface;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -49,7 +46,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
private ZooKeeperWatcher zkw;
private ReplicationQueuesClient replicationQueues;
- private ReplicationStateInterface replicationState;
private final Set<String> hlogs = new HashSet<String>();
private boolean stopped = false;
private boolean aborted;
@@ -58,15 +54,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
@Override
public boolean isLogDeletable(FileStatus fStat) {
- try {
- if (!replicationState.getState()) {
- return false;
- }
- } catch (KeeperException e) {
- abort("Cannot get the state of replication", e);
- return false;
- }
-
// all members of this class are null if replication is disabled, and we
// return true since false would render the LogsCleaner useless
if (this.getConf() == null) {
@@ -136,8 +123,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
try {
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
- this.replicationState = new ReplicationStateImpl(zkw, conf, this);
- this.replicationState.init();
} catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
@@ -155,14 +140,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
LOG.info("Stopping " + this.zkw);
this.zkw.close();
}
- if (this.replicationState != null) {
- LOG.info("Stopping " + this.replicationState);
- try {
- this.replicationState.close();
- } catch (IOException e) {
- LOG.error("Error while stopping " + this.replicationState, e);
- }
- }
// Not sure why we're deleting a connection that we never acquired or used
HConnectionManager.deleteConnection(this.getConf());
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 387f44dd0df..d119cbe3aad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -66,7 +66,6 @@ public class Replication implements WALActionsListener,
LogFactory.getLog(Replication.class);
private boolean replication;
private ReplicationSourceManager replicationManager;
- private final AtomicBoolean replicating = new AtomicBoolean(true);
private ReplicationZookeeper zkHelper;
private ReplicationQueues replicationQueues;
private Configuration conf;
@@ -108,17 +107,16 @@ public class Replication implements WALActionsListener,
.build());
if (replication) {
try {
- this.zkHelper = new ReplicationZookeeper(server, this.replicating);
+ this.zkHelper = new ReplicationZookeeper(server);
this.replicationQueues =
new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server);
this.replicationQueues.init(this.server.getServerName().toString());
} catch (KeeperException ke) {
- throw new IOException("Failed replication handler create " +
- "(replicating=" + this.replicating, ke);
+ throw new IOException("Failed replication handler create", ke);
}
this.replicationManager =
- new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs,
- this.replicating, logDir, oldLogDir);
+ new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, logDir,
+ oldLogDir);
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6f149cf2fe0..6f74ec572be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -112,9 +112,8 @@ public class ReplicationSink {
}
/**
- * Replicate this array of entries directly into the local cluster using the native client.
- * Only operates against raw protobuf type saving on a convertion from pb to pojo.
- *
+ * Replicate this array of entries directly into the local cluster using the native client. Only
+ * operates against raw protobuf type saving on a conversion from pb to pojo.
* @param entries
* @param cells
* @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index bf9266b5f65..c86549027b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -93,8 +93,6 @@ public class ReplicationSource extends Thread
// ratio of region servers to chose from a slave cluster
private float ratio;
private Random random;
- // should we replicate or not?
- private AtomicBoolean replicating;
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
@@ -149,7 +147,6 @@ public class ReplicationSource extends Thread
* @param fs file system to use
* @param manager replication manager to ping to
* @param stopper the atomic boolean to use to stop the regionserver
- * @param replicating the atomic boolean that starts/stops replication
* @param peerClusterZnode the name of our znode
* @throws IOException
*/
@@ -157,7 +154,6 @@ public class ReplicationSource extends Thread
final FileSystem fs,
final ReplicationSourceManager manager,
final Stoppable stopper,
- final AtomicBoolean replicating,
final String peerClusterZnode)
throws IOException {
this.stopper = stopper;
@@ -185,7 +181,6 @@ public class ReplicationSource extends Thread
this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
this.currentPeers = new ArrayList();
this.random = new Random();
- this.replicating = replicating;
this.manager = manager;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
@@ -417,9 +412,8 @@ public class ReplicationSource extends Thread
removeNonReplicableEdits(entry);
// Don't replicate catalog entries, if the WALEdit wasn't
- // containing anything to replicate and if we're currently not set to replicate
- if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
- Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
- edit.size() != 0 && replicating.get()) {
+ // containing anything to replicate
+ if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || Bytes.equals(
+ logKey.getTablename(), HConstants.META_TABLE_NAME)) && edit.size() != 0) {
// Only set the clusterId if is a local key.
// This ensures that the originator sets the cluster id
// and all replicas retain the initial cluster id.
@@ -714,8 +708,7 @@ public class ReplicationSource extends Thread
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
- return this.replicating.get() &&
- this.zkHelper.getPeerEnabled(this.peerId);
+ return this.zkHelper.getPeerEnabled(this.peerId);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index ff011b6acfc..05a586e5594 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -39,7 +38,6 @@ public interface ReplicationSourceInterface {
* @param fs the file system to use
* @param manager the manager to use
* @param stopper the stopper object for this region server
- * @param replicating the status of the replication on this cluster
* @param peerClusterId the id of the peer cluster
* @throws IOException
*/
@@ -47,7 +45,6 @@ public interface ReplicationSourceInterface {
final FileSystem fs,
final ReplicationSourceManager manager,
final Stoppable stopper,
- final AtomicBoolean replicating,
final String peerClusterId) throws IOException;
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 8f9ad6583fd..ba705520b40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -69,8 +69,6 @@ public class ReplicationSourceManager {
private final List<ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources;
- // Indicates if we are currently replicating
- private final AtomicBoolean replicating;
// Helper for zookeeper
private final ReplicationZookeeper zkHelper;
private final ReplicationQueues replicationQueues;
@@ -103,16 +101,13 @@ public class ReplicationSourceManager {
* @param conf the configuration to use
* @param stopper the stopper object for this region server
* @param fs the file system to use
- * @param replicating the status of the replication on this cluster
* @param logDir the directory that contains all hlog directories of live RSs
* @param oldLogDir the directory where old logs are archived
*/
public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper,
- final FileSystem fs, final AtomicBoolean replicating, final Path logDir,
- final Path oldLogDir) {
+ final FileSystem fs, final Path logDir, final Path oldLogDir) {
this.sources = new ArrayList<ReplicationSourceInterface>();
- this.replicating = replicating;
this.zkHelper = zkHelper;
this.replicationQueues = replicationQueues;
this.stopper = stopper;
@@ -206,8 +201,7 @@ public class ReplicationSourceManager {
* @throws IOException
*/
public ReplicationSourceInterface addSource(String id) throws IOException {
- ReplicationSourceInterface src =
- getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+ ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, stopper, id);
synchronized (this.hlogsById) {
this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>());
@@ -260,10 +254,6 @@ public class ReplicationSourceManager {
}
void preLogRoll(Path newLog) throws IOException {
- if (!this.replicating.get()) {
- LOG.warn("Replication stopped, won't add new log");
- return;
- }
synchronized (this.hlogsById) {
String name = newLog.getName();
@@ -288,14 +278,9 @@ public class ReplicationSourceManager {
}
void postLogRoll(Path newLog) throws IOException {
- if (!this.replicating.get()) {
- LOG.warn("Replication stopped, won't add new log");
- return;
- }
-
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources) {
- source.enqueueLog(newLog);
+ source.enqueueLog(newLog);
}
}
@@ -313,7 +298,6 @@ public class ReplicationSourceManager {
* @param fs the file system to use
* @param manager the manager to use
* @param stopper the stopper object for this region server
- * @param replicating the status of the replication on this cluster
* @param peerId the id of the peer cluster
* @return the created source
* @throws IOException
@@ -323,7 +307,6 @@ public class ReplicationSourceManager {
final FileSystem fs,
final ReplicationSourceManager manager,
final Stoppable stopper,
- final AtomicBoolean replicating,
final String peerId) throws IOException {
ReplicationSourceInterface src;
try {
@@ -337,7 +320,7 @@ public class ReplicationSourceManager {
src = new ReplicationSource();
}
- src.init(conf, fs, manager, stopper, replicating, peerId);
+ src.init(conf, fs, manager, stopper, peerId);
return src;
}
@@ -599,8 +582,8 @@ public class ReplicationSourceManager {
for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
String peerId = entry.getKey();
try {
- ReplicationSourceInterface src = getReplicationSource(conf,
- fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+ ReplicationSourceInterface src =
+ getReplicationSource(conf, fs, ReplicationSourceManager.this, stopper, peerId);
if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
diff --git a/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html b/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
index b97a1a6629e..0029c75716b 100644
--- a/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
+++ b/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
@@ -132,16 +132,6 @@ Choosing peer 10.10.1.49:62020
In this case it indicates that 1 region server from the slave cluster
was chosen for replication.
-
-Should you want to stop the replication while the clusters are running, open
-the shell on the master cluster and issue this command:
-
-hbase(main):001:0> stop_replication
-
-Replication of already queued edits will still happen after you
-issued that command but new entries won't be. To start it back, simply replace
-"false" with "true" in the command.
-
diff --git a/hbase-server/src/main/ruby/hbase/replication_admin.rb b/hbase-server/src/main/ruby/hbase/replication_admin.rb
index c0fd5b9909c..27d141acc47 100644
--- a/hbase-server/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-server/src/main/ruby/hbase/replication_admin.rb
@@ -65,17 +65,5 @@ module Hbase
def disable_peer(id)
@replication_admin.disablePeer(id)
end
-
- #----------------------------------------------------------------------------------------------
- # Restart the replication, in an unknown state
- def start_replication
- @replication_admin.setReplicating(true)
- end
-
- #----------------------------------------------------------------------------------------------
- # Kill switch for replication, stops all its features
- def stop_replication
- @replication_admin.setReplicating(false)
- end
end
end
diff --git a/hbase-server/src/main/ruby/shell.rb b/hbase-server/src/main/ruby/shell.rb
index 1ba9e027b24..b1d5cc0d7dd 100644
--- a/hbase-server/src/main/ruby/shell.rb
+++ b/hbase-server/src/main/ruby/shell.rb
@@ -297,15 +297,13 @@ Shell.load_command_group(
Shell.load_command_group(
'replication',
:full_name => 'CLUSTER REPLICATION TOOLS',
- :comment => "In order to use these tools, hbase.replication must be true. enabling/disabling is currently unsupported",
+ :comment => "In order to use these tools, hbase.replication must be true.",
:commands => %w[
add_peer
remove_peer
list_peers
enable_peer
disable_peer
- start_replication
- stop_replication
]
)
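
A shell sketch, not part of the patch: with start_replication and stop_replication removed, pausing and resuming replication happens per peer with the surviving commands (the peer id '1' is illustrative):

  hbase(main):001:0> disable_peer '1'
  hbase(main):002:0> enable_peer '1'
  hbase(main):003:0> list_peers
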
diff --git a/hbase-server/src/main/ruby/shell/commands/start_replication.rb b/hbase-server/src/main/ruby/shell/commands/start_replication.rb
deleted file mode 100644
index 3ea97a90c99..00000000000
--- a/hbase-server/src/main/ruby/shell/commands/start_replication.rb
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-module Shell
- module Commands
- class StartReplication < Command
- def help
- return <<-EOF
-Restarts all the replication features. The state in which each
-stream starts in is undetermined.
-WARNING:
-start/stop replication is only meant to be used in critical load situations.
-Examples:
-
- hbase> start_replication
-EOF
- end
-
- def command
- format_simple_command do
- replication_admin.start_replication
- end
- end
- end
- end
-end
diff --git a/hbase-server/src/main/ruby/shell/commands/stop_replication.rb b/hbase-server/src/main/ruby/shell/commands/stop_replication.rb
deleted file mode 100644
index 2e24fa5cfe6..00000000000
--- a/hbase-server/src/main/ruby/shell/commands/stop_replication.rb
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-module Shell
- module Commands
- class StopReplication < Command
- def help
- return <<-EOF
-Stops all the replication features. The state in which each
-stream stops in is undetermined.
-WARNING:
-start/stop replication is only meant to be used in critical load situations.
-Examples:
-
- hbase> stop_replication
-EOF
- end
-
- def command
- format_simple_command do
- replication_admin.stop_replication
- end
- end
- end
- end
-end
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 5205d129233..06843df555e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -68,8 +68,7 @@ public class TestLogsCleaner {
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
Replication.decorateMasterConfiguration(conf);
Server server = new DummyServer();
- ReplicationZookeeper zkHelper =
- new ReplicationZookeeper(server, new AtomicBoolean(true));
+ ReplicationZookeeper zkHelper = new ReplicationZookeeper(server);
Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index f06e947b110..743a1762a34 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -38,10 +38,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
Path currentPath;
@Override
- public void init(Configuration conf, FileSystem fs,
- ReplicationSourceManager manager, Stoppable stopper,
- AtomicBoolean replicating, String peerClusterId)
- throws IOException {
+ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ Stoppable stopper, String peerClusterId) throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index f77d6121fe5..b4b90cf0f43 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -111,7 +111,6 @@ public class TestReplicationBase {
zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
admin.addPeer("2", utility2.getClusterKey());
- setIsReplication(true);
LOG.info("Setup second Zk");
CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
@@ -134,12 +133,6 @@ public class TestReplicationBase {
htable2 = new HTable(conf2, tableName);
}
- protected static void setIsReplication(boolean rep) throws Exception {
- LOG.info("Set rep " + rep);
- admin.setReplicating(rep);
- Thread.sleep(SLEEP_TIME);
- }
-
/**
* @throws java.lang.Exception
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index ddbd56d79f6..538c2df689e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -255,75 +255,6 @@ public class TestReplicationSmallTests extends TestReplicationBase {
}
}
- /**
- * Test stopping replication, trying to insert, make sure nothing's
- * replicated, enable it, try replicating and it should work
- * @throws Exception
- */
- @Test(timeout=300000)
- public void testStartStop() throws Exception {
-
- // Test stopping replication
- setIsReplication(false);
-
- Put put = new Put(Bytes.toBytes("stop start"));
- put.add(famName, row, row);
- htable1.put(put);
-
- Get get = new Get(Bytes.toBytes("stop start"));
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- break;
- }
- Result res = htable2.get(get);
- if(res.size() >= 1) {
- fail("Replication wasn't stopped");
-
- } else {
- LOG.info("Row not replicated, let's wait a bit more...");
- Thread.sleep(SLEEP_TIME);
- }
- }
-
- // Test restart replication
- setIsReplication(true);
-
- htable1.put(put);
-
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- fail("Waited too much time for put replication");
- }
- Result res = htable2.get(get);
- if(res.size() == 0) {
- LOG.info("Row not available");
- Thread.sleep(SLEEP_TIME);
- } else {
- assertArrayEquals(res.value(), row);
- break;
- }
- }
-
- put = new Put(Bytes.toBytes("do not rep"));
- put.add(noRepfamName, row, row);
- htable1.put(put);
-
- get = new Get(Bytes.toBytes("do not rep"));
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i == NB_RETRIES-1) {
- break;
- }
- Result res = htable2.get(get);
- if (res.size() >= 1) {
- fail("Not supposed to be replicated");
- } else {
- LOG.info("Row not replicated, let's wait a bit more...");
- Thread.sleep(SLEEP_TIME);
- }
- }
-
- }
-
/**
* Test disable/enable replication, trying to insert, make sure nothing's
* replicated, enable it, the insert should be replicated
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 137cf40e0cd..077a504de9c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -69,10 +69,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
Configuration testConf = new Configuration(conf);
testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null);
- ReplicationStateInterface rsi = new ReplicationStateImpl(zkw1, testConf, zkw1);
- rsi.init();
- rsi.setState(true);
- rsi.close();
String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
ZKUtil.createWithParents(zkw1, fakeRs);
ZKClusterId.setClusterId(zkw1, new ClusterId());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
index c61cf570cba..de2515c969e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
@@ -62,7 +62,7 @@ public class TestReplicationZookeeper {
conf = utility.getConfiguration();
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
DummyServer server = new DummyServer();
- repZk = new ReplicationZookeeper(server, new AtomicBoolean());
+ repZk = new ReplicationZookeeper(server);
slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
conf.get("hbase.zookeeper.property.clientPort") + ":/1";
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 2d334e69620..1189162b96b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -227,8 +226,7 @@ public class TestReplicationSourceManager {
LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("hostname0.example.org");
- AtomicBoolean replicating = new AtomicBoolean(true);
- ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
+ ReplicationZookeeper rz = new ReplicationZookeeper(server);
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
@@ -260,8 +258,6 @@ public class TestReplicationSourceManager {
populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
+ w3.isLogZnodesMapPopulated();
assertEquals(1, populatedMap);
- // close out the resources.
- rz.close();
server.abort("", null);
}
@@ -270,8 +266,7 @@ public class TestReplicationSourceManager {
LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
- AtomicBoolean replicating = new AtomicBoolean(true);
- ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
+ ReplicationZookeeper rz = new ReplicationZookeeper(server);
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
@@ -285,16 +280,13 @@ public class TestReplicationSourceManager {
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
// simulate three servers fail sequentially
- ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true));
+ ReplicationZookeeper rz1 = new ReplicationZookeeper(s1);
SortedMap<String, SortedSet<String>> testMap =
rz1.claimQueues(server.getServerName().getServerName());
- rz1.close();
- ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true));
+ ReplicationZookeeper rz2 = new ReplicationZookeeper(s2);
testMap = rz2.claimQueues(s1.getServerName().getServerName());
- rz2.close();
- ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true));
+ ReplicationZookeeper rz3 = new ReplicationZookeeper(s3);
testMap = rz3.claimQueues(s2.getServerName().getServerName());
- rz3.close();
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
List<String> result = replicationQueueInfo.getDeadRegionServers();
@@ -303,9 +295,7 @@ public class TestReplicationSourceManager {
assertTrue(result.contains(server.getServerName().getServerName()));
assertTrue(result.contains(s1.getServerName().getServerName()));
assertTrue(result.contains(s2.getServerName().getServerName()));
-
- // close out the resources.
- rz.close();
+
server.abort("", null);
}
@@ -319,14 +309,13 @@ public class TestReplicationSourceManager {
public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
this.deadRsZnode = znode;
this.server = s;
- rz = new ReplicationZookeeper(server, new AtomicBoolean(true));
+ rz = new ReplicationZookeeper(server);
}
@Override
public void run() {
try {
logZnodesMap = rz.claimQueues(deadRsZnode);
- rz.close();
server.abort("Done with testing", null);
} catch (Exception e) {
LOG.error("Got exception while running NodeFailoverWorker", e);