diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index d23bfda0ce8..393c90192b2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -175,6 +175,16 @@ public class ReplicationAdmin implements Closeable { return this.replicationPeers.getAllPeerClusterKeys(); } + /** + * Get the state of the specified peer cluster + * @param id String format of the Short that identifies the peer, an IllegalArgumentException + * is thrown if it doesn't exist + * @return true if replication is enabled to that peer, false if it isn't + */ + public boolean getPeerState(String id) throws IOException { + return this.replicationPeers.getStatusOfPeerFromBackingStore(id); + } + @Override public void close() throws IOException { if (this.connection != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 6d6355c79e7..f7e853492bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -200,11 +200,12 @@ public class ReplicationPeer implements Abortable, Closeable { } /** - * @param bytes + * Parse the raw data from ZK to get a peer's state + * @param bytes raw ZK data * @return True if the passed in bytes are those of a pb serialized ENABLED state. * @throws DeserializationException */ - private static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { + public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes); return ZooKeeperProtos.ReplicationState.State.ENABLED == state; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 21f9ee49d0e..2765815deec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -76,11 +76,23 @@ public interface ReplicationPeers { /** * Get the replication status for the specified connected remote slave cluster. + * The value might be read from cache, so it is recommended to + * use {@link #getStatusOfPeerFromBackingStore(String)} + * if reading the state after enabling or disabling it. * @param peerId a short that identifies the cluster * @return true if replication is enabled, false otherwise. */ boolean getStatusOfConnectedPeer(String peerId); + /** + * Get the replication status for the specified remote slave cluster, which doesn't + * have to be connected. The state is read directly from the backing store. + * @param peerId a short that identifies the cluster + * @return true if replication is enabled, false otherwise. + * @throws IOException Throws if there's an error contacting the store + */ + boolean getStatusOfPeerFromBackingStore(String peerId) throws IOException; + /** * Get a set of all connected remote slave clusters. * @return set of peer ids diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index f0d6f143cc3..08dd8a4ffe2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -137,6 +137,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return this.peerClusters.get(id).getPeerEnabled().get(); } + @Override + public boolean getStatusOfPeerFromBackingStore(String id) throws IOException { + if (!this.getAllPeerIds().contains(id)) { + throw new IllegalArgumentException("peer " + id + " doesn't exist"); + } + String peerStateZNode = getPeerStateNode(id); + try { + return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode)); + } catch (KeeperException e) { + throw new IOException(e); + } catch (DeserializationException e) { + throw new IOException(e); + } + } + @Override public boolean connectToPeer(String peerId) throws IOException, KeeperException { if (peerClusters == null) { diff --git a/hbase-server/src/main/ruby/hbase/replication_admin.rb b/hbase-server/src/main/ruby/hbase/replication_admin.rb index c89488d105d..baa6aeaeda8 100644 --- a/hbase-server/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-server/src/main/ruby/hbase/replication_admin.rb @@ -58,7 +58,7 @@ module Hbase #---------------------------------------------------------------------------------------------- # Get peer cluster state def get_peer_state(id) - @replication_admin.getPeerState(id) + @replication_admin.getPeerState(id) ? "ENABLED" : "DISABLED" end #---------------------------------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index df0af63f39c..0cc102aa40a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -27,6 +27,8 @@ import org.junit.experimental.categories.Category; import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; /** * Unit testing of ReplicationAdmin @@ -86,12 +88,33 @@ public class TestReplicationAdmin { admin.addPeer(ID_SECOND, KEY_SECOND); } catch (IllegalStateException iae) { fail(); - // OK! } assertEquals(2, admin.getPeersCount()); // Remove the first peer we added admin.removePeer(ID_ONE); assertEquals(1, admin.getPeersCount()); + admin.removePeer(ID_SECOND); + assertEquals(0, admin.getPeersCount()); + } + + /** + * basic checks that when we add a peer that it is enabled, and that we can disable + * @throws Exception + */ + @Test + public void testEnableDisable() throws Exception { + admin.addPeer(ID_ONE, KEY_ONE); + assertEquals(1, admin.getPeersCount()); + assertTrue(admin.getPeerState(ID_ONE)); + admin.disablePeer(ID_ONE); + + assertFalse(admin.getPeerState(ID_ONE)); + try { + admin.getPeerState(ID_SECOND); + } catch (IllegalArgumentException iae) { + // OK! + } + admin.removePeer(ID_ONE); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index d79cc6051a1..c5b0a1ec5c1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -231,6 +231,10 @@ public abstract class TestReplicationStateBasic { } protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { + // we can first check if the value was changed in the store, if it wasn't then fail right away + if (status != rp.getStatusOfPeerFromBackingStore(peerId)) { + fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); + } while (true) { if (status == rp.getStatusOfConnectedPeer(peerId)) { return;