diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index cfe558f62d7..b478813e5cc 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -23,7 +23,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.Map; -import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -136,16 +135,16 @@ public class ReplicationAdmin implements Closeable { * Restart the replication stream to the specified peer. * @param id a short that identifies the cluster */ - public void enablePeer(String id) { - throw new NotImplementedException("Not implemented"); + public void enablePeer(String id) throws IOException { + this.replicationZk.enablePeer(id); } /** * Stop the replication stream to the specified peer. * @param id a short that identifies the cluster */ - public void disablePeer(String id) { - throw new NotImplementedException("Not implemented"); + public void disablePeer(String id) throws IOException { + this.replicationZk.disablePeer(id); } /** @@ -164,6 +163,20 @@ public class ReplicationAdmin implements Closeable { return this.replicationZk.listPeers(); } + /** + * Get state of the peer + * + * @param id peer's identifier + * @return current state of the peer + */ + public String getPeerState(String id) throws IOException { + try { + return this.replicationZk.getPeerState(id).name(); + } catch (KeeperException e) { + throw new IOException("Couldn't get the state of the peer " + id, e); + } + } + /** * Get the current status of the kill switch, if the cluster is replicating * or not. diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 15c84b2b672..1047a1014c7 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -31,7 +31,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; /** * This class acts as a wrapper for all the objects used to identify and @@ -50,6 +55,8 @@ public class ReplicationPeer implements Abortable, Closeable { private ZooKeeperWatcher zkw; private final Configuration conf; + private PeerStateTracker peerStateTracker; + /** * Constructor that takes all the objects required to communicate with the * specified peer, except for the region server addresses. @@ -65,6 +72,31 @@ public class ReplicationPeer implements Abortable, Closeable { this.reloadZkWatcher(); } + /** + * start a state tracker to check whether this peer is enabled or not + * + * @param zookeeper zk watcher for the local cluster + * @param peerStateNode path to zk node which stores peer state + * @throws KeeperException + */ + public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode) + throws KeeperException { + if (ZKUtil.checkExists(zookeeper, peerStateNode) == -1) { + ZKUtil.createAndWatch(zookeeper, peerStateNode, + Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default + } + this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, + this); + this.peerStateTracker.start(); + this.readPeerStateZnode(); + } + + private void readPeerStateZnode() { + String currentState = Bytes.toString(peerStateTracker.getData(false)); + this.peerEnabled.set(PeerState.ENABLED.equals(PeerState + .valueOf(currentState))); + } + /** * Get the cluster key of that peer * @return string consisting of zk ensemble addresses, client port @@ -152,4 +184,23 @@ public class ReplicationPeer implements Abortable, Closeable { zkw.close(); } } + + /** + * Tracker for state of this peer + */ + public class PeerStateTracker extends ZooKeeperNodeTracker { + + public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, peerStateZNode, abortable); + } + + @Override + public synchronized void nodeDataChanged(String path) { + if (path.equals(node)) { + super.nodeDataChanged(path); + readPeerStateZnode(); + } + } + } } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index e1a7398119e..6eaa51fded0 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -50,18 +50,20 @@ import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.SessionExpiredException; /** - * This class serves as a helper for all things related to zookeeper - * in replication. + * This class serves as a helper for all things related to zookeeper in + * replication. *

- * The layout looks something like this under zookeeper.znode.parent - * for the master cluster: + * The layout looks something like this under zookeeper.znode.parent for the + * master cluster: *

+ * *

  * replication/
  *  state      {contains true or false}
  *  clusterId  {contains a byte}
  *  peers/
  *    1/   {contains a full cluster address}
+ *      peer-state  {contains ENABLED or DISABLED}
  *    2/
  *    ...
  *  rs/ {lists all RS that replicate}
@@ -82,6 +84,12 @@ public class ReplicationZookeeper implements Closeable{
     LogFactory.getLog(ReplicationZookeeper.class);
   // Name of znode we use to lock when failover
   private final static String RS_LOCK_ZNODE = "lock";
+
+  // Values of znode which stores state of a peer
+  public static enum PeerState {
+    ENABLED, DISABLED
+  };
+
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
   // Map of peer clusters keyed by their id
@@ -96,6 +104,8 @@ public class ReplicationZookeeper implements Closeable{
   private String rsServerNameZnode;
   // Name node if the replicationState znode
   private String replicationStateNodeName;
+  // Name of zk node which stores peer state
+  private String peerStateNodeName;
   private final Configuration conf;
   // Is this cluster replicating at the moment?
   private AtomicBoolean replicating;
@@ -150,6 +160,8 @@ public class ReplicationZookeeper implements Closeable{
         conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName =
         conf.get("zookeeper.znode.replication.peers", "peers");
+    this.peerStateNodeName = conf.get(
+        "zookeeper.znode.replication.peers.state", "peer-state");
     this.replicationStateNodeName =
         conf.get("zookeeper.znode.replication.state", "state");
     String rsZNodeName =
@@ -339,8 +351,10 @@ public class ReplicationZookeeper implements Closeable{
       return null;
     }
 
-    return new ReplicationPeer(otherConf, peerId,
+    ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
         otherClusterKey);
+    peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+    return peer;
   }
 
   /**
@@ -366,7 +380,8 @@ public class ReplicationZookeeper implements Closeable{
       if (!peerExists(id)) {
         throw new IllegalArgumentException("Cannot remove inexisting peer");
       }
-      ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
+          ZKUtil.joinZNode(this.peersZNode, id));
     } catch (KeeperException e) {
       throw new IOException("Unable to remove a peer", e);
     }
@@ -388,6 +403,8 @@ public class ReplicationZookeeper implements Closeable{
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper,
           ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+      ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
+          Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
     } catch (KeeperException e) {
       throw new IOException("Unable to add peer", e);
     }
@@ -398,6 +415,82 @@ public class ReplicationZookeeper implements Closeable{
           ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
   }
 
+  /**
+   * Enable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public void enablePeer(String id) throws IOException {
+    changePeerState(id, PeerState.ENABLED);
+    LOG.info("peer " + id + " is enabled");
+  }
+
+  /**
+   * Disable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public void disablePeer(String id) throws IOException {
+    changePeerState(id, PeerState.DISABLED);
+    LOG.info("peer " + id + " is disabled");
+  }
+
+  private void changePeerState(String id, PeerState state) throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("peer " + id + " is not registered");
+      }
+      String peerStateZNode = getPeerStateNode(id);
+      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+        ZKUtil.setData(this.zookeeper, peerStateZNode,
+          Bytes.toBytes(state.name()));
+      } else {
+        ZKUtil.createAndWatch(zookeeper, peerStateZNode,
+            Bytes.toBytes(state.name()));
+      }
+      LOG.info("state of the peer " + id + " changed to " + state.name());
+    } catch (KeeperException e) {
+      throw new IOException("Unable to change state of the peer " + id, e);
+    }
+  }
+
+  /**
+   * Get state of the peer. This method checks the state by connecting to ZK.
+   *
+   * @param id peer's identifier
+   * @return current state of the peer
+   */
+  public PeerState getPeerState(String id) throws KeeperException {
+    byte[] peerStateBytes = ZKUtil
+        .getData(this.zookeeper, getPeerStateNode(id));
+    return PeerState.valueOf(Bytes.toString(peerStateBytes));
+  }
+
+  /**
+   * Check whether the peer is enabled or not. This method checks the atomic
+   * boolean of ReplicationPeer locally.
+   *
+   * @param id peer identifier
+   * @return true if the peer is enabled, otherwise false
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public boolean getPeerEnabled(String id) {
+    if (!this.peerClusters.containsKey(id)) {
+      throw new IllegalArgumentException("peer " + id + " is not registered");
+    }
+    return this.peerClusters.get(id).getPeerEnabled().get();
+  }
+
+  private String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode,
+        ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
   /**
    * This reads the state znode for replication and sets the atomic boolean
    */
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8950c9f42f1..545bd02e45b 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -138,9 +138,6 @@ public class ReplicationSource extends Thread
   private volatile boolean running = true;
   // Metrics for this source
   private ReplicationSourceMetrics metrics;
-  // If source is enabled, replication happens. If disabled, nothing will be
-  // replicated but HLogs will still be queued
-  private AtomicBoolean sourceEnabled = new AtomicBoolean();
 
   /**
    * Instantiation method used by region servers
@@ -274,7 +271,7 @@ public class ReplicationSource extends Thread
     // Loop until we close down
     while (isActive()) {
       // Sleep until replication is enabled again
-      if (!this.replicating.get() || !this.sourceEnabled.get()) {
+      if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -601,6 +598,12 @@ public class ReplicationSource extends Thread
       return;
     }
     while (this.isActive()) {
+      if (!isPeerEnabled()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
       try {
         HRegionInterface rrs = getRS();
         LOG.debug("Replicating " + currentNbEntries);
@@ -659,6 +662,15 @@ public class ReplicationSource extends Thread
     }
   }
 
+  /**
+   * check whether the peer is enabled or not
+   *
+   * @return true if the peer is enabled, otherwise false
+   */
+  protected boolean isPeerEnabled() {
+    return this.replicating.get() && this.zkHelper.getPeerEnabled(peerId);
+  }
+
   /**
    * If the queue isn't empty, switch to the next one
    * Else if this is a recovered queue, it means we're done!
@@ -765,10 +777,6 @@ public class ReplicationSource extends Thread
     return this.currentPath;
   }
 
-  public void setSourceEnabled(boolean status) {
-    this.sourceEnabled.set(status);
-  }
-
   private boolean isActive() {
     return !this.stopper.isStopped() && this.running;
   }
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 2fe395e20fe..ccafe1fad38 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -95,9 +95,4 @@ public interface ReplicationSourceInterface {
    */
   public String getPeerClusterId();
 
-  /**
-   * Set if this source is enabled or disabled
-   * @param status the new status
-   */
-  public void setSourceEnabled(boolean status);
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 732d0869341..97af2e8b5a7 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -48,6 +47,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This class is responsible to manage all the replication
  * sources. There are two classes of sources:
@@ -203,8 +204,6 @@ public class ReplicationSourceManager {
   public ReplicationSourceInterface addSource(String id) throws IOException {
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
-    // TODO set it to what's in ZK
-    src.setSourceEnabled(true);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet());
@@ -585,8 +584,6 @@ public class ReplicationSourceManager {
           for (String hlog : entry.getValue()) {
             src.enqueueLog(new Path(oldLogDir, hlog));
           }
-          // TODO set it to what's in ZK
-          src.setSourceEnabled(true);
           src.startup();
         } catch (IOException e) {
           // TODO manage it
diff --git a/src/main/ruby/hbase/replication_admin.rb b/src/main/ruby/hbase/replication_admin.rb
index c4be93c299f..f694f5f678d 100644
--- a/src/main/ruby/hbase/replication_admin.rb
+++ b/src/main/ruby/hbase/replication_admin.rb
@@ -49,6 +49,12 @@ module Hbase
       @replication_admin.listPeers
     end
 
+    #----------------------------------------------------------------------------------------------
+    # Get peer cluster state
+    def get_peer_state(id)
+      @replication_admin.getPeerState(id)
+    end
+
     #----------------------------------------------------------------------------------------------
     # Restart the replication stream to the specified peer
     def enable_peer(id)
diff --git a/src/main/ruby/shell/commands/disable_peer.rb b/src/main/ruby/shell/commands/disable_peer.rb
index ad1ebbd4c7d..da9941a9572 100644
--- a/src/main/ruby/shell/commands/disable_peer.rb
+++ b/src/main/ruby/shell/commands/disable_peer.rb
@@ -26,8 +26,6 @@ module Shell
 Stops the replication stream to the specified cluster, but still
 keeps track of new edits to replicate.
 
-CURRENTLY UNSUPPORTED
-
 Examples:
 
   hbase> disable_peer '1'
diff --git a/src/main/ruby/shell/commands/enable_peer.rb b/src/main/ruby/shell/commands/enable_peer.rb
index 099f3fd4549..de459092b58 100644
--- a/src/main/ruby/shell/commands/enable_peer.rb
+++ b/src/main/ruby/shell/commands/enable_peer.rb
@@ -26,8 +26,6 @@ module Shell
 Restarts the replication to the specified peer cluster,
 continuing from where it was disabled.
 
-CURRENTLY UNSUPPORTED
-
 Examples:
 
   hbase> enable_peer '1'
diff --git a/src/main/ruby/shell/commands/list_peers.rb b/src/main/ruby/shell/commands/list_peers.rb
index 93a430c8710..2f57592e587 100644
--- a/src/main/ruby/shell/commands/list_peers.rb
+++ b/src/main/ruby/shell/commands/list_peers.rb
@@ -33,10 +33,11 @@ EOF
         now = Time.now
         peers = replication_admin.list_peers
 
-        formatter.header(["PEER ID", "CLUSTER KEY"])
+        formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"])
 
         peers.entrySet().each do |e|
-          formatter.row([ e.key, e.value ])
+          state = replication_admin.get_peer_state(e.key)
+          formatter.row([ e.key, e.value, state ])
         end
 
         formatter.footer(now)
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 9d3e8620ed7..2daf643ba36 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -19,6 +19,9 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,9 +29,6 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * Source that does nothing at all, helpful to test ReplicationSourceManager
  */
@@ -81,10 +81,4 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public String getPeerClusterId() {
     return peerClusterId;
   }
-
-  @Override
-  public void setSourceEnabled(boolean status) {
-
-  }
-
 }
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
index f6775bad836..96eb211cccf 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -26,7 +26,14 @@ import static org.junit.Assert.fail;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -444,9 +451,108 @@ public class TestReplication {
 
   }
 
+  /**
+   * Test disable/enable replication, trying to insert, make sure nothing's
+   * replicated, enable it, the insert should be replicated
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testDisableEnable() throws Exception {
+
+    // Test disabling replication
+    admin.disablePeer("2");
+
+    byte[] rowkey = Bytes.toBytes("disable enable");
+    Put put = new Put(rowkey);
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Replication wasn't disabled");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test enable replication
+    admin.enablePeer("2");
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        return;
+      }
+    }
+    fail("Waited too much time for put replication");
+  }
+
+  /**
+   * Test disabling an inactive peer. Add a peer which is inactive, trying to
+   * insert, disable the peer, then activate the peer and make sure nothing is
+   * replicated. In Addition, enable the peer and check the updates are
+   * replicated.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 600000)
+  public void testDisableInactivePeer() throws Exception {
+
+    // enabling and shutdown the peer
+    admin.enablePeer("2");
+    utility2.shutdownMiniHBaseCluster();
+
+    byte[] rowkey = Bytes.toBytes("disable inactive peer");
+    Put put = new Put(rowkey);
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    // wait for the sleep interval of the master cluster to become long
+    Thread.sleep(SLEEP_TIME * NB_RETRIES);
+
+    // disable and start the peer
+    admin.disablePeer("2");
+    utility2.startMiniHBaseCluster(1, 1);
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Replication wasn't disabled");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test enable replication
+    admin.enablePeer("2");
+    // wait since the sleep interval would be long
+    Thread.sleep(SLEEP_TIME * NB_RETRIES);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME * NB_RETRIES);
+      } else {
+        assertArrayEquals(res.value(), row);
+        return;
+      }
+    }
+    fail("Waited too much time for put replication");
+  }
+
   /**
    * Integration test for TestReplicationAdmin, removes and re-add a peer
    * cluster
+   *
    * @throws Exception
    */
   @Test(timeout=300000)
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 07b3f3c30f0..5828154d08b 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -30,13 +30,23 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -100,6 +110,9 @@ public class TestReplicationSourceManager {
     ZKUtil.setData(zkw, "/hbase/replication/peers/1",
         Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
             + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+        Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name()));
     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
     ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));