diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index d74211ea374..91109cf76fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -60,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper } @Override - long getStartPosition() { + public long getStartPosition() { long startPosition = getRecoveredQueueStartPos(); int numRetries = 0; while (numRetries <= maxRetriesMultiplier) { @@ -79,32 +79,30 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper // normally has a position (unless the RS failed between 2 logs) private long getRecoveredQueueStartPos() { long startPosition = 0; - String peerClusterZnode = source.getQueueId(); + String peerClusterZNode = source.getQueueId(); try { - startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(), - peerClusterZnode, this.queue.peek().getName()); - if (LOG.isTraceEnabled()) { - LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + - startPosition); - } + startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(), + peerClusterZNode, this.queue.peek().getName()); + LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(), + startPosition); } catch (ReplicationException e) { - terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); + terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e); } return startPosition; } private void terminate(String reason, Exception cause) { if (cause == null) { - LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason); - + LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason); } else { - LOG.error("Closing worker for wal group " + this.walGroupId - + " because an error occurred: " + reason, cause); + LOG.error( + "Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason, + cause); } entryReader.interrupt(); Threads.shutdown(entryReader, sleepForRetries); this.interrupt(); Threads.shutdown(this, sleepForRetries); - LOG.info("ReplicationSourceWorker " + this.getName() + " terminated"); + LOG.info("ReplicationSourceWorker {} terminated", this.getName()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java similarity index 83% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 1bb361bf582..274ccabfbea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.replication; +package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -26,6 +26,7 @@ import java.util.OptionalLong; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -36,12 +37,20 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; @@ -54,6 +63,7 @@ import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -80,9 +90,6 @@ public class TestReplicationSource { private static Path logDir; private static Configuration conf = TEST_UTIL.getConfiguration(); - /** - * @throws java.lang.Exception - */ @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniDFSCluster(1); @@ -105,7 +112,6 @@ public class TestReplicationSource { * Sanity check that we can move logs around while we are reading * from them. Should this test fail, ReplicationSource would have a hard * time reading logs that are being archived. - * @throws Exception */ @Test public void testLogMoving() throws Exception{ @@ -185,15 +191,12 @@ public class TestReplicationSource { public boolean evaluate() throws Exception { return future.isDone(); } - }); - } /** * Tests that recovered queues are preserved on a regionserver shutdown. * See HBASE-18192 - * @throws Exception */ @Test public void testServerShutdownRecoveredQueue() throws Exception { @@ -216,7 +219,7 @@ public class TestReplicationSource { final String peerId = "TestPeer"; admin.addReplicationPeer(peerId, - new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey())); + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); // Wait for replication sources to come up Waiter.waitFor(conf, 20000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { @@ -292,5 +295,29 @@ public class TestReplicationSource { } } + // Test HBASE-20497 + @Test + public void testRecoveredReplicationSourceShipperGetPosition() throws Exception { + String walGroupId = "fake-wal-group-id"; + ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L); + ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); + PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); + queue.put(new Path("/www/html/test")); + RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); + Server server = Mockito.mock(Server.class); + Mockito.when(server.getServerName()).thenReturn(serverName); + Mockito.when(source.getServer()).thenReturn(server); + Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); + ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class); + Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) + .thenReturn(1001L); + Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) + .thenReturn(-1L); + conf.setInt("replication.source.maxretriesmultiplier", -1); + RecoveredReplicationSourceShipper shipper = + new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage); + Assert.assertEquals(1001L, shipper.getStartPosition()); + conf.unset("replication.source.maxretriesmultiplier"); + } }