HBASE-24877 Add option to avoid aborting RS process when uncaught exceptions happen on replication source (#2399)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
b862c3e9c9
commit
5d65bd45fa
|
@ -34,8 +34,10 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.PriorityBlockingQueue;
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -121,6 +123,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
// ReplicationEndpoint which will handle the actual replication
|
// ReplicationEndpoint which will handle the actual replication
|
||||||
private volatile ReplicationEndpoint replicationEndpoint;
|
private volatile ReplicationEndpoint replicationEndpoint;
|
||||||
|
|
||||||
|
private boolean abortOnError;
|
||||||
|
//This is needed for the startup loop to identify when there's already
|
||||||
|
//an initialization happening (but not finished yet),
|
||||||
|
//so that it doesn't try to submit another initialize thread.
|
||||||
|
//NOTE: this should only be set to false at the end of initialize method, prior to return.
|
||||||
|
private AtomicBoolean startupOngoing = new AtomicBoolean(false);
|
||||||
|
//Flag that signals an uncaught error happened while starting up the source
|
||||||
|
//and a retry should be attempted
|
||||||
|
private AtomicBoolean retryStartup = new AtomicBoolean(false);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A filter (or a chain of filters) for WAL entries; filters out edits.
|
* A filter (or a chain of filters) for WAL entries; filters out edits.
|
||||||
*/
|
*/
|
||||||
|
@ -131,6 +143,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
private long defaultBandwidth;
|
private long defaultBandwidth;
|
||||||
private long currentBandwidth;
|
private long currentBandwidth;
|
||||||
private WALFileLengthProvider walFileLengthProvider;
|
private WALFileLengthProvider walFileLengthProvider;
|
||||||
|
@VisibleForTesting
|
||||||
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
|
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
|
||||||
new ConcurrentHashMap<>();
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@ -219,6 +232,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
|
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
|
||||||
this.totalBufferUsed = manager.getTotalBufferUsed();
|
this.totalBufferUsed = manager.getTotalBufferUsed();
|
||||||
this.walFileLengthProvider = walFileLengthProvider;
|
this.walFileLengthProvider = walFileLengthProvider;
|
||||||
|
|
||||||
|
this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
|
||||||
|
true);
|
||||||
|
|
||||||
LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
|
LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
|
||||||
replicationPeer.getId(), this.currentBandwidth);
|
replicationPeer.getId(), this.currentBandwidth);
|
||||||
}
|
}
|
||||||
|
@ -244,6 +261,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
new AbstractFSWALProvider.WALStartTimeComparator());
|
new AbstractFSWALProvider.WALStartTimeComparator());
|
||||||
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
|
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
|
||||||
// the shipper may quit immediately
|
// the shipper may quit immediately
|
||||||
|
queue.put(wal);
|
||||||
queues.put(walPrefix, queue);
|
queues.put(walPrefix, queue);
|
||||||
if (this.isSourceActive() && this.walEntryFilter != null) {
|
if (this.isSourceActive() && this.walEntryFilter != null) {
|
||||||
// new wal group observed after source startup, start a new worker thread to track it
|
// new wal group observed after source startup, start a new worker thread to track it
|
||||||
|
@ -251,8 +269,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
// still not launched, so it's necessary to check workerThreads before start the worker
|
// still not launched, so it's necessary to check workerThreads before start the worker
|
||||||
tryStartNewShipper(walPrefix, queue);
|
tryStartNewShipper(walPrefix, queue);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
queue.put(wal);
|
||||||
}
|
}
|
||||||
queue.put(wal);
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
|
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
|
||||||
this.replicationQueueInfo.getQueueId());
|
this.replicationQueueInfo.getQueueId());
|
||||||
|
@ -357,19 +376,30 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
|
private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
|
||||||
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
|
workerThreads.compute(walGroupId, (key, value) -> {
|
||||||
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
|
if (value != null) {
|
||||||
if (extant != null) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
|
LOG.debug(
|
||||||
} else {
|
"{} Someone has beat us to start a worker thread for wal group {}",
|
||||||
LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
|
logPeerId(), key);
|
||||||
ReplicationSourceWALReader walReader =
|
}
|
||||||
createNewWALReader(walGroupId, queue, worker.getStartPosition());
|
return value;
|
||||||
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
|
} else {
|
||||||
".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
|
if (LOG.isDebugEnabled()) {
|
||||||
worker.setWALReader(walReader);
|
LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
|
||||||
worker.startup(this::uncaughtException);
|
}
|
||||||
}
|
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
|
||||||
|
ReplicationSourceWALReader walReader =
|
||||||
|
createNewWALReader(walGroupId, queue, worker.getStartPosition());
|
||||||
|
Threads.setDaemonThreadRunning(
|
||||||
|
walReader, Thread.currentThread().getName()
|
||||||
|
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
|
||||||
|
(t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
|
||||||
|
worker.setWALReader(walReader);
|
||||||
|
worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -440,11 +470,28 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
return walEntryFilter;
|
return walEntryFilter;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final void uncaughtException(Thread t, Throwable e) {
|
protected final void uncaughtException(Thread t, Throwable e,
|
||||||
|
ReplicationSourceManager manager, String peerId) {
|
||||||
RSRpcServices.exitIfOOME(e);
|
RSRpcServices.exitIfOOME(e);
|
||||||
LOG.error("Unexpected exception in {} currentPath={}",
|
LOG.error("Unexpected exception in {} currentPath={}",
|
||||||
t.getName(), getCurrentPath(), e);
|
t.getName(), getCurrentPath(), e);
|
||||||
server.abort("Unexpected exception in " + t.getName(), e);
|
if(abortOnError){
|
||||||
|
server.abort("Unexpected exception in " + t.getName(), e);
|
||||||
|
}
|
||||||
|
if(manager != null){
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
LOG.info("Refreshing replication sources now due to previous error on thread: {}",
|
||||||
|
t.getName());
|
||||||
|
manager.refreshSources(peerId);
|
||||||
|
break;
|
||||||
|
} catch (IOException e1) {
|
||||||
|
LOG.error("Replication sources refresh failed.", e1);
|
||||||
|
sleepForRetries("Sleeping before try refreshing sources again",
|
||||||
|
maxRetriesMultiplier);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -543,12 +590,18 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
replicationEndpoint.stop();
|
replicationEndpoint.stop();
|
||||||
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
|
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
|
||||||
sleepMultiplier++;
|
sleepMultiplier++;
|
||||||
|
} else {
|
||||||
|
retryStartup.set(!this.abortOnError);
|
||||||
|
this.startupOngoing.set(false);
|
||||||
|
throw new RuntimeException("Exhausted retries to start replication endpoint.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!this.isSourceActive()) {
|
if (!this.isSourceActive()) {
|
||||||
return;
|
retryStartup.set(!this.abortOnError);
|
||||||
|
this.startupOngoing.set(false);
|
||||||
|
throw new IllegalStateException("Source should be active.");
|
||||||
}
|
}
|
||||||
|
|
||||||
sleepMultiplier = 1;
|
sleepMultiplier = 1;
|
||||||
|
@ -569,8 +622,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!this.isSourceActive()) {
|
if(!this.isSourceActive()) {
|
||||||
return;
|
retryStartup.set(!this.abortOnError);
|
||||||
|
this.startupOngoing.set(false);
|
||||||
|
throw new IllegalStateException("Source should be active.");
|
||||||
}
|
}
|
||||||
LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
|
LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
|
||||||
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
|
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
|
||||||
|
@ -582,6 +637,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
PriorityBlockingQueue<Path> queue = entry.getValue();
|
PriorityBlockingQueue<Path> queue = entry.getValue();
|
||||||
tryStartNewShipper(walGroupId, queue);
|
tryStartNewShipper(walGroupId, queue);
|
||||||
}
|
}
|
||||||
|
this.startupOngoing.set(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -591,10 +647,32 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
}
|
}
|
||||||
// Mark we are running now
|
// Mark we are running now
|
||||||
this.sourceRunning = true;
|
this.sourceRunning = true;
|
||||||
|
startupOngoing.set(true);
|
||||||
initThread = new Thread(this::initialize);
|
initThread = new Thread(this::initialize);
|
||||||
Threads.setDaemonThreadRunning(initThread,
|
Threads.setDaemonThreadRunning(initThread,
|
||||||
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
|
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
|
||||||
this::uncaughtException);
|
(t,e) -> {
|
||||||
|
//if first initialization attempt failed, and abortOnError is false, we will
|
||||||
|
//keep looping in this thread until initialize eventually succeeds,
|
||||||
|
//while the server main startup one can go on with its work.
|
||||||
|
sourceRunning = false;
|
||||||
|
uncaughtException(t, e, null, null);
|
||||||
|
retryStartup.set(!this.abortOnError);
|
||||||
|
do {
|
||||||
|
if(retryStartup.get()) {
|
||||||
|
this.sourceRunning = true;
|
||||||
|
startupOngoing.set(true);
|
||||||
|
retryStartup.set(false);
|
||||||
|
try {
|
||||||
|
initialize();
|
||||||
|
} catch(Throwable error){
|
||||||
|
sourceRunning = false;
|
||||||
|
uncaughtException(t, error, null, null);
|
||||||
|
retryStartup.set(!this.abortOnError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -280,7 +280,8 @@ public class ReplicationSourceShipper extends Thread {
|
||||||
public void startup(UncaughtExceptionHandler handler) {
|
public void startup(UncaughtExceptionHandler handler) {
|
||||||
String name = Thread.currentThread().getName();
|
String name = Thread.currentThread().getName();
|
||||||
Threads.setDaemonThreadRunning(this,
|
Threads.setDaemonThreadRunning(this,
|
||||||
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
|
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
|
||||||
|
handler::uncaughtException);
|
||||||
}
|
}
|
||||||
|
|
||||||
Path getCurrentPath() {
|
Path getCurrentPath() {
|
||||||
|
|
|
@ -18,9 +18,13 @@
|
||||||
package org.apache.hadoop.hbase.replication.regionserver;
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
|
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.OptionalLong;
|
import java.util.OptionalLong;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -118,15 +122,15 @@ public class TestReplicationSource {
|
||||||
ReplicationSource rs = new ReplicationSource();
|
ReplicationSource rs = new ReplicationSource();
|
||||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
conf.setInt("replication.source.maxretriesmultiplier", 1);
|
conf.setInt("replication.source.maxretriesmultiplier", 1);
|
||||||
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
|
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
|
||||||
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
|
when(mockPeer.getConfiguration()).thenReturn(conf);
|
||||||
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
||||||
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
|
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
|
||||||
Mockito.when(peerConfig.getReplicationEndpointImpl()).
|
when(peerConfig.getReplicationEndpointImpl()).
|
||||||
thenReturn(DoNothingReplicationEndpoint.class.getName());
|
thenReturn(DoNothingReplicationEndpoint.class.getName());
|
||||||
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
|
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
|
||||||
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
||||||
String queueId = "qid";
|
String queueId = "qid";
|
||||||
RegionServerServices rss =
|
RegionServerServices rss =
|
||||||
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
||||||
|
@ -156,15 +160,15 @@ public class TestReplicationSource {
|
||||||
ReplicationSource rs = new ReplicationSource();
|
ReplicationSource rs = new ReplicationSource();
|
||||||
UUID uuid = UUID.randomUUID();
|
UUID uuid = UUID.randomUUID();
|
||||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
|
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
|
||||||
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
|
when(mockPeer.getConfiguration()).thenReturn(conf);
|
||||||
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
||||||
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
|
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
|
||||||
Mockito.when(peerConfig.getReplicationEndpointImpl()).
|
when(peerConfig.getReplicationEndpointImpl()).
|
||||||
thenReturn(DoNothingReplicationEndpoint.class.getName());
|
thenReturn(DoNothingReplicationEndpoint.class.getName());
|
||||||
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
|
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
|
||||||
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
||||||
String queueId = "qid";
|
String queueId = "qid";
|
||||||
RegionServerServices rss =
|
RegionServerServices rss =
|
||||||
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
||||||
|
@ -250,12 +254,12 @@ public class TestReplicationSource {
|
||||||
replicationEndpoint = new DoNothingReplicationEndpoint();
|
replicationEndpoint = new DoNothingReplicationEndpoint();
|
||||||
try {
|
try {
|
||||||
replicationEndpoint.start();
|
replicationEndpoint.start();
|
||||||
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
|
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
|
||||||
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
||||||
Configuration testConf = HBaseConfiguration.create();
|
Configuration testConf = HBaseConfiguration.create();
|
||||||
testConf.setInt("replication.source.maxretriesmultiplier", 1);
|
testConf.setInt("replication.source.maxretriesmultiplier", 1);
|
||||||
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
|
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
|
||||||
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
||||||
source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
|
source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
|
||||||
null, p -> OptionalLong.empty(), null);
|
null, p -> OptionalLong.empty(), null);
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||||
|
@ -396,6 +400,36 @@ public class TestReplicationSource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
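* Flaky endpoint: getPeerUUID() throws a RuntimeException while the static count is 0 (i.e. on the first call), then delegates to the parent.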
|
||||||
|
*/
|
||||||
|
public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
|
||||||
|
|
||||||
|
static int count = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized UUID getPeerUUID() {
|
||||||
|
if(count==0) {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException();
|
||||||
|
} else {
|
||||||
|
return super.getPeerUUID();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
|
||||||
|
|
||||||
|
static int count = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized UUID getPeerUUID() {
|
||||||
|
throw new RuntimeException();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test HBASE-20497
|
* Test HBASE-20497
|
||||||
* Moved here from TestReplicationSource because doesn't need cluster.
|
* Moved here from TestReplicationSource because doesn't need cluster.
|
||||||
|
@ -407,15 +441,15 @@ public class TestReplicationSource {
|
||||||
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
|
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
|
||||||
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
|
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
|
||||||
queue.put(new Path("/www/html/test"));
|
queue.put(new Path("/www/html/test"));
|
||||||
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
|
RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
|
||||||
Server server = Mockito.mock(Server.class);
|
Server server = mock(Server.class);
|
||||||
Mockito.when(server.getServerName()).thenReturn(serverName);
|
when(server.getServerName()).thenReturn(serverName);
|
||||||
Mockito.when(source.getServer()).thenReturn(server);
|
when(source.getServer()).thenReturn(server);
|
||||||
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
|
when(source.getServerWALsBelongTo()).thenReturn(deadServer);
|
||||||
ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
|
ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
|
||||||
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
|
when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
|
||||||
.thenReturn(1001L);
|
.thenReturn(1001L);
|
||||||
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
|
when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
|
||||||
.thenReturn(-1L);
|
.thenReturn(-1L);
|
||||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
conf.setInt("replication.source.maxretriesmultiplier", -1);
|
conf.setInt("replication.source.maxretriesmultiplier", -1);
|
||||||
|
@ -423,5 +457,90 @@ public class TestReplicationSource {
|
||||||
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
|
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
|
||||||
assertEquals(1001L, shipper.getStartPosition());
|
assertEquals(1001L, shipper.getStartPosition());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
|
||||||
|
String endpointName) throws IOException {
|
||||||
|
conf.setInt("replication.source.maxretriesmultiplier", 1);
|
||||||
|
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
|
||||||
|
when(mockPeer.getConfiguration()).thenReturn(conf);
|
||||||
|
when(mockPeer.getPeerBandwidth()).thenReturn(0L);
|
||||||
|
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
|
||||||
|
FaultyReplicationEndpoint.count = 0;
|
||||||
|
when(peerConfig.getReplicationEndpointImpl()).
|
||||||
|
thenReturn(endpointName);
|
||||||
|
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
|
||||||
|
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
||||||
|
String queueId = "qid";
|
||||||
|
RegionServerServices rss =
|
||||||
|
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
|
||||||
|
rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
|
||||||
|
p -> OptionalLong.empty(), new MetricsSource(queueId));
|
||||||
|
return rss;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test ReplicationSource retries startup once an uncaught exception happens
|
||||||
|
* during initialization and <b>replication.source.regionserver.abort</b> is set to false.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAbortFalseOnError() throws IOException {
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
conf.setBoolean("replication.source.regionserver.abort", false);
|
||||||
|
ReplicationSource rs = new ReplicationSource();
|
||||||
|
RegionServerServices rss = setupForAbortTests(rs, conf,
|
||||||
|
FlakyReplicationEndpoint.class.getName());
|
||||||
|
try {
|
||||||
|
rs.startup();
|
||||||
|
assertTrue(rs.isSourceActive());
|
||||||
|
assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
|
||||||
|
rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
|
||||||
|
assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
|
||||||
|
rs.enqueueLog(new Path("a.1"));
|
||||||
|
assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
|
||||||
|
} finally {
|
||||||
|
rs.terminate("Done");
|
||||||
|
rss.stop("Done");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
|
||||||
|
* when <b>replication.source.regionserver.abort</b> is set to false.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
ReplicationSource rs = new ReplicationSource();
|
||||||
|
RegionServerServices rss = setupForAbortTests(rs, conf,
|
||||||
|
FaultyReplicationEndpoint.class.getName());
|
||||||
|
try {
|
||||||
|
rs.startup();
|
||||||
|
assertTrue(true);
|
||||||
|
} finally {
|
||||||
|
rs.terminate("Done");
|
||||||
|
rss.stop("Done");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test ReplicationSource aborts the server (no startup retry) once an uncaught exception happens
|
||||||
|
* during initialization and <b>replication.source.regionserver.abort</b> is set to true.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAbortTrueOnError() throws IOException {
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
ReplicationSource rs = new ReplicationSource();
|
||||||
|
RegionServerServices rss = setupForAbortTests(rs, conf,
|
||||||
|
FlakyReplicationEndpoint.class.getName());
|
||||||
|
try {
|
||||||
|
rs.startup();
|
||||||
|
Waiter.waitFor(conf, 1000, () -> rss.isAborted());
|
||||||
|
assertFalse(rs.isSourceActive());
|
||||||
|
} finally {
|
||||||
|
rs.terminate("Done");
|
||||||
|
rss.stop("Done");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue