diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 388b8d4393b..d3bcff11e21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -160,14 +160,14 @@ public class RecoveredReplicationSource extends ReplicationSource {
     // use synchronize to make sure one last thread will clean the queue
     synchronized (workerThreads) {
       Threads.sleep(100);// wait a short while for other worker thread to fully exit
-      boolean allOtherTaskDone = true;
+      boolean allTasksDone = true;
       for (ReplicationSourceShipperThread worker : workerThreads.values()) {
-        if (worker.isActive()) {
-          allOtherTaskDone = false;
+        if (!worker.isFinished()) {
+          allTasksDone = false;
           break;
         }
       }
-      if (allOtherTaskDone) {
+      if (allTasksDone) {
         manager.closeRecoveredQueue(this);
         LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
             + getStats());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
index 024b0c49fd8..65aeb2f88bd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
@@ -51,6 +51,7 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
 
   @Override
   public void run() {
+    setWorkerState(WorkerState.RUNNING);
     // Loop until we close down
     while (isActive()) {
       int sleepMultiplier = 1;
@@ -77,7 +78,7 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
           LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
               + source.getPeerClusterZnode());
           source.getSourceMetrics().incrCompletedRecoveryQueue();
-          setWorkerRunning(false);
+          setWorkerState(WorkerState.FINISHED);
           continue;
         }
       } catch (InterruptedException e) {
@@ -85,8 +86,11 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
         Thread.currentThread().interrupt();
       }
     }
-    source.tryFinish();
+    // If the worker exits run loop without finishing its task, mark it as stopped.
+    if (!isFinished()) {
+      setWorkerState(WorkerState.STOPPED);
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d098fd9f00d..1dbf07f117f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -57,13 +57,13 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
+
 /**
  * Class that handles the source of a replication stream.
  * Currently does not handle more than 1 slave
@@ -448,7 +448,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.sourceRunning = false;
     Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
     for (ReplicationSourceShipperThread worker : workers) {
-      worker.setWorkerRunning(false);
+      worker.stopWorker();
       worker.entryReader.interrupt();
       worker.interrupt();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
index b0f7feea215..d1a8ac20ce1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -54,6 +54,13 @@ import com.google.common.cache.LoadingCache;
 public class ReplicationSourceShipperThread extends Thread {
   private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
 
+  // Hold the state of a replication worker thread
+  public enum WorkerState {
+    RUNNING,
+    STOPPED,
+    FINISHED, // The worker is done processing a recovered queue
+  }
+
   protected final Configuration conf;
   protected final String walGroupId;
   protected final PriorityBlockingQueue<Path> queue;
@@ -63,8 +70,8 @@ public class ReplicationSourceShipperThread extends Thread {
   protected long lastLoggedPosition = -1;
   // Path of the current log
   protected volatile Path currentPath;
-  // Indicates whether this particular worker is running
-  private boolean workerRunning = true;
+  // Current state of the worker thread
+  private WorkerState state;
   protected ReplicationSourceWALReaderThread entryReader;
 
   // How long should we sleep for each retry
@@ -97,6 +104,7 @@ public class ReplicationSourceShipperThread extends Thread {
 
   @Override
   public void run() {
+    setWorkerState(WorkerState.RUNNING);
     // Loop until we close down
     while (isActive()) {
       int sleepMultiplier = 1;
@@ -126,6 +134,10 @@ public class ReplicationSourceShipperThread extends Thread {
         Thread.currentThread().interrupt();
       }
     }
+    // If the worker exits run loop without finishing its task, mark it as stopped.
+    if (state != WorkerState.FINISHED) {
+      setWorkerState(WorkerState.STOPPED);
+    }
   }
 
   /**
@@ -307,12 +319,23 @@ public class ReplicationSourceShipperThread extends Thread {
   }
 
   protected boolean isActive() {
-    return source.isSourceActive() && workerRunning && !isInterrupted();
+    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
   }
 
-  public void setWorkerRunning(boolean workerRunning) {
-    entryReader.setReaderRunning(workerRunning);
-    this.workerRunning = workerRunning;
+  public void setWorkerState(WorkerState state) {
+    this.state = state;
+  }
+
+  public WorkerState getWorkerState() {
+    return state;
+  }
+
+  public void stopWorker() {
+    setWorkerState(WorkerState.STOPPED);
+  }
+
+  public boolean isFinished() {
+    return state == WorkerState.FINISHED;
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 7461edb6b91..c3b7eafdd2f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -31,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.Predicate;
@@ -38,6 +42,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALProvider;
@@ -49,12 +56,12 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
-import static org.mockito.Mockito.mock;
 
 @Category({ReplicationTests.class, MediumTests.class})
 public class TestReplicationSource {
@@ -63,10 +70,12 @@ public class TestReplicationSource {
     LogFactory.getLog(TestReplicationSource.class);
   private final static HBaseTestingUtility TEST_UTIL =
       new HBaseTestingUtility();
+  private final static HBaseTestingUtility TEST_UTIL_PEER =
+      new HBaseTestingUtility();
   private static FileSystem FS;
   private static Path oldLogDir;
   private static Path logDir;
-  private static Configuration conf = HBaseConfiguration.create();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
 
   /**
    * @throws java.lang.Exception
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -82,6 +91,13 @@ public class TestReplicationSource {
     if (FS.exists(logDir)) FS.delete(logDir, true);
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
   /**
    * Sanity check that we can move logs around while we are reading
    * from them. Should this test fail, ReplicationSource would have a hard
@@ -172,5 +188,112 @@
   }
 
+  /**
+   * Tests that recovered queues are preserved on a regionserver shutdown.
+   * See HBASE-18192
+   * @throws Exception
+   */
+  @Test
+  public void testServerShutdownRecoveredQueue() throws Exception {
+    try {
+      // Ensure single-threaded WAL
+      conf.set("hbase.wal.provider", "defaultProvider");
+      conf.setInt("replication.sleep.before.failover", 2000);
+      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
+      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
+      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
+      TEST_UTIL_PEER.startMiniCluster(1);
+
+      HRegionServer serverA = cluster.getRegionServer(0);
+      final ReplicationSourceManager managerA =
+          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+      HRegionServer serverB = cluster.getRegionServer(1);
+      final ReplicationSourceManager managerB =
+          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+      final Admin admin = TEST_UTIL.getAdmin();
+
+      final String peerId = "TestPeer";
+      admin.addReplicationPeer(peerId,
+          new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
+      // Wait for replication sources to come up
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
+        }
+      });
+      // Disabling the peer makes sure there is at least one log to claim when the server dies.
+      // The recovered queue will also stay there until the peer is enabled again, even if the
+      // WALs it contains have no data.
+      admin.disableReplicationPeer(peerId);
+
+      // Stopping serverA
+      // Its queues should be claimed by the only other alive server, i.e. serverB
+      cluster.stopRegionServer(serverA.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerB.getOldSources().size() == 1;
+        }
+      });
+
+      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
+      serverC.waitForServerOnline();
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return serverC.getReplicationSourceService() != null;
+        }
+      });
+      final ReplicationSourceManager managerC =
+          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
+      // Sanity check
+      assertEquals(0, managerC.getOldSources().size());
+
+      // Stopping serverB
+      // Now serverC should have two recovered queues:
+      // 1. serverB's normal queue
+      // 2. serverA's recovered queue on serverB
+      cluster.stopRegionServer(serverB.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 2;
+        }
+      });
+      admin.enableReplicationPeer(peerId);
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 0;
+        }
+      });
+    } finally {
+      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
+    }
+  }
+
+  /**
+   * Regionserver implementation that adds a delay on the graceful shutdown.
+   */
+  public static class ShutdownDelayRegionServer extends HRegionServer {
+    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
+        throws IOException, InterruptedException {
+      super(conf, csm);
+    }
+
+    @Override
+    protected void stopServiceThreads() {
+      // Add a delay before service threads are shut down.
+      // This will keep the zookeeper connection alive for the duration of the delay.
+      LOG.info("Adding a delay to the regionserver shutdown");
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ex) {
+        LOG.error("Interrupted while sleeping");
+      }
+      super.stopServiceThreads();
+    }
+  }
+
 }
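
The patch above replaces the boolean workerRunning flag with a three-valued WorkerState (RUNNING, STOPPED, FINISHED), so a worker that merely stopped (for example because the regionserver is shutting down) can no longer be mistaken for one that actually finished draining a recovered queue. The snippet below is a standalone sketch, not HBase code and not part of the patch; the names WorkerStateSketch, Worker, exitLoop and canCloseRecoveredQueue are illustrative only. It models the state machine and the allTasksDone check from RecoveredReplicationSource under those assumptions:

import java.util.Arrays;
import java.util.List;

// Standalone model of the worker-state idea introduced by the patch (illustrative names only).
public class WorkerStateSketch {

  enum WorkerState { RUNNING, STOPPED, FINISHED }

  static class Worker {
    private volatile WorkerState state = WorkerState.RUNNING;

    // Simulates the end of the shipper loop; 'queueDrained' says whether the
    // recovered queue was fully replicated before the loop exited.
    void exitLoop(boolean queueDrained) {
      if (queueDrained) {
        state = WorkerState.FINISHED;  // queue fully shipped
      } else if (state != WorkerState.FINISHED) {
        state = WorkerState.STOPPED;   // exited without finishing (shutdown, interrupt, ...)
      }
    }

    boolean isFinished() {
      return state == WorkerState.FINISHED;
    }
  }

  // Mirrors the allTasksDone check: the recovered queue may be removed only when
  // every worker reports FINISHED, never merely "not running".
  static boolean canCloseRecoveredQueue(List<Worker> workers) {
    for (Worker worker : workers) {
      if (!worker.isFinished()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Worker groupA = new Worker();
    Worker groupB = new Worker();
    List<Worker> workers = Arrays.asList(groupA, groupB);

    groupA.exitLoop(true);   // this WAL group was fully shipped
    groupB.exitLoop(false);  // this one was cut short by a shutdown
    System.out.println(canCloseRecoveredQueue(workers)); // false: queue is preserved

    groupB.exitLoop(true);   // once the remaining group is also drained
    System.out.println(canCloseRecoveredQueue(workers)); // true: safe to clean up
  }
}

With the old boolean, the shutdown path in ReplicationSource.terminate() set workerRunning to false, which made a worker look inactive in the same way as a completed one and could let a half-replicated recovered queue be closed; the extra FINISHED state is what the new testServerShutdownRecoveredQueue test exercises.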