diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f24ecfa5539..dc0276dc707 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -35,9 +35,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; + import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -120,6 +123,14 @@ public class ReplicationSource implements ReplicationSourceInterface { // ReplicationEndpoint which will handle the actual replication private volatile ReplicationEndpoint replicationEndpoint; + private boolean abortOnError; + //This is needed for the startup loop to identify when there's already + //an initialization happening (but not finished yet), + //so that it doesn't try submit another initialize thread. + //NOTE: this should only be set to false at the end of initialize method, prior to return. + private AtomicBoolean startupOngoing = new AtomicBoolean(false); + + /** * A filter (or a chain of filters) for WAL entries; filters out edits. 
*/ @@ -217,6 +228,10 @@ public class ReplicationSource implements ReplicationSourceInterface { this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; + + this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", + true); + LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -372,10 +387,10 @@ public class ReplicationSource implements ReplicationSourceInterface { createNewWALReader(walGroupId, queue, worker.getStartPosition()); Threads.setDaemonThreadRunning( walReader, Thread.currentThread().getName() - + ".replicationSource.wal-reader." + walGroupId + "," + queueId, - this::uncaughtException); + + ".replicationSource.wal-reader." + walGroupId + "," + queueId, + (t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); worker.setWALReader(walReader); - worker.startup(this::uncaughtException); + worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); return worker; } }); @@ -450,11 +465,28 @@ public class ReplicationSource implements ReplicationSourceInterface { return walEntryFilter; } - protected final void uncaughtException(Thread t, Throwable e) { + protected final void uncaughtException(Thread t, Throwable e, + ReplicationSourceManager manager, String peerId) { RSRpcServices.exitIfOOME(e); LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e); - server.abort("Unexpected exception in " + t.getName(), e); + if(abortOnError){ + server.abort("Unexpected exception in " + t.getName(), e); + } + if(manager != null){ + while (true) { + try { + LOG.info("Refreshing replication sources now due to previous error on thread: {}", + t.getName()); + manager.refreshSources(peerId); + break; + } catch (IOException e1) { + LOG.error("Replication sources refresh failed.", e1); 
+ sleepForRetries("Sleeping before try refreshing sources again", + maxRetriesMultiplier); + } + } + } } @Override @@ -544,12 +576,16 @@ public class ReplicationSource implements ReplicationSourceInterface { replicationEndpoint.stop(); if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { sleepMultiplier++; + } else { + this.startupOngoing.set(false); + throw new RuntimeException("Exhausted retries to start replication endpoint."); } } } if (!this.isSourceActive()) { - return; + this.startupOngoing.set(false); + throw new IllegalStateException("Source should be active."); } sleepMultiplier = 1; @@ -571,7 +607,8 @@ public class ReplicationSource implements ReplicationSourceInterface { } if(!this.isSourceActive()) { - return; + this.startupOngoing.set(false); + throw new IllegalStateException("Source should be active."); } LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};", logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); @@ -583,16 +620,30 @@ public class ReplicationSource implements ReplicationSourceInterface { PriorityBlockingQueue queue = entry.getValue(); tryStartNewShipper(walGroupId, queue); } + this.startupOngoing.set(false); } @Override public void startup() { - // mark we are running now + //Flag that signalizes uncaught error happening while starting up the source + // and a retry should be attempted + MutableBoolean retryStartup = new MutableBoolean(true); this.sourceRunning = true; - initThread = new Thread(this::initialize); - Threads.setDaemonThreadRunning(initThread, - Thread.currentThread().getName() + ".replicationSource," + this.queueId, - this::uncaughtException); + do { + if(retryStartup.booleanValue()) { + retryStartup.setValue(false); + startupOngoing.set(true); + // mark we are running now + initThread = new Thread(this::initialize); + Threads.setDaemonThreadRunning(initThread, + Thread.currentThread().getName() + ".replicationSource," + this.queueId, + 
(t,e) -> { + sourceRunning = false; + uncaughtException(t, e, null, null); + retryStartup.setValue(!this.abortOnError); + }); + } + } while (this.startupOngoing.get() && !this.abortOnError); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 72cc5e82b69..45eb91c2e72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -290,7 +290,8 @@ public class ReplicationSourceShipper extends Thread { public void startup(UncaughtExceptionHandler handler) { String name = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(this, - name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); + name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), + handler::uncaughtException); } Path getCurrentPath() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 15f202f0646..3628daf749b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ 
-396,6 +397,25 @@ public class TestReplicationSource { } } + /** + * Faulty endpoint: the first getPeerUUID call throws RuntimeException, later calls delegate. + */ + public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + + static int count = 0; + + @Override + public synchronized UUID getPeerUUID() { + if(count==0) { + count++; + throw new RuntimeException(); + } else { + return super.getPeerUUID(); + } + } + + } + /** * Test HBASE-20497 * Moved here from TestReplicationSource because doesn't need cluster. @@ -423,5 +443,77 @@ public class TestReplicationSource { new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage); assertEquals(1001L, shipper.getStartPosition()); } + + /** + * Test ReplicationSource retries startup once an uncaught exception happens + * during initialization and replication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnError() throws IOException { + ReplicationSource rs = new ReplicationSource(); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt("replication.source.maxretriesmultiplier", 1); + conf.setBoolean("replication.source.regionserver.abort", false); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + Mockito.when(peerConfig.getReplicationEndpointImpl()).
+ thenReturn(FaultyReplicationEndpoint.class.getName()); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + String queueId = "qid"; + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + p -> OptionalLong.empty(), new MetricsSource(queueId)); + try { + rs.startup(); + assertTrue(rs.isSourceActive()); + assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); + rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); + assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); + rs.enqueueLog(new Path("a.1")); + assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + + /** + * Test ReplicationSource aborts the region server once an uncaught exception happens + * during initialization and replication.source.regionserver.abort keeps its default of true. + */ + @Test + public void testAbortTrueOnError() throws IOException { + ReplicationSource rs = new ReplicationSource(); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt("replication.source.maxretriesmultiplier", 1); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + Mockito.when(peerConfig.getReplicationEndpointImpl()).
+ thenReturn(FaultyReplicationEndpoint.class.getName()); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + String queueId = "qid"; + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + p -> OptionalLong.empty(), new MetricsSource(queueId)); + try { + rs.startup(); + Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0); + assertFalse(rs.isSourceActive()); + assertTrue(rss.isAborted()); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } }