From ed997ccdebbdd971ca983a095402b94b6830234f Mon Sep 17 00:00:00 2001 From: Jean-Daniel Cryans Date: Tue, 9 Jul 2013 19:59:39 +0000 Subject: [PATCH] HBASE-8861 Remove ReplicationState completely (Chris Trezzo via JD) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1501503 13f79535-47bb-0310-9956-ffa450edef68 --- .../client/replication/ReplicationAdmin.java | 40 ---- .../replication/ReplicationStateImpl.java | 174 ------------------ .../ReplicationStateInterface.java | 52 ------ .../replication/ReplicationStateZKBase.java | 9 +- .../replication/ReplicationZookeeper.java | 34 +--- .../replication/VerifyReplication.java | 4 - .../master/ReplicationLogCleaner.java | 23 --- .../replication/regionserver/Replication.java | 10 +- .../regionserver/ReplicationSink.java | 5 +- .../regionserver/ReplicationSource.java | 13 +- .../ReplicationSourceInterface.java | 3 - .../ReplicationSourceManager.java | 29 +-- .../hadoop/hbase/replication/package.html | 10 - .../src/main/ruby/hbase/replication_admin.rb | 12 -- hbase-server/src/main/ruby/shell.rb | 4 +- .../ruby/shell/commands/start_replication.rb | 42 ----- .../ruby/shell/commands/stop_replication.rb | 42 ----- .../hbase/master/cleaner/TestLogsCleaner.java | 3 +- .../replication/ReplicationSourceDummy.java | 6 +- .../replication/TestReplicationBase.java | 7 - .../TestReplicationSmallTests.java | 69 ------- .../TestReplicationStateZKImpl.java | 4 - .../replication/TestReplicationZookeeper.java | 2 +- .../TestReplicationSourceManager.java | 25 +-- 24 files changed, 31 insertions(+), 591 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java delete mode 100644 hbase-server/src/main/ruby/shell/commands/start_replication.rb delete mode 100644 hbase-server/src/main/ruby/shell/commands/stop_replication.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 8a81e8fb387..832c96cd2f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -52,16 +52,6 @@ import java.util.Map; * used to keep track of the replication state. *

*

- * Enabling and disabling peers is currently not supported. - *

- *

- * As cluster replication is still experimental, a kill switch is provided - * in order to stop all replication-related operations, see - * {@link #setReplicating(boolean)}. When setting it back to true, the new - * state of all the replication streams will be unknown and may have holes. - * Use at your own risk. - *

- *

* To see which commands are available in the shell, type * replication. *

@@ -162,36 +152,6 @@ public class ReplicationAdmin implements Closeable { return this.replicationZk.listPeers(); } - /** - * Get the current status of the kill switch, if the cluster is replicating - * or not. - * @return true if the cluster is replicated, otherwise false - */ - public boolean getReplicating() throws IOException { - try { - return this.replicationZk.getReplication(); - } catch (KeeperException e) { - throw new IOException("Couldn't get the replication status"); - } - } - - /** - * Kill switch for all replication-related features - * @param newState true to start replication, false to stop it. - * completely - * @return the previous state - */ - public boolean setReplicating(boolean newState) throws IOException { - boolean prev = true; - try { - prev = getReplicating(); - this.replicationZk.setReplication(newState); - } catch (KeeperException e) { - throw new IOException("Unable to set the replication state", e); - } - return prev; - } - /** * Get the ZK-support tool created and used by this object for replication. * @return the ZK-support tool diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java deleted file mode 100644 index fa880536299..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * ReplicationStateImpl is responsible for maintaining the replication state - * znode. - */ -public class ReplicationStateImpl extends ReplicationStateZKBase implements - ReplicationStateInterface { - - private final ReplicationStateTracker stateTracker; - private final AtomicBoolean replicating; - - private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class); - - public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, - final Abortable abortable, final AtomicBoolean replicating) { - super(zk, conf, abortable); - this.replicating = replicating; - - // Set a tracker on replicationStateNode - this.stateTracker = - new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable); - } - - public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, - final Abortable abortable) { - this(zk, conf, abortable, new AtomicBoolean()); - } - - @Override - public void init() throws KeeperException { - ZKUtil.createWithParents(this.zookeeper, this.stateZNode); - stateTracker.start(); - readReplicationStateZnode(); - } - - @Override - public boolean getState() throws KeeperException { - return getReplication(); - } - - @Override - public void setState(boolean newState) throws KeeperException { - setReplicating(newState); - } - - @Override - public void close() throws IOException { - if (stateTracker != null) stateTracker.stop(); - } - - /** - * @param bytes - * @return True if the passed in bytes are those of a pb - * serialized ENABLED state. - * @throws DeserializationException - */ - private boolean isStateEnabled(final byte[] bytes) throws DeserializationException { - ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes); - return ZooKeeperProtos.ReplicationState.State.ENABLED == state; - } - - /** - * @param bytes Content of a state znode. - * @return State parsed from the passed bytes. - * @throws DeserializationException - */ - private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes) - throws DeserializationException { - ProtobufUtil.expectPBMagicPrefix(bytes); - int pblen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState - .newBuilder(); - ZooKeeperProtos.ReplicationState state; - try { - state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); - return state.getState(); - } catch (InvalidProtocolBufferException e) { - throw new DeserializationException(e); - } - } - - /** - * Set the new replication state for this cluster - * @param newState - */ - private void setReplicating(boolean newState) throws KeeperException { - ZKUtil.createWithParents(this.zookeeper, this.stateZNode); - byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES; - ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes); - } - - /** - * Get the replication status of this cluster. If the state znode doesn't - * exist it will also create it and set it true. - * @return returns true when it's enabled, else false - * @throws KeeperException - */ - private boolean getReplication() throws KeeperException { - byte[] data = this.stateTracker.getData(false); - if (data == null || data.length == 0) { - setReplicating(true); - return true; - } - try { - return isStateEnabled(data); - } catch (DeserializationException e) { - throw ZKUtil.convert(e); - } - } - - /** - * This reads the state znode for replication and sets the atomic boolean - */ - private void readReplicationStateZnode() { - try { - this.replicating.set(getReplication()); - LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped")); - } catch (KeeperException e) { - this.abortable.abort("Failed getting data on from " + this.stateZNode, e); - } - } - - /** - * Tracker for status of the replication - */ - private class ReplicationStateTracker extends ZooKeeperNodeTracker { - public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) { - super(watcher, stateZnode, abortable); - } - - @Override - public synchronized void nodeDataChanged(String path) { - if (path.equals(node)) { - super.nodeDataChanged(path); - readReplicationStateZnode(); - } - } - } -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java deleted file mode 100644 index 69dd5402a58..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -import java.io.Closeable; - -/** - * This provides an interface for getting and setting the replication state of a - * cluster. This state is used to indicate whether replication is enabled or - * disabled on a cluster. - */ -@InterfaceAudience.Private -public interface ReplicationStateInterface extends Closeable { - - /** - * Initialize the replication state interface. - */ - public void init() throws KeeperException; - - /** - * Get the current state of replication (i.e. ENABLED or DISABLED). - * @return true if replication is enabled, false otherwise - * @throws KeeperException - */ - public boolean getState() throws KeeperException; - - /** - * Set the state of replication. - * @param newState - * @throws KeeperException - */ - public void setState(boolean newState) throws KeeperException; -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 78f5f385b95..f46dbb36bcc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -39,8 +39,6 @@ public abstract class ReplicationStateZKBase { * cluster. */ protected final String peerStateNodeName; - /** The name of the znode that contains the replication status of the local cluster. */ - protected final String stateZNode; /** The name of the base znode that contains all replication state. */ protected final String replicationZNode; /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */ @@ -68,11 +66,9 @@ public abstract class ReplicationStateZKBase { String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); - String stateZNodeName = conf.get("zookeeper.znode.replication.state", "state"); this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); - this.stateZNode = ZKUtil.joinZNode(replicationZNode, stateZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); } @@ -90,9 +86,8 @@ public abstract class ReplicationStateZKBase { /** * @param state * @return Serialized protobuf of state with pb magic prefix prepended suitable for - * use as content of either the cluster state znode -- whether or not we should be - * replicating kept in /hbase/replication/state -- or as content of a peer-state znode - * under a peer cluster id as in /hbase/replication/peers/PEER_ID/peer-state. + * use as content of a peer-state znode under a peer cluster id as in + * /hbase/replication/peers/PEER_ID/peer-state. */ protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) { byte[] bytes = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index cad7a245161..9b0b72ca1ea 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -70,7 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * */ @InterfaceAudience.Private -public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable { +public class ReplicationZookeeper extends ReplicationStateZKBase { private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class); // Our handle on zookeeper @@ -79,7 +79,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos private final Configuration conf; // Abortable private Abortable abortable; - private final ReplicationStateInterface replicationState; private final ReplicationPeers replicationPeers; private final ReplicationQueues replicationQueues; @@ -95,8 +94,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos this.conf = conf; this.zookeeper = zk; setZNodes(abortable); - this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable); - this.replicationState.init(); // TODO This interface is no longer used by anyone using this constructor. When this class goes // away, we will no longer have this null initialization business this.replicationQueues = null; @@ -108,19 +105,16 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos * Constructor used by region servers, connects to the peer cluster right away. * * @param server - * @param replicating atomic boolean to start/stop replication * @throws IOException * @throws KeeperException */ - public ReplicationZookeeper(final Server server, final AtomicBoolean replicating) + public ReplicationZookeeper(final Server server) throws IOException, KeeperException { super(server.getZooKeeper(), server.getConfiguration(), server); this.abortable = server; this.zookeeper = server.getZooKeeper(); this.conf = server.getConfiguration(); setZNodes(server); - this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating); - this.replicationState.init(); this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server); this.replicationQueues.init(server.getServerName().toString()); this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server); @@ -227,25 +221,6 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos return this.replicationPeers.getStatusOfConnectedPeer(id); } - /** - * Get the replication status of this cluster. If the state znode doesn't exist it will also - * create it and set it true. - * @return returns true when it's enabled, else false - * @throws KeeperException - */ - public boolean getReplication() throws KeeperException { - return this.replicationState.getState(); - } - - /** - * Set the new replication state for this cluster - * @param newState - * @throws KeeperException - */ - public void setReplication(boolean newState) throws KeeperException { - this.replicationState.setState(newState); - } - /** * Add a new log to the list of hlogs in zookeeper * @param filename name of the hlog's znode @@ -388,9 +363,4 @@ public class ReplicationZookeeper extends ReplicationStateZKBase implements Clos public String getPeersZNode() { return peersZNode; } - - @Override - public void close() throws IOException { - if (replicationState != null) replicationState.close(); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 0475be551d9..43945832991 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -111,7 +111,6 @@ public class VerifyReplication { @Override public Void connect(HConnection conn) throws IOException { ZooKeeperWatcher localZKW = null; - ReplicationZookeeper zk = null; ReplicationPeer peer = null; try { localZKW = new ZooKeeperWatcher( @@ -134,9 +133,6 @@ public class VerifyReplication { if (peer != null) { peer.close(); } - if (zk != null) { - zk.close(); - } if (localZKW != null) { localZKW.close(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 6739693e9fc..9d404a00077 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -23,15 +23,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; -import org.apache.hadoop.hbase.replication.ReplicationStateImpl; -import org.apache.hadoop.hbase.replication.ReplicationStateInterface; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -49,7 +46,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); private ZooKeeperWatcher zkw; private ReplicationQueuesClient replicationQueues; - private ReplicationStateInterface replicationState; private final Set hlogs = new HashSet(); private boolean stopped = false; private boolean aborted; @@ -58,15 +54,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo @Override public boolean isLogDeletable(FileStatus fStat) { - try { - if (!replicationState.getState()) { - return false; - } - } catch (KeeperException e) { - abort("Cannot get the state of replication", e); - return false; - } - // all members of this class are null if replication is disabled, and we // return true since false would render the LogsCleaner useless if (this.getConf() == null) { @@ -136,8 +123,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo try { this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this); - this.replicationState = new ReplicationStateImpl(zkw, conf, this); - this.replicationState.init(); } catch (KeeperException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } catch (IOException e) { @@ -155,14 +140,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo LOG.info("Stopping " + this.zkw); this.zkw.close(); } - if (this.replicationState != null) { - LOG.info("Stopping " + this.replicationState); - try { - this.replicationState.close(); - } catch (IOException e) { - LOG.error("Error while stopping " + this.replicationState, e); - } - } // Not sure why we're deleting a connection that we never acquired or used HConnectionManager.deleteConnection(this.getConf()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 387f44dd0df..d119cbe3aad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -66,7 +66,6 @@ public class Replication implements WALActionsListener, LogFactory.getLog(Replication.class); private boolean replication; private ReplicationSourceManager replicationManager; - private final AtomicBoolean replicating = new AtomicBoolean(true); private ReplicationZookeeper zkHelper; private ReplicationQueues replicationQueues; private Configuration conf; @@ -108,17 +107,16 @@ public class Replication implements WALActionsListener, .build()); if (replication) { try { - this.zkHelper = new ReplicationZookeeper(server, this.replicating); + this.zkHelper = new ReplicationZookeeper(server); this.replicationQueues = new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server); this.replicationQueues.init(this.server.getServerName().toString()); } catch (KeeperException ke) { - throw new IOException("Failed replication handler create " + - "(replicating=" + this.replicating, ke); + throw new IOException("Failed replication handler create", ke); } this.replicationManager = - new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, - this.replicating, logDir, oldLogDir); + new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, logDir, + oldLogDir); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 6f149cf2fe0..6f74ec572be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -112,9 +112,8 @@ public class ReplicationSink { } /** - * Replicate this array of entries directly into the local cluster using the native client. - * Only operates against raw protobuf type saving on a convertion from pb to pojo. - * + * Replicate this array of entries directly into the local cluster using the native client. Only + * operates against raw protobuf type saving on a conversion from pb to pojo. * @param entries * @param cells * @throws IOException diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index bf9266b5f65..c86549027b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -93,8 +93,6 @@ public class ReplicationSource extends Thread // ratio of region servers to chose from a slave cluster private float ratio; private Random random; - // should we replicate or not? - private AtomicBoolean replicating; private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source replicates to private String peerId; @@ -149,7 +147,6 @@ public class ReplicationSource extends Thread * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver - * @param replicating the atomic boolean that starts/stops replication * @param peerClusterZnode the name of our znode * @throws IOException */ @@ -157,7 +154,6 @@ public class ReplicationSource extends Thread final FileSystem fs, final ReplicationSourceManager manager, final Stoppable stopper, - final AtomicBoolean replicating, final String peerClusterZnode) throws IOException { this.stopper = stopper; @@ -185,7 +181,6 @@ public class ReplicationSource extends Thread this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); this.currentPeers = new ArrayList(); this.random = new Random(); - this.replicating = replicating; this.manager = manager; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); @@ -417,9 +412,8 @@ public class ReplicationSource extends Thread removeNonReplicableEdits(entry); // Don't replicate catalog entries, if the WALEdit wasn't // containing anything to replicate and if we're currently not set to replicate - if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || - Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) && - edit.size() != 0 && replicating.get()) { + if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || Bytes.equals( + logKey.getTablename(), HConstants.META_TABLE_NAME)) && edit.size() != 0) { // Only set the clusterId if is a local key. // This ensures that the originator sets the cluster id // and all replicas retain the initial cluster id. @@ -714,8 +708,7 @@ public class ReplicationSource extends Thread * @return true if the peer is enabled, otherwise false */ protected boolean isPeerEnabled() { - return this.replicating.get() && - this.zkHelper.getPeerEnabled(this.peerId); + return this.zkHelper.getPeerEnabled(this.peerId); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index ff011b6acfc..05a586e5594 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -39,7 +38,6 @@ public interface ReplicationSourceInterface { * @param fs the file system to use * @param manager the manager to use * @param stopper the stopper object for this region server - * @param replicating the status of the replication on this cluster * @param peerClusterId the id of the peer cluster * @throws IOException */ @@ -47,7 +45,6 @@ public interface ReplicationSourceInterface { final FileSystem fs, final ReplicationSourceManager manager, final Stoppable stopper, - final AtomicBoolean replicating, final String peerClusterId) throws IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 8f9ad6583fd..ba705520b40 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -69,8 +69,6 @@ public class ReplicationSourceManager { private final List sources; // List of all the sources we got from died RSs private final List oldsources; - // Indicates if we are currently replicating - private final AtomicBoolean replicating; // Helper for zookeeper private final ReplicationZookeeper zkHelper; private final ReplicationQueues replicationQueues; @@ -103,16 +101,13 @@ public class ReplicationSourceManager { * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use - * @param replicating the status of the replication on this cluster * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived */ public ReplicationSourceManager(final ReplicationZookeeper zkHelper, final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper, - final FileSystem fs, final AtomicBoolean replicating, final Path logDir, - final Path oldLogDir) { + final FileSystem fs, final Path logDir, final Path oldLogDir) { this.sources = new ArrayList(); - this.replicating = replicating; this.zkHelper = zkHelper; this.replicationQueues = replicationQueues; this.stopper = stopper; @@ -206,8 +201,7 @@ public class ReplicationSourceManager { * @throws IOException */ public ReplicationSourceInterface addSource(String id) throws IOException { - ReplicationSourceInterface src = - getReplicationSource(this.conf, this.fs, this, stopper, replicating, id); + ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, stopper, id); synchronized (this.hlogsById) { this.sources.add(src); this.hlogsById.put(id, new TreeSet()); @@ -260,10 +254,6 @@ public class ReplicationSourceManager { } void preLogRoll(Path newLog) throws IOException { - if (!this.replicating.get()) { - LOG.warn("Replication stopped, won't add new log"); - return; - } synchronized (this.hlogsById) { String name = newLog.getName(); @@ -288,14 +278,9 @@ public class ReplicationSourceManager { } void postLogRoll(Path newLog) throws IOException { - if (!this.replicating.get()) { - LOG.warn("Replication stopped, won't add new log"); - return; - } - // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources) { - source.enqueueLog(newLog); + source.enqueueLog(newLog); } } @@ -313,7 +298,6 @@ public class ReplicationSourceManager { * @param fs the file system to use * @param manager the manager to use * @param stopper the stopper object for this region server - * @param replicating the status of the replication on this cluster * @param peerId the id of the peer cluster * @return the created source * @throws IOException @@ -323,7 +307,6 @@ public class ReplicationSourceManager { final FileSystem fs, final ReplicationSourceManager manager, final Stoppable stopper, - final AtomicBoolean replicating, final String peerId) throws IOException { ReplicationSourceInterface src; try { @@ -337,7 +320,7 @@ public class ReplicationSourceManager { src = new ReplicationSource(); } - src.init(conf, fs, manager, stopper, replicating, peerId); + src.init(conf, fs, manager, stopper, peerId); return src; } @@ -599,8 +582,8 @@ public class ReplicationSourceManager { for (Map.Entry> entry : newQueues.entrySet()) { String peerId = entry.getKey(); try { - ReplicationSourceInterface src = getReplicationSource(conf, - fs, ReplicationSourceManager.this, stopper, replicating, peerId); + ReplicationSourceInterface src = + getReplicationSource(conf, fs, ReplicationSourceManager.this, stopper, peerId); if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; diff --git a/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html b/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html index b97a1a6629e..0029c75716b 100644 --- a/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html +++ b/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html @@ -132,16 +132,6 @@ Choosing peer 10.10.1.49:62020 In this case it indicates that 1 region server from the slave cluster was chosen for replication.

- -Should you want to stop the replication while the clusters are running, open -the shell on the master cluster and issue this command: -
-hbase(main):001:0> stop_replication
- -Replication of already queued edits will still happen after you -issued that command but new entries won't be. To start it back, simply replace -"false" with "true" in the command. -

diff --git a/hbase-server/src/main/ruby/hbase/replication_admin.rb b/hbase-server/src/main/ruby/hbase/replication_admin.rb index c0fd5b9909c..27d141acc47 100644 --- a/hbase-server/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-server/src/main/ruby/hbase/replication_admin.rb @@ -65,17 +65,5 @@ module Hbase def disable_peer(id) @replication_admin.disablePeer(id) end - - #---------------------------------------------------------------------------------------------- - # Restart the replication, in an unknown state - def start_replication - @replication_admin.setReplicating(true) - end - - #---------------------------------------------------------------------------------------------- - # Kill switch for replication, stops all its features - def stop_replication - @replication_admin.setReplicating(false) - end end end diff --git a/hbase-server/src/main/ruby/shell.rb b/hbase-server/src/main/ruby/shell.rb index 1ba9e027b24..b1d5cc0d7dd 100644 --- a/hbase-server/src/main/ruby/shell.rb +++ b/hbase-server/src/main/ruby/shell.rb @@ -297,15 +297,13 @@ Shell.load_command_group( Shell.load_command_group( 'replication', :full_name => 'CLUSTER REPLICATION TOOLS', - :comment => "In order to use these tools, hbase.replication must be true. enabling/disabling is currently unsupported", + :comment => "In order to use these tools, hbase.replication must be true.", :commands => %w[ add_peer remove_peer list_peers enable_peer disable_peer - start_replication - stop_replication ] ) diff --git a/hbase-server/src/main/ruby/shell/commands/start_replication.rb b/hbase-server/src/main/ruby/shell/commands/start_replication.rb deleted file mode 100644 index 3ea97a90c99..00000000000 --- a/hbase-server/src/main/ruby/shell/commands/start_replication.rb +++ /dev/null @@ -1,42 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -module Shell - module Commands - class StartReplication < Command - def help - return <<-EOF -Restarts all the replication features. The state in which each -stream starts in is undetermined. -WARNING: -start/stop replication is only meant to be used in critical load situations. -Examples: - - hbase> start_replication -EOF - end - - def command - format_simple_command do - replication_admin.start_replication - end - end - end - end -end diff --git a/hbase-server/src/main/ruby/shell/commands/stop_replication.rb b/hbase-server/src/main/ruby/shell/commands/stop_replication.rb deleted file mode 100644 index 2e24fa5cfe6..00000000000 --- a/hbase-server/src/main/ruby/shell/commands/stop_replication.rb +++ /dev/null @@ -1,42 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -module Shell - module Commands - class StopReplication < Command - def help - return <<-EOF -Stops all the replication features. The state in which each -stream stops in is undetermined. -WARNING: -start/stop replication is only meant to be used in critical load situations. -Examples: - - hbase> stop_replication -EOF - end - - def command - format_simple_command do - replication_admin.stop_replication - end - end - end - end -end diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 5205d129233..06843df555e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -68,8 +68,7 @@ public class TestLogsCleaner { conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); - ReplicationZookeeper zkHelper = - new ReplicationZookeeper(server, new AtomicBoolean(true)); + ReplicationZookeeper zkHelper = new ReplicationZookeeper(server); Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index f06e947b110..743a1762a34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -38,10 +38,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { Path currentPath; @Override - public void init(Configuration conf, FileSystem fs, - ReplicationSourceManager manager, Stoppable stopper, - AtomicBoolean replicating, String peerClusterId) - throws IOException { + public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + Stoppable stopper, String peerClusterId) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index f77d6121fe5..b4b90cf0f43 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -111,7 +111,6 @@ public class TestReplicationBase { zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true); admin.addPeer("2", utility2.getClusterKey()); - setIsReplication(true); LOG.info("Setup second Zk"); CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); @@ -134,12 +133,6 @@ public class TestReplicationBase { htable2 = new HTable(conf2, tableName); } - protected static void setIsReplication(boolean rep) throws Exception { - LOG.info("Set rep " + rep); - admin.setReplicating(rep); - Thread.sleep(SLEEP_TIME); - } - /** * @throws java.lang.Exception */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index ddbd56d79f6..538c2df689e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -255,75 +255,6 @@ public class TestReplicationSmallTests extends TestReplicationBase { } } - /** - * Test stopping replication, trying to insert, make sure nothing's - * replicated, enable it, try replicating and it should work - * @throws Exception - */ - @Test(timeout=300000) - public void testStartStop() throws Exception { - - // Test stopping replication - setIsReplication(false); - - Put put = new Put(Bytes.toBytes("stop start")); - put.add(famName, row, row); - htable1.put(put); - - Get get = new Get(Bytes.toBytes("stop start")); - for (int i = 0; i < NB_RETRIES; i++) { - if (i==NB_RETRIES-1) { - break; - } - Result res = htable2.get(get); - if(res.size() >= 1) { - fail("Replication wasn't stopped"); - - } else { - LOG.info("Row not replicated, let's wait a bit more..."); - Thread.sleep(SLEEP_TIME); - } - } - - // Test restart replication - setIsReplication(true); - - htable1.put(put); - - for (int i = 0; i < NB_RETRIES; i++) { - if (i==NB_RETRIES-1) { - fail("Waited too much time for put replication"); - } - Result res = htable2.get(get); - if(res.size() == 0) { - LOG.info("Row not available"); - Thread.sleep(SLEEP_TIME); - } else { - assertArrayEquals(res.value(), row); - break; - } - } - - put = new Put(Bytes.toBytes("do not rep")); - put.add(noRepfamName, row, row); - htable1.put(put); - - get = new Get(Bytes.toBytes("do not rep")); - for (int i = 0; i < NB_RETRIES; i++) { - if (i == NB_RETRIES-1) { - break; - } - Result res = htable2.get(get); - if (res.size() >= 1) { - fail("Not supposed to be replicated"); - } else { - LOG.info("Row not replicated, let's wait a bit more..."); - Thread.sleep(SLEEP_TIME); - } - } - - } - /** * Test disable/enable replication, trying to insert, make sure nothing's * replicated, enable it, the insert should be replicated diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 137cf40e0cd..077a504de9c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -69,10 +69,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { Configuration testConf = new Configuration(conf); testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null); - ReplicationStateInterface rsi = new ReplicationStateImpl(zkw1, testConf, zkw1); - rsi.init(); - rsi.setState(true); - rsi.close(); String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234"); ZKUtil.createWithParents(zkw1, fakeRs); ZKClusterId.setClusterId(zkw1, new ClusterId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java index c61cf570cba..de2515c969e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java @@ -62,7 +62,7 @@ public class TestReplicationZookeeper { conf = utility.getConfiguration(); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); DummyServer server = new DummyServer(); - repZk = new ReplicationZookeeper(server, new AtomicBoolean()); + repZk = new ReplicationZookeeper(server); slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get("hbase.zookeeper.property.clientPort") + ":/1"; String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 2d334e69620..1189162b96b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.SortedMap; import java.util.SortedSet; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -227,8 +226,7 @@ public class TestReplicationSourceManager { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); - AtomicBoolean replicating = new AtomicBoolean(true); - ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating); + ReplicationZookeeper rz = new ReplicationZookeeper(server); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); @@ -260,8 +258,6 @@ public class TestReplicationSourceManager { populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated(); assertEquals(1, populatedMap); - // close out the resources. - rz.close(); server.abort("", null); } @@ -270,8 +266,7 @@ public class TestReplicationSourceManager { LOG.debug("testNodeFailoverDeadServerParsing"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); - AtomicBoolean replicating = new AtomicBoolean(true); - ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating); + ReplicationZookeeper rz = new ReplicationZookeeper(server); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); @@ -285,16 +280,13 @@ public class TestReplicationSourceManager { Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially - ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true)); + ReplicationZookeeper rz1 = new ReplicationZookeeper(s1); SortedMap> testMap = rz1.claimQueues(server.getServerName().getServerName()); - rz1.close(); - ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true)); + ReplicationZookeeper rz2 = new ReplicationZookeeper(s2); testMap = rz2.claimQueues(s1.getServerName().getServerName()); - rz2.close(); - ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true)); + ReplicationZookeeper rz3 = new ReplicationZookeeper(s3); testMap = rz3.claimQueues(s2.getServerName().getServerName()); - rz3.close(); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); List result = replicationQueueInfo.getDeadRegionServers(); @@ -303,9 +295,7 @@ public class TestReplicationSourceManager { assertTrue(result.contains(server.getServerName().getServerName())); assertTrue(result.contains(s1.getServerName().getServerName())); assertTrue(result.contains(s2.getServerName().getServerName())); - - // close out the resources. - rz.close(); + server.abort("", null); } @@ -319,14 +309,13 @@ public class TestReplicationSourceManager { public DummyNodeFailoverWorker(String znode, Server s) throws Exception { this.deadRsZnode = znode; this.server = s; - rz = new ReplicationZookeeper(server, new AtomicBoolean(true)); + rz = new ReplicationZookeeper(server); } @Override public void run() { try { logZnodesMap = rz.claimQueues(deadRsZnode); - rz.close(); server.abort("Done with testing", null); } catch (Exception e) { LOG.error("Got exception while running NodeFailoverWorker", e);