From 9fc29c4cbf6671589f34b15e4ab41970bd4f3b45 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Tue, 29 Sep 2020 10:00:57 +0100 Subject: [PATCH] HBASE-24877 addendum: additional checks to avoid one extra possible race control in the initialize loop (#2400) Signed-off-by: Duo Zhang Signed-off-by: Josh Elser --- .../regionserver/ReplicationSource.java | 58 +++++++------ .../regionserver/TestReplicationSource.java | 84 ++++++++++++------- 2 files changed, 88 insertions(+), 54 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index bf8127f93ab..82120736bd4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -128,7 +127,9 @@ public class ReplicationSource implements ReplicationSourceInterface { //so that it doesn't try submit another initialize thread. //NOTE: this should only be set to false at the end of initialize method, prior to return. private AtomicBoolean startupOngoing = new AtomicBoolean(false); - + //Flag that signalizes uncaught error happening while starting up the source + // and a retry should be attempted + private AtomicBoolean retryStartup = new AtomicBoolean(false); /** * A filter (or a chain of filters) for WAL entries; filters out edits. 
@@ -375,7 +376,7 @@ public class ReplicationSource implements ReplicationSourceInterface { LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId); return value; } else { - LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId); + LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId); ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceWALReader walReader = createNewWALReader(walGroupId, queue, worker.getStartPosition()); @@ -570,6 +571,7 @@ public class ReplicationSource implements ReplicationSourceInterface { if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { sleepMultiplier++; } else { + retryStartup.set(!this.abortOnError); this.startupOngoing.set(false); throw new RuntimeException("Exhausted retries to start replication endpoint."); } @@ -577,6 +579,7 @@ public class ReplicationSource implements ReplicationSourceInterface { } if (!this.isSourceActive()) { + retryStartup.set(!this.abortOnError); this.startupOngoing.set(false); throw new IllegalStateException("Source should be active."); } @@ -600,6 +603,7 @@ public class ReplicationSource implements ReplicationSourceInterface { } if(!this.isSourceActive()) { + retryStartup.set(!this.abortOnError); this.startupOngoing.set(false); throw new IllegalStateException("Source should be active."); } @@ -618,28 +622,34 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override public void startup() { - if (this.sourceRunning) { - return; - } + // mark we are running now this.sourceRunning = true; - //Flag that signalizes uncaught error happening while starting up the source - // and a retry should be attempted - MutableBoolean retryStartup = new MutableBoolean(true); - do { - if(retryStartup.booleanValue()) { - retryStartup.setValue(false); - startupOngoing.set(true); - // mark we are running now - initThread = new Thread(this::initialize); - 
Threads.setDaemonThreadRunning(initThread, - Thread.currentThread().getName() + ".replicationSource," + this.queueId, - (t,e) -> { - sourceRunning = false; - uncaughtException(t, e, null, null); - retryStartup.setValue(!this.abortOnError); - }); - } - } while (this.startupOngoing.get() && !this.abortOnError); + startupOngoing.set(true); + initThread = new Thread(this::initialize); + Threads.setDaemonThreadRunning(initThread, + Thread.currentThread().getName() + ".replicationSource," + this.queueId, + (t,e) -> { + //if first initialization attempt failed, and abortOnError is false, we will + //keep looping in this thread until initialize eventually succeeds, + //while the server main startup one can go on with its work. + sourceRunning = false; + uncaughtException(t, e, null, null); + retryStartup.set(!this.abortOnError); + do { + if(retryStartup.get()) { + this.sourceRunning = true; + startupOngoing.set(true); + retryStartup.set(false); + try { + initialize(); + } catch(Throwable error){ + sourceRunning = false; + uncaughtException(t, error, null, null); + retryStartup.set(!this.abortOnError); + } + } + } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); + }); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 8b8dcd8afa2..ded9e8f28e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -444,7 +444,7 @@ public class TestReplicationSource { /** * Deadend Endpoint. Does nothing. 
*/ - public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { static int count = 0; @@ -460,6 +460,17 @@ public class TestReplicationSource { } + public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + + static int count = 0; + + @Override + public synchronized UUID getPeerUUID() { + throw new RuntimeException(); + } + + } + /** * Test HBASE-20497 * Moved here from TestReplicationSource because doesn't need cluster. @@ -488,22 +499,16 @@ public class TestReplicationSource { assertEquals(1001L, shipper.getStartPosition()); } - /** - * Test ReplicationSource retries startup once an uncaught exception happens - * during initialization and eplication.source.regionserver.abort is set to false. - */ - @Test - public void testAbortFalseOnError() throws IOException { - ReplicationSource rs = new ReplicationSource(); - Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, + String endpointName) throws IOException { conf.setInt("replication.source.maxretriesmultiplier", 1); - conf.setBoolean("replication.source.regionserver.abort", false); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + FaultyReplicationEndpoint.count = 0; Mockito.when(peerConfig.getReplicationEndpointImpl()). 
- thenReturn(FaultyReplicationEndpoint.class.getName()); + thenReturn(endpointName); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); @@ -512,6 +517,20 @@ public class TestReplicationSource { TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); + return rss; + } + + /** + * Test ReplicationSource retries startup once an uncaught exception happens + * during initialization and replication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnError() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean("replication.source.regionserver.abort", false); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FlakyReplicationEndpoint.class.getName()); try { rs.startup(); assertTrue(rs.isSourceActive()); @@ -526,34 +545,39 @@ public class TestReplicationSource { } } + /** + * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, + * when replication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FaultyReplicationEndpoint.class.getName()); + try { + rs.startup(); + assertTrue(true); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + /** * Test ReplicationSource retries startup once an uncaught exception happens - * during initialization and replication.source.regionserver.abort is set to false.
+ * during initialization and replication.source.regionserver.abort is set to true. */ @Test public void testAbortTrueOnError() throws IOException { - ReplicationSource rs = new ReplicationSource(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); - conf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); - Mockito.when(peerConfig.getReplicationEndpointImpl()). - thenReturn(FaultyReplicationEndpoint.class.getName()); - Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - String queueId = "qid"; - RegionServerServices rss = - TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, - p -> OptionalLong.empty(), new MetricsSource(queueId)); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FlakyReplicationEndpoint.class.getName()); try { rs.startup(); - Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0); + Waiter.waitFor(conf, 1000, () -> rss.isAborted()); assertFalse(rs.isSourceActive()); - assertTrue(rss.isAborted()); } finally { rs.terminate("Done"); rss.stop("Done");