HBASE-24877 addendum: additional checks to avoid one extra possible race control in the initialize loop (#2400)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
Wellington Ramos Chevreuil 2020-09-29 10:00:57 +01:00 committed by GitHub
parent b268b1f621
commit 9fc29c4cbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 54 deletions

View File

@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -128,7 +127,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
//so that it doesn't try submit another initialize thread. //so that it doesn't try submit another initialize thread.
//NOTE: this should only be set to false at the end of initialize method, prior to return. //NOTE: this should only be set to false at the end of initialize method, prior to return.
private AtomicBoolean startupOngoing = new AtomicBoolean(false); private AtomicBoolean startupOngoing = new AtomicBoolean(false);
//Flag that signalizes uncaught error happening while starting up the source
// and a retry should be attempted
private AtomicBoolean retryStartup = new AtomicBoolean(false);
/** /**
* A filter (or a chain of filters) for WAL entries; filters out edits. * A filter (or a chain of filters) for WAL entries; filters out edits.
@ -375,7 +376,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId); LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
return value; return value;
} else { } else {
LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId); LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceWALReader walReader = ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition()); createNewWALReader(walGroupId, queue, worker.getStartPosition());
@ -570,6 +571,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++; sleepMultiplier++;
} else { } else {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false); this.startupOngoing.set(false);
throw new RuntimeException("Exhausted retries to start replication endpoint."); throw new RuntimeException("Exhausted retries to start replication endpoint.");
} }
@ -577,6 +579,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
} }
if (!this.isSourceActive()) { if (!this.isSourceActive()) {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false); this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active."); throw new IllegalStateException("Source should be active.");
} }
@ -600,6 +603,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
} }
if(!this.isSourceActive()) { if(!this.isSourceActive()) {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false); this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active."); throw new IllegalStateException("Source should be active.");
} }
@ -618,28 +622,34 @@ public class ReplicationSource implements ReplicationSourceInterface {
@Override @Override
public void startup() { public void startup() {
if (this.sourceRunning) { // mark we are running now
return;
}
this.sourceRunning = true; this.sourceRunning = true;
//Flag that signalizes uncaught error happening while starting up the source startupOngoing.set(true);
// and a retry should be attempted initThread = new Thread(this::initialize);
MutableBoolean retryStartup = new MutableBoolean(true); Threads.setDaemonThreadRunning(initThread,
do { Thread.currentThread().getName() + ".replicationSource," + this.queueId,
if(retryStartup.booleanValue()) { (t,e) -> {
retryStartup.setValue(false); //if first initialization attempt failed, and abortOnError is false, we will
startupOngoing.set(true); //keep looping in this thread until initialize eventually succeeds,
// mark we are running now //while the server main startup one can go on with its work.
initThread = new Thread(this::initialize); sourceRunning = false;
Threads.setDaemonThreadRunning(initThread, uncaughtException(t, e, null, null);
Thread.currentThread().getName() + ".replicationSource," + this.queueId, retryStartup.set(!this.abortOnError);
(t,e) -> { do {
sourceRunning = false; if(retryStartup.get()) {
uncaughtException(t, e, null, null); this.sourceRunning = true;
retryStartup.setValue(!this.abortOnError); startupOngoing.set(true);
}); retryStartup.set(false);
} try {
} while (this.startupOngoing.get() && !this.abortOnError); initialize();
} catch(Throwable error){
sourceRunning = false;
uncaughtException(t, error, null, null);
retryStartup.set(!this.abortOnError);
}
}
} while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
});
} }
@Override @Override

View File

@ -444,7 +444,7 @@ public class TestReplicationSource {
/** /**
* Deadend Endpoint. Does nothing. * Deadend Endpoint. Does nothing.
*/ */
public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
static int count = 0; static int count = 0;
@ -460,6 +460,17 @@ public class TestReplicationSource {
} }
public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
static int count = 0;
@Override
public synchronized UUID getPeerUUID() {
throw new RuntimeException();
}
}
/** /**
* Test HBASE-20497 * Test HBASE-20497
* Moved here from TestReplicationSource because doesn't need cluster. * Moved here from TestReplicationSource because doesn't need cluster.
@ -488,22 +499,16 @@ public class TestReplicationSource {
assertEquals(1001L, shipper.getStartPosition()); assertEquals(1001L, shipper.getStartPosition());
} }
/** private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
* Test ReplicationSource retries startup once an uncaught exception happens String endpointName) throws IOException {
* during initialization and <b>eplication.source.regionserver.abort</b> is set to false.
*/
@Test
public void testAbortFalseOnError() throws IOException {
ReplicationSource rs = new ReplicationSource();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", 1); conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.regionserver.abort", false);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
FaultyReplicationEndpoint.count = 0;
Mockito.when(peerConfig.getReplicationEndpointImpl()). Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(FaultyReplicationEndpoint.class.getName()); thenReturn(endpointName);
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
@ -512,6 +517,20 @@ public class TestReplicationSource {
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId)); p -> OptionalLong.empty(), new MetricsSource(queueId));
return rss;
}
/**
* Test ReplicationSource retries startup once an uncaught exception happens
* during initialization and <b>eplication.source.regionserver.abort</b> is set to false.
*/
@Test
public void testAbortFalseOnError() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setBoolean("replication.source.regionserver.abort", false);
ReplicationSource rs = new ReplicationSource();
RegionServerServices rss = setupForAbortTests(rs, conf,
FlakyReplicationEndpoint.class.getName());
try { try {
rs.startup(); rs.startup();
assertTrue(rs.isSourceActive()); assertTrue(rs.isSourceActive());
@ -526,34 +545,39 @@ public class TestReplicationSource {
} }
} }
/**
* Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
* when <b>eplication.source.regionserver.abort</b> is set to false.
*/
@Test
public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
ReplicationSource rs = new ReplicationSource();
RegionServerServices rss = setupForAbortTests(rs, conf,
FaultyReplicationEndpoint.class.getName());
try {
rs.startup();
assertTrue(true);
} finally {
rs.terminate("Done");
rss.stop("Done");
}
}
/** /**
* Test ReplicationSource retries startup once an uncaught exception happens * Test ReplicationSource retries startup once an uncaught exception happens
* during initialization and <b>replication.source.regionserver.abort</b> is set to false. * during initialization and <b>replication.source.regionserver.abort</b> is set to true.
*/ */
@Test @Test
public void testAbortTrueOnError() throws IOException { public void testAbortTrueOnError() throws IOException {
ReplicationSource rs = new ReplicationSource();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSource rs = new ReplicationSource();
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); RegionServerServices rss = setupForAbortTests(rs, conf,
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); FlakyReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(FaultyReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try { try {
rs.startup(); rs.startup();
Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0); Waiter.waitFor(conf, 1000, () -> rss.isAborted());
assertFalse(rs.isSourceActive()); assertFalse(rs.isSourceActive());
assertTrue(rss.isAborted());
} finally { } finally {
rs.terminate("Done"); rs.terminate("Done");
rss.stop("Done"); rss.stop("Done");