From f94180a3e9820761d59be98a62db9d218a096e5b Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 21 Dec 2016 08:27:45 -0800 Subject: [PATCH] HBASE-17341 Add a timeout during replication endpoint termination (Vincent Poon) --- .../regionserver/ReplicationSource.java | 6 ++- .../replication/TestReplicationSource.java | 54 +++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d60d45ed3f5..005a20864d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -430,9 +430,11 @@ public class ReplicationSource extends Thread } if (future != null) { try { - future.get(); + future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (Exception e) { - LOG.warn("Got exception:" + e); + LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + + this.peerClusterZnode, + e); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 458819d12f4..9bf0e9338c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -21,12 +21,18 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -37,12 +43,16 @@ import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.mockito.Mockito.mock; + @Category(MediumTests.class) public class TestReplicationSource { @@ -111,5 +121,49 @@ public class TestReplicationSource { reader.close(); } + /** + * Tests that {@link ReplicationSource#terminate(String)} times out if the endpoint never stops + */ + @Test + public void testTerminateTimeout() throws Exception { + final ReplicationSource source = new ReplicationSource(); + ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() { + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + // Deliberately skip notifyStopped() so the caller of stop() gets a Future that never + // completes; terminate() must then give up waiting on it instead of hanging + } + }; + replicationEndpoint.start(); + ReplicationPeers mockPeers = mock(ReplicationPeers.class); + Configuration testConf = HBaseConfiguration.create(); + testConf.setInt("replication.source.maxretriesmultiplier", 1); + source.init(testConf, null, null, null, mockPeers, null, 
"testPeer", null, replicationEndpoint, + null); + ExecutorService executor = Executors.newSingleThreadExecutor(); /* TODO: shut down when done */ + final Future future = executor.submit(new Runnable() { + + @Override + public void run() { + source.terminate("testing source termination"); + } + }); + long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); + Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate() { + + @Override + public boolean evaluate() throws Exception { + return future.isDone(); + } + + }); + + } + }