HBASE-17341 Add a timeout during replication endpoint termination (Vincent Poon)

This commit is contained in:
tedyu 2016-12-21 08:27:45 -08:00
parent e9444ed077
commit f94180a3e9
2 changed files with 58 additions and 2 deletions

View File

@ -430,9 +430,11 @@ public class ReplicationSource extends Thread
}
if (future != null) {
try {
future.get();
future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.warn("Got exception:" + e);
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
+ this.peerClusterZnode,
e);
}
}
}

View File

@ -21,12 +21,18 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -37,12 +43,16 @@ import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.mockito.Mockito.mock;
@Category(MediumTests.class)
public class TestReplicationSource {
@ -111,5 +121,49 @@ public class TestReplicationSource {
reader.close();
}
/**
* Tests that {@link ReplicationSource#terminate(String)} will timeout properly
*/
@Test
public void testTerminateTimeout() throws Exception {
final ReplicationSource source = new ReplicationSource();
ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
@Override
protected void doStart() {
notifyStarted();
}
@Override
protected void doStop() {
// not calling notifyStopped() here causes the caller of stop() to get a Future that never
// completes
}
};
replicationEndpoint.start();
ReplicationPeers mockPeers = mock(ReplicationPeers.class);
Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1);
source.init(testConf, null, null, null, mockPeers, null, "testPeer", null, replicationEndpoint,
null);
ExecutorService executor = Executors.newSingleThreadExecutor();
final Future<?> future = executor.submit(new Runnable() {
@Override
public void run() {
source.terminate("testing source termination");
}
});
long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return future.isDone();
}
});
}
}