diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java index 9f704efd77d..6ba041f8109 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -50,7 +50,7 @@ public class TestReplicationStatus extends TestReplicationBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationStatus.class); - private void insertRowsOnSource() throws IOException { + static void insertRowsOnSource() throws IOException { final byte[] qualName = Bytes.toBytes("q"); for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { Put p = new Put(Bytes.toBytes("row" + i)); @@ -124,35 +124,6 @@ public class TestReplicationStatus extends TestReplicationBase { assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID()); } - @Test - public void testReplicationStatusSink() throws Exception { - try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) { - ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName(); - ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server); - //First checks if status of timestamp of last applied op is same as RS start, since no edits - //were replicated yet - assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp()); - //now insert some rows on source, so that it gets delivered to target - insertRowsOnSource(); - long wait = Waiter.waitFor(UTIL2.getConfiguration(), - 10000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server); - return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted(); - } - }); - //If wait is -1, we know predicate condition was never true - assertTrue(wait>=0); - } - } - - private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server) - throws IOException { - ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); - ServerMetrics sm = metrics.getLiveServerMetrics().get(server); - return sm.getReplicationLoadSink(); - } /** * Wait until Master shows metrics counts for ReplicationLoadSourceList that are * greater than greaterThan for serverName before diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSink.java new file mode 100644 index 00000000000..edc1817cce3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSink.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import java.io.IOException; +import java.util.EnumSet; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationStatusSink extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationStatusSink.class); + + @Test + public void testReplicationStatusSink() throws Exception { + try (Admin admin = UTIL2.getConnection().getAdmin()) { + ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName(); + ReplicationLoadSink loadSink = getLatestSinkMetric(admin, server); + //First checks if status of timestamp of last applied op is same as RS start, since no edits + //were replicated yet + Assert.assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp()); + //now insert some rows on source, so that it gets delivered to target + TestReplicationStatus.insertRowsOnSource(); + long wait = + Waiter.waitFor(UTIL2.getConfiguration(), 10000, (Waiter.Predicate) () -> { + ReplicationLoadSink loadSink1 = getLatestSinkMetric(admin, server); + return loadSink1.getTimestampsOfLastAppliedOp() > loadSink1.getTimestampStarted(); + }); + Assert.assertNotEquals(-1, wait); + } + } + + private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server) + throws IOException { + ClusterMetrics metrics = + admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS)); + ServerMetrics sm = metrics.getLiveServerMetrics().get(server); + return sm.getReplicationLoadSink(); + } + +}