diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index e993a78d195..3091ef3bce4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -87,6 +88,8 @@ public class TestReplicationBase {
protected static HBaseTestingUtility utility1;
protected static HBaseTestingUtility utility2;
+ protected static final int NUM_SLAVES1 = 2;
+ protected static final int NUM_SLAVES2 = 4;
protected static final int NB_ROWS_IN_BATCH = 100;
protected static final int NB_ROWS_IN_BIG_BATCH =
NB_ROWS_IN_BATCH * 10;
@@ -209,6 +212,13 @@ public class TestReplicationBase {
utility2 = new HBaseTestingUtility(conf2);
}
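+  /**
+   * Restarts the mini HBase cluster managed by the given utility with the requested number of
+   * region servers. ZooKeeper and the mini DFS cluster keep running across the restart.
+   */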
+ protected static void restartHBaseCluster(HBaseTestingUtility util, int numSlaves)
+ throws Exception {
+ util.shutdownMiniHBaseCluster();
+ util
+ .startMiniHBaseCluster(StartMiniClusterOption.builder().numRegionServers(numSlaves).build());
+ }
+
protected static void startClusters() throws Exception{
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
@@ -224,10 +234,10 @@ public class TestReplicationBase {
LOG.info("Setup second Zk");
CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
- utility1.startMiniCluster(2);
+ utility1.startMiniCluster(NUM_SLAVES1);
// Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
// as a component in deciding maximum number of parallel batches to send to the peer cluster.
- utility2.startMiniCluster(4);
+ utility2.startMiniCluster(NUM_SLAVES2);
hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
index c778f528c93..a305b665f22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
@@ -18,15 +18,14 @@
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.EnumSet;
import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
-import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
@@ -34,279 +33,69 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationStatus extends TestReplicationBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationStatus.class);
-
- private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStatus.class);
- private static final String PEER_ID = "2";
+ HBaseClassTestRule.forClass(TestReplicationStatus.class);
/**
- * Test for HBASE-9531
- * put a few rows into htable1, which should be replicated to htable2
- * create a ClusterStatus instance 'status' from HBaseAdmin
- * test : status.getLoad(server).getReplicationLoadSourceList()
+ * Test for HBASE-9531.
+ *
+ * put a few rows into htable1, which should be replicated to htable2
+ * create a ClusterStatus instance 'status' from HBaseAdmin
+ * test : status.getLoad(server).getReplicationLoadSourceList()
* test : status.getLoad(server).getReplicationLoadSink()
- * * @throws Exception
*/
@Test
public void testReplicationStatus() throws Exception {
- LOG.info("testReplicationStatus");
- utility2.shutdownMiniHBaseCluster();
- utility2.startMiniHBaseCluster(1,4);
- try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
- // disable peer
- admin.disablePeer(PEER_ID);
+ Admin hbaseAdmin = utility1.getAdmin();
+ // disable peer
+ hbaseAdmin.disableReplicationPeer(PEER_ID2);
- final byte[] qualName = Bytes.toBytes("q");
- Put p;
+ final byte[] qualName = Bytes.toBytes("q");
+ Put p;
- for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
- htable1.put(p);
- }
-
- ClusterStatus status = new ClusterStatus(hbaseAdmin.getClusterMetrics(
- EnumSet.of(Option.LIVE_SERVERS)));
-
- for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster()
- .getRegionServerThreads()) {
- ServerName server = thread.getRegionServer().getServerName();
- ServerLoad sl = status.getLoad(server);
- List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
- ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
-
- // check SourceList only has one entry, beacuse only has one peer
- assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
- assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID());
-
- // check Sink exist only as it is difficult to verify the value on the fly
- assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
- (rLoadSink.getAgeOfLastAppliedOp() >= 0));
- assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
- (rLoadSink.getTimestampsOfLastAppliedOp() >= 0));
- }
-
- // Stop rs1, then the queue of rs1 will be transfered to rs0
- utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
- Thread.sleep(10000);
- status = new ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
- ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
- ServerLoad sl = status.getLoad(server);
- List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
- // check SourceList still only has one entry
- assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 2));
- assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID());
- } finally {
- admin.enablePeer(PEER_ID);
- utility1.getHBaseCluster().getRegionServer(1).start();
- }
- }
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- //we need to perform initialisations from TestReplicationBase.setUpBeforeClass() on each
- //test here, so we override BeforeClass to do nothing and call
- // TestReplicationBase.setUpBeforeClass() from setup method
- TestReplicationBase.configureClusters();
- }
-
- @Before
- @Override
- public void setUpBase() throws Exception {
- TestReplicationBase.startClusters();
- super.setUpBase();
- }
-
- @After
- @Override
- public void tearDownBase() throws Exception {
- utility2.shutdownMiniCluster();
- utility1.shutdownMiniCluster();
- }
-
- @AfterClass
- public static void tearDownAfterClass(){
- //We need to override it here to avoid issues when trying to execute super class teardown
- }
-
- @Test
- public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
- utility2.shutdownMiniHBaseCluster();
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
- Admin hbaseAdmin = utility1.getConnection().getAdmin();
- ServerName serverName = utility1.getHBaseCluster().
- getRegionServer(0).getServerName();
- Thread.sleep(10000);
- ClusterStatus status = new ClusterStatus(hbaseAdmin.
- getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
- List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
- get(serverName).getReplicationLoadSourceList();
- assertEquals(1, loadSources.size());
- ReplicationLoadSource loadSource = loadSources.get(0);
- assertFalse(loadSource.hasEditsSinceRestart());
- assertEquals(0, loadSource.getTimestampOfLastShippedOp());
- assertEquals(0, loadSource.getReplicationLag());
- assertFalse(loadSource.isRecovered());
- }
-
- @Test
- public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
- utility2.shutdownMiniHBaseCluster();
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
- Admin hbaseAdmin = utility1.getConnection().getAdmin();
- //add some values to source cluster
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- Put p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
htable1.put(p);
}
- Thread.sleep(10000);
- ServerName serverName = utility1.getHBaseCluster().
- getRegionServer(0).getServerName();
- ClusterStatus status = new ClusterStatus(hbaseAdmin.
- getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
- List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
- get(serverName).getReplicationLoadSourceList();
- assertEquals(1, loadSources.size());
- ReplicationLoadSource loadSource = loadSources.get(0);
- assertTrue(loadSource.hasEditsSinceRestart());
- assertEquals(0, loadSource.getTimestampOfLastShippedOp());
- assertTrue(loadSource.getReplicationLag()>0);
- assertFalse(loadSource.isRecovered());
- }
- @Test
- public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
- utility2.shutdownMiniHBaseCluster();
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
- //add some values to cluster 1
- for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- Put p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
- htable1.put(p);
- }
- Thread.sleep(10000);
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
- Admin hbaseAdmin = utility1.getConnection().getAdmin();
- ServerName serverName = utility1.getHBaseCluster().
- getRegionServer(0).getServerName();
- Thread.sleep(10000);
- ClusterStatus status = new ClusterStatus(hbaseAdmin.
- getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
- List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
- get(serverName).getReplicationLoadSourceList();
- assertEquals(2, loadSources.size());
- boolean foundRecovery = false;
- boolean foundNormal = false;
- for(ReplicationLoadSource loadSource : loadSources){
- if (loadSource.isRecovered()){
- foundRecovery = true;
- assertTrue(loadSource.hasEditsSinceRestart());
- assertEquals(0, loadSource.getTimestampOfLastShippedOp());
- assertTrue(loadSource.getReplicationLag()>0);
- } else {
- foundNormal = true;
- assertFalse(loadSource.hasEditsSinceRestart());
- assertEquals(0, loadSource.getTimestampOfLastShippedOp());
- assertEquals(0, loadSource.getReplicationLag());
- }
- }
- assertTrue("No normal queue found.", foundNormal);
- assertTrue("No recovery queue found.", foundRecovery);
- }
+ ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
- @Test
- public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
- utility2.shutdownMiniHBaseCluster();
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
- //add some values to cluster 1
- for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- Put p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
- htable1.put(p);
- }
- Thread.sleep(10000);
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
- Admin hbaseAdmin = utility1.getConnection().getAdmin();
- ServerName serverName = utility1.getHBaseCluster().
- getRegionServer(0).getServerName();
- Thread.sleep(10000);
- //add more values to cluster 1, these should cause normal queue to lag
- for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- Put p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
- htable1.put(p);
- }
- Thread.sleep(10000);
- ClusterStatus status = new ClusterStatus(hbaseAdmin.
- getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
- List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
- get(serverName).getReplicationLoadSourceList();
- assertEquals(2, loadSources.size());
- boolean foundRecovery = false;
- boolean foundNormal = false;
- for(ReplicationLoadSource loadSource : loadSources){
- if (loadSource.isRecovered()){
- foundRecovery = true;
- } else {
- foundNormal = true;
- }
- assertTrue(loadSource.hasEditsSinceRestart());
- assertEquals(0, loadSource.getTimestampOfLastShippedOp());
- assertTrue(loadSource.getReplicationLag()>0);
- }
- assertTrue("No normal queue found.", foundNormal);
- assertTrue("No recovery queue found.", foundRecovery);
- }
+ for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster()
+ .getRegionServerThreads()) {
+ ServerName server = thread.getRegionServer().getServerName();
+ ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
+      List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
+ ReplicationLoadSink rLoadSink = sm.getReplicationLoadSink();
- @Test
- public void testReplicationStatusAfterLagging() throws Exception {
- utility2.shutdownMiniHBaseCluster();
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
- //add some values to cluster 1
- for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- Put p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
- htable1.put(p);
+      // check SourceList only has one entry, because there is only one peer
+ assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
+ assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
+
+      // only check that the Sink values exist, as it is difficult to verify them on the fly
+ assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
+ (rLoadSink.getAgeOfLastAppliedOp() >= 0));
+ assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
+ (rLoadSink.getTimestampsOfLastAppliedOp() >= 0));
}
- utility2.startMiniHBaseCluster();
+
+    // Stop rs1, then the queue of rs1 will be transferred to rs0
+ utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
Thread.sleep(10000);
- try(Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
- ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).
- getServerName();
- ClusterStatus status =
- new ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
- List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().get(serverName).
- getReplicationLoadSourceList();
- assertEquals(1, loadSources.size());
- ReplicationLoadSource loadSource = loadSources.get(0);
- assertTrue(loadSource.hasEditsSinceRestart());
- assertTrue(loadSource.getTimestampOfLastShippedOp() > 0);
- assertEquals(0, loadSource.getReplicationLag());
- }finally{
- utility2.shutdownMiniHBaseCluster();
- }
+ metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+ ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
+    List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
+    // check SourceList now has two entries: rs0's own queue plus the recovered queue from rs1
+ assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 2));
+ assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
new file mode 100644
index 00000000000..1993d349d24
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
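+/**
+ * Verifies that a replication source's reported lag clears once the peer cluster comes back
+ * online and the queued edits are shipped.
+ */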
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusAfterLagging extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationStatusAfterLagging.class);
+
+ @Test
+ public void testReplicationStatusAfterLagging() throws Exception {
+ utility2.shutdownMiniHBaseCluster();
+ restartHBaseCluster(utility1, NUM_SLAVES1);
+ // add some values to cluster 1
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
+ utility2.startMiniHBaseCluster();
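+    // give replication time to ship the queued edits to the restarted peer cluster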
+ Thread.sleep(10000);
+ Admin hbaseAdmin = utility1.getAdmin();
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+ metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+ assertEquals(1, loadSources.size());
+ ReplicationLoadSource loadSource = loadSources.get(0);
+ assertTrue(loadSource.hasEditsSinceRestart());
+ assertTrue(loadSource.getTimestampOfLastShippedOp() > 0);
+ assertEquals(0, loadSource.getReplicationLag());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
new file mode 100644
index 00000000000..4bb41f8265c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
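+/**
+ * Verifies that, while the peer cluster is down, both the normal and the recovered replication
+ * sources report pending edits and a positive replication lag.
+ */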
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationStatusBothNormalAndRecoveryLagging.class);
+
+ @Test
+ public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
+ utility2.shutdownMiniHBaseCluster();
+ // add some values to cluster 1
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
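+    // let the edits queue up on the source; the peer cluster is down, so nothing can be shipped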
+ Thread.sleep(10000);
+ restartHBaseCluster(utility1, NUM_SLAVES1);
+ Admin hbaseAdmin = utility1.getAdmin();
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ Thread.sleep(10000);
+ // add more values to cluster 1, these should cause normal queue to lag
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
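+    // wait for the new edits to accumulate in the normal (non-recovered) source's queue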
+ Thread.sleep(10000);
+ ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+ metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+ assertEquals(2, loadSources.size());
+ boolean foundRecovery = false;
+ boolean foundNormal = false;
+ for (ReplicationLoadSource loadSource : loadSources) {
+ if (loadSource.isRecovered()) {
+ foundRecovery = true;
+ } else {
+ foundNormal = true;
+ }
+ assertTrue(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertTrue(loadSource.getReplicationLag() > 0);
+ }
+ assertTrue("No normal queue found.", foundNormal);
+ assertTrue("No recovery queue found.", foundRecovery);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
new file mode 100644
index 00000000000..fb3f16bb6b7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
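+/**
+ * Verifies the replication status when the peer cluster is stopped and new edits arrive after
+ * the source cluster restarts: the single normal source reports pending edits and a positive lag.
+ */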
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationStatusSourceStartedTargetStoppedNewOp.class);
+
+ @Test
+ public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
+ utility2.shutdownMiniHBaseCluster();
+ restartHBaseCluster(utility1, NUM_SLAVES1);
+ Admin hbaseAdmin = utility1.getAdmin();
+ // add some values to source cluster
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
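+    // wait for the source to pick up the new edits; the peer is stopped, so nothing ships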
+ Thread.sleep(10000);
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+ metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+ assertEquals(1, loadSources.size());
+ ReplicationLoadSource loadSource = loadSources.get(0);
+ assertTrue(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertTrue(loadSource.getReplicationLag() > 0);
+ assertFalse(loadSource.isRecovered());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
new file mode 100644
index 00000000000..76d12b29b59
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
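+/**
+ * Verifies the replication status when the peer cluster is stopped and no edits arrive after the
+ * source cluster restarts: the single normal source reports no pending edits and zero lag.
+ */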
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationStatusSourceStartedTargetStoppedNoOps.class);
+
+ @Test
+  public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
+ utility2.shutdownMiniHBaseCluster();
+ restartHBaseCluster(utility1, NUM_SLAVES1);
+ Admin hbaseAdmin = utility1.getAdmin();
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
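+    // give the restarted region server time to report its replication metrics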
+ Thread.sleep(10000);
+ ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+ metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+ assertEquals(1, loadSources.size());
+ ReplicationLoadSource loadSource = loadSources.get(0);
+ assertFalse(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertEquals(0, loadSource.getReplicationLag());
+ assertFalse(loadSource.isRecovered());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
new file mode 100644
index 00000000000..800fa822d96
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
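+/**
+ * Verifies the replication status when the peer cluster is stopped and the source cluster
+ * restarts with unshipped edits: the recovered source reports lag while the new normal source
+ * stays idle.
+ */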
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery
+ extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationStatusSourceStartedTargetStoppedWithRecovery.class);
+
+ @Test
+ public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
+ utility2.shutdownMiniHBaseCluster();
+ // add some values to cluster 1
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
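+    // let the edits queue up; the coming restart turns this into a recovered queue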
+ Thread.sleep(10000);
+ restartHBaseCluster(utility1, NUM_SLAVES1);
+ Admin hbaseAdmin = utility1.getAdmin();
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
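+    // give the restarted region server time to claim the old queue as a recovered source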
+ Thread.sleep(10000);
+ ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+ metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+ assertEquals(2, loadSources.size());
+ boolean foundRecovery = false;
+ boolean foundNormal = false;
+ for (ReplicationLoadSource loadSource : loadSources) {
+ if (loadSource.isRecovered()) {
+ foundRecovery = true;
+ assertTrue(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertTrue(loadSource.getReplicationLag() > 0);
+ } else {
+ foundNormal = true;
+ assertFalse(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertEquals(0, loadSource.getReplicationLag());
+ }
+ }
+ assertTrue("No normal queue found.", foundNormal);
+ assertTrue("No recovery queue found.", foundRecovery);
+ }
+}