From c141547f3b1d215e6f8096e9abd53569406b1da3 Mon Sep 17 00:00:00 2001 From: Allan Yang Date: Tue, 23 Oct 2018 10:27:02 +0800 Subject: [PATCH] HBASE-21354 Procedure may be deleted improperly during master restarts resulting in 'Corrupt' --- .../store/ProcedureStoreTracker.java | 25 +- .../store/wal/WALProcedureStore.java | 52 ++-- .../procedure2/ProcedureTestingUtility.java | 36 ++- .../procedure2/TestProcedureCleanup.java | 242 ++++++++++++++++++ 4 files changed, 329 insertions(+), 26 deletions(-) create mode 100644 hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index a5b5825bb34..64479b220b2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -203,7 +203,7 @@ public class ProcedureStoreTracker { * then we mark it as deleted. * @see #setDeletedIfModified(long...) */ - public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { + public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker, boolean globalTracker) { BitSetNode trackerNode = null; for (BitSetNode node : map.values()) { final long minProcId = node.getStart(); @@ -214,9 +214,26 @@ public class ProcedureStoreTracker { } trackerNode = tracker.lookupClosestNode(trackerNode, procId); - if (trackerNode == null || !trackerNode.contains(procId) || - trackerNode.isModified(procId)) { - // the procedure was removed or modified + if (trackerNode == null || !trackerNode.contains(procId)) { + // the procId is not exist in the track, we can only delete the proc + // if globalTracker set to true. + // Only if the procedure is not in the global tracker we can delete the + // the procedure. In other cases, the procedure may not update in a single + // log, we cannot delete it just because the log's track doesn't have + // any info for the procedure. + if (globalTracker) { + node.delete(procId); + } + continue; + } + // Only check delete in the global tracker, only global tracker has the + // whole picture + if (globalTracker && trackerNode.isDeleted(procId) == DeleteState.YES) { + node.delete(procId); + continue; + } + if (trackerNode.isModified(procId)) { + // the procedure was modified node.delete(procId); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 1aee86d80e6..4bc668e30e9 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -97,7 +97,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it * with the tracker of every newer wal files, using the - * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out + * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker, boolean)}. + * If we find out * that all the modified procedures for the oldest wal file are modified or deleted in newer wal * files, then we can delete it. This is because that, every time we call * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we will @@ -343,7 +344,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Close the writer - closeCurrentLogStream(); + closeCurrentLogStream(abort); // Close the old logs // they should be already closed, this is just in case the load fails @@ -398,7 +399,7 @@ public class WALProcedureStore extends ProcedureStoreBase { public void recoverLease() throws IOException { lock.lock(); try { - LOG.trace("Starting WAL Procedure Store lease recovery"); + LOG.debug("Starting WAL Procedure Store lease recovery"); boolean afterFirstAttempt = false; while (isRunning()) { // Don't sleep before first attempt @@ -433,7 +434,7 @@ public class WALProcedureStore extends ProcedureStoreBase { continue; } - LOG.trace("Lease acquired for flushLogId={}", flushLogId); + LOG.debug("Lease acquired for flushLogId={}", flushLogId); break; } } finally { @@ -451,7 +452,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // Nothing to do, If we have only the current log. if (logs.size() == 1) { - LOG.trace("No state logs to replay."); + LOG.debug("No state logs to replay."); loader.setMaxProcId(0); return; } @@ -983,7 +984,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @VisibleForTesting - boolean rollWriterForTesting() throws IOException { + public boolean rollWriterForTesting() throws IOException { lock.lock(); try { return rollWriter(); @@ -1006,11 +1007,11 @@ public class WALProcedureStore extends ProcedureStoreBase { if (storeTracker.isEmpty()) { LOG.trace("no active procedures"); tryRollWriter(); - removeAllLogs(flushLogId - 1); + removeAllLogs(flushLogId - 1, "no active procedures"); } else { if (storeTracker.isAllModified()) { LOG.trace("all the active procedures are in the latest log"); - removeAllLogs(flushLogId - 1); + removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log"); } // if the log size has exceeded the roll threshold @@ -1091,7 +1092,7 @@ public class WALProcedureStore extends ProcedureStoreBase { return false; } - closeCurrentLogStream(); + closeCurrentLogStream(false); storeTracker.resetModified(); stream = newStream; @@ -1124,7 +1125,7 @@ public class WALProcedureStore extends ProcedureStoreBase { return true; } - private void closeCurrentLogStream() { + private void closeCurrentLogStream(boolean abort) { if (stream == null || logs.isEmpty()) { return; } @@ -1133,8 +1134,10 @@ public class WALProcedureStore extends ProcedureStoreBase { ProcedureWALFile log = logs.getLast(); log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId()); log.updateLocalTracker(storeTracker); - long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); - log.addToSize(trailerSize); + if (!abort) { + long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); + log.addToSize(trailerSize); + } } catch (IOException e) { LOG.warn("Unable to write the trailer", e); } @@ -1153,6 +1156,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. // once there is nothing olding the oldest WAL we can remove it. while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { + LOG.info("Remove the oldest log {}", logs.getFirst()); removeLogFile(logs.getFirst(), walArchiveDir); buildHoldingCleanupTracker(); } @@ -1170,24 +1174,38 @@ public class WALProcedureStore extends ProcedureStoreBase { // compute the holding tracker. // - the first WAL is used for the 'updates' - // - the other WALs are scanned to remove procs already in other wals. + // - the global tracker is passed in first to decide which procedures are not + // exist anymore, so we can mark them as deleted in holdingCleanupTracker. + // Only global tracker have the whole picture here. + // - the other WALs are scanned to remove procs already updated in a newer wal. + // If it is updated in a newer wal, we can mark it as delelted in holdingCleanupTracker + // But, we can not delete it if it was shown deleted in the newer wal, as said + // above. // TODO: exit early if holdingCleanupTracker.isEmpty() holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); - holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker); + //Passing in the global tracker, we can delete the procedures not in the global + //tracker, because they are deleted in the later logs + holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker, true); for (int i = 1, size = logs.size() - 1; i < size; ++i) { - holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker()); + // Set deleteIfNotExists to false since a single log's tracker is passed in. + // Since a specific procedure may not show up in the log at all(not executed or + // updated during the time), we can not delete the procedure just because this log + // don't have the info of the procedure. We can delete the procedure only if + // in this log's tracker, it was cleanly showed that the procedure is modified or deleted + // in the corresponding BitSetNode. + holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker(), false); } } /** * Remove all logs with logId <= {@code lastLogId}. */ - private void removeAllLogs(long lastLogId) { + private void removeAllLogs(long lastLogId, String why) { if (logs.size() <= 1) { return; } - LOG.trace("Remove all state logs with ID less than {}", lastLogId); + LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why); boolean removed = false; while (logs.size() > 1) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 52ffa57a7a8..865909428c9 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -66,20 +66,46 @@ public class ProcedureTestingUtility { }); } + public static void restart(final ProcedureExecutor procExecutor, + boolean abort, boolean startWorkers) throws Exception { + restart(procExecutor, false, true, null, null, abort, startWorkers); + } + + public static void restart(final ProcedureExecutor procExecutor, + boolean abort) throws Exception { + restart(procExecutor, false, true, null, null, abort, true); + } + public static void restart(final ProcedureExecutor procExecutor) throws Exception { - restart(procExecutor, false, true, null, null); + restart(procExecutor, false, true, null, null, false, true); } public static void initAndStartWorkers(ProcedureExecutor procExecutor, int numThreads, boolean abortOnCorruption) throws IOException { + initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true); + } + + public static void initAndStartWorkers(ProcedureExecutor procExecutor, int numThreads, + boolean abortOnCorruption, boolean startWorkers) throws IOException { procExecutor.init(numThreads, abortOnCorruption); - procExecutor.startWorkers(); + if (startWorkers) { + procExecutor.startWorkers(); + } } public static void restart(final ProcedureExecutor procExecutor, final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted, final Callable stopAction, final Callable startAction) throws Exception { + restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, + stopAction, startAction, false, true); + } + + public static void restart(final ProcedureExecutor procExecutor, + final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted, + final Callable stopAction, final Callable startAction, + boolean abort, boolean startWorkers) + throws Exception { final ProcedureStore procStore = procExecutor.getStore(); final int storeThreads = procExecutor.getCorePoolSize(); final int execThreads = procExecutor.getCorePoolSize(); @@ -92,7 +118,7 @@ public class ProcedureTestingUtility { // stop LOG.info("RESTART - Stop"); procExecutor.stop(); - procStore.stop(false); + procStore.stop(abort); if (stopAction != null) { stopAction.call(); } @@ -104,7 +130,7 @@ public class ProcedureTestingUtility { // re-start LOG.info("RESTART - Start"); procStore.start(storeThreads); - initAndStartWorkers(procExecutor, execThreads, failOnCorrupted); + initAndStartWorkers(procExecutor, execThreads, failOnCorrupted, startWorkers); if (startAction != null) { startAction.call(); } @@ -196,7 +222,7 @@ public class ProcedureTestingUtility { NoopProcedureStore procStore = new NoopProcedureStore(); ProcedureExecutor procExecutor = new ProcedureExecutor<>(conf, env, procStore); procStore.start(1); - initAndStartWorkers(procExecutor, 1, false); + initAndStartWorkers(procExecutor, 1, false, true); try { return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); } finally { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java new file mode 100644 index 00000000000..e06fdc53482 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureCleanup { + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule + .forClass(TestProcedureCleanup.class); + + + private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class); + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + + private static TestProcEnv procEnv; + private static WALProcedureStore procStore; + + private static ProcedureExecutor procExecutor; + + private static HBaseCommonTestingUtility htu; + + private static FileSystem fs; + private static Path testDir; + private static Path logDir; + + private static class TestProcEnv { + + } + + private void createProcExecutor(String dir) throws Exception { + logDir = new Path(testDir, dir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, + procStore); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + ProcedureTestingUtility + .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true); + } + + @BeforeClass + public static void setUp() throws Exception { + htu = new HBaseCommonTestingUtility(); + + // NOTE: The executor will be created by each test + procEnv = new TestProcEnv(); + testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + assertTrue(testDir.depth() > 1); + + + } + + @Test + public void testProcedureShouldNotCleanOnLoad() throws Exception { + createProcExecutor("testProcedureShouldNotCleanOnLoad"); + final RootProcedure proc = new RootProcedure(); + long rootProc = procExecutor.submitProcedure(proc); + LOG.info("Begin to execute " + rootProc); + // wait until the child procedure arrival + while(procExecutor.getProcedures().size() < 2) { + Thread.sleep(100); + } + SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor + .getProcedures().get(1); + // wait until the suspendProcedure executed + suspendProcedure.latch.countDown(); + Thread.sleep(100); + // roll the procedure log + LOG.info("Begin to roll log "); + procStore.rollWriterForTesting(); + LOG.info("finish to roll log "); + Thread.sleep(500); + LOG.info("begin to restart1 "); + ProcedureTestingUtility.restart(procExecutor, true); + LOG.info("finish to restart1 "); + Assert.assertTrue(procExecutor.getProcedure(rootProc) != null); + Thread.sleep(500); + LOG.info("begin to restart2 "); + ProcedureTestingUtility.restart(procExecutor, true); + LOG.info("finish to restart2 "); + Assert.assertTrue(procExecutor.getProcedure(rootProc) != null); + } + + @Test + public void testProcedureUpdatedShouldClean() throws Exception { + createProcExecutor("testProcedureUpdatedShouldClean"); + SuspendProcedure suspendProcedure = new SuspendProcedure(); + long suspendProc = procExecutor.submitProcedure(suspendProcedure); + LOG.info("Begin to execute " + suspendProc); + suspendProcedure.latch.countDown(); + Thread.sleep(500); + LOG.info("begin to restart1 "); + ProcedureTestingUtility.restart(procExecutor, true); + LOG.info("finish to restart1 "); + while(procExecutor.getProcedure(suspendProc) == null) { + Thread.sleep(100); + } + // Wait until the suspendProc executed after restart + suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc); + suspendProcedure.latch.countDown(); + Thread.sleep(500); + // Should be 1 log since the suspendProcedure is updated in the new log + Assert.assertTrue(procStore.getActiveLogs().size() == 1); + // restart procExecutor + LOG.info("begin to restart2"); + // Restart the executor but do not start the workers. + // Otherwise, the suspendProcedure will soon be executed and the oldest log + // will be cleaned, leaving only the newest log. + ProcedureTestingUtility.restart(procExecutor, true, false); + LOG.info("finish to restart2"); + // There should be two active logs + Assert.assertTrue(procStore.getActiveLogs().size() == 2); + procExecutor.startWorkers(); + + } + + @Test + public void testProcedureDeletedShouldClean() throws Exception { + createProcExecutor("testProcedureDeletedShouldClean"); + WaitProcedure waitProcedure = new WaitProcedure(); + long waitProce = procExecutor.submitProcedure(waitProcedure); + LOG.info("Begin to execute " + waitProce); + Thread.sleep(500); + LOG.info("begin to restart1 "); + ProcedureTestingUtility.restart(procExecutor, true); + LOG.info("finish to restart1 "); + while(procExecutor.getProcedure(waitProce) == null) { + Thread.sleep(100); + } + // Wait until the suspendProc executed after restart + waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce); + waitProcedure.latch.countDown(); + Thread.sleep(500); + // Should be 1 log since the suspendProcedure is updated in the new log + Assert.assertTrue(procStore.getActiveLogs().size() == 1); + // restart procExecutor + LOG.info("begin to restart2"); + // Restart the executor but do not start the workers. + // Otherwise, the suspendProcedure will soon be executed and the oldest log + // will be cleaned, leaving only the newest log. + ProcedureTestingUtility.restart(procExecutor, true, false); + LOG.info("finish to restart2"); + // There should be two active logs + Assert.assertTrue(procStore.getActiveLogs().size() == 2); + procExecutor.startWorkers(); + } + + public static class WaitProcedure + extends ProcedureTestingUtility.NoopProcedure { + public WaitProcedure() { + super(); + } + + private CountDownLatch latch = new CountDownLatch(1); + + @Override + protected Procedure[] execute(final TestProcEnv env) + throws ProcedureSuspendedException { + // Always wait here + LOG.info("wait here"); + try { + latch.await(); + } catch (Throwable t) { + + } + LOG.info("finished"); + return null; + } + } + + public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure { + public SuspendProcedure() { + super(); + } + + private CountDownLatch latch = new CountDownLatch(1); + + @Override + protected Procedure[] execute(final TestProcEnv env) + throws ProcedureSuspendedException { + // Always suspend the procedure + LOG.info("suspend here"); + latch.countDown(); + throw new ProcedureSuspendedException(); + } + } + + public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure { + private boolean childSpwaned = false; + + public RootProcedure() { + super(); + } + + @Override + protected Procedure[] execute(final TestProcEnv env) + throws ProcedureSuspendedException { + if (!childSpwaned) { + childSpwaned = true; + return new Procedure[] {new SuspendProcedure()}; + } else { + return null; + } + } + } + + +}