From 23b58fcca0e768aedc04b9b64cc9191cd606d8b1 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 24 Oct 2018 14:13:25 +0800 Subject: [PATCH] HBASE-21363 Rewrite the buildingHoldCleanupTracker method in WALProcedureStore --- .../hbase/procedure2/store/BitSetNode.java | 6 +- .../store/ProcedureStoreTracker.java | 60 +++---- .../store/wal/ProcedureWALFormat.java | 11 ++ .../store/wal/ProcedureWALFormatReader.java | 5 +- .../store/wal/WALProcedureStore.java | 41 +++-- .../procedure2/TestProcedureCleanup.java | 148 ++++++++++++------ 6 files changed, 163 insertions(+), 108 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java index 2030c8b69df..3102bde7d19 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java @@ -131,9 +131,11 @@ class BitSetNode { public BitSetNode(BitSetNode other, boolean resetDelete) { this.start = other.start; - this.partial = other.partial; - this.modified = other.modified.clone(); // The resetDelete will be set to true when building cleanup tracker. + // as we will reset deleted flags for all the unmodified bits to 1, the partial flag is useless + // so set it to false for not confusing the developers when debugging. + this.partial = resetDelete ? false : other.partial; + this.modified = other.modified.clone(); // The intention here is that, if a procedure is not modified in this tracker, then we do not // need to take care of it, so we will set deleted to true for these bits, i.e, if modified is // 0, then we set deleted to 1, otherwise keep it as is. 
So here, the equation is diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 9f99e266654..a0978e15fdd 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; +import java.util.function.BiFunction; import java.util.stream.LongStream; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.yetus.audience.InterfaceAudience; @@ -87,7 +88,10 @@ public class ProcedureStoreTracker { */ public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) { reset(); - this.partial = tracker.partial; + // resetDelete will be true if we are building the cleanup tracker, as we will reset deleted + // flags for all the unmodified bits to 1, the partial flag is useless so set it to false for + // not confusing the developers when debugging. + this.partial = resetDelete ? false : tracker.partial; this.minModifiedProcId = tracker.minModifiedProcId; this.maxModifiedProcId = tracker.maxModifiedProcId; this.keepDeletes = tracker.keepDeletes; @@ -197,49 +201,45 @@ public class ProcedureStoreTracker { } } - /** - * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by - * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, - * then we mark it as deleted. - * @see #setDeletedIfModified(long...)
- */ - public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker, boolean globalTracker) { + private void setDeleteIf(ProcedureStoreTracker tracker, + BiFunction<BitSetNode, Long, Boolean> func) { BitSetNode trackerNode = null; for (BitSetNode node : map.values()) { - final long minProcId = node.getStart(); - final long maxProcId = node.getEnd(); + long minProcId = node.getStart(); + long maxProcId = node.getEnd(); for (long procId = minProcId; procId <= maxProcId; ++procId) { if (!node.isModified(procId)) { continue; } trackerNode = tracker.lookupClosestNode(trackerNode, procId); - if (trackerNode == null || !trackerNode.contains(procId)) { - // the procId is not exist in the track, we can only delete the proc - // if globalTracker set to true. - // Only if the procedure is not in the global tracker we can delete the - // the procedure. In other cases, the procedure may not update in a single - // log, we cannot delete it just because the log's track doesn't have - // any info for the procedure. - if (globalTracker) { - node.delete(procId); - } - continue; - } - // Only check delete in the global tracker, only global tracker has the - // whole picture - if (globalTracker && trackerNode.isDeleted(procId) == DeleteState.YES) { - node.delete(procId); - continue; - } - if (trackerNode.isModified(procId)) { - // the procedure was modified + if (func.apply(trackerNode, procId)) { node.delete(procId); } } } } + /** + * For the global tracker, we will use this method to build the holdingCleanupTracker, as the + * modified flags will be cleared after rolling so we only need to test the deleted flags.
+ * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker) + */ + public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) { + setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) || + node.isDeleted(procId) == DeleteState.YES); + } + + /** + * Similar to {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by + * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, + * then we mark it as deleted. + * @see #setDeletedIfModified(long...) + */ + public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { + setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId)); + } + /** * lookup the node containing the specified procId. * @param node cached node to check before doing a lookup diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index c9986edc904..179c7404d95 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -73,6 +73,17 @@ public final class ProcedureWALFormat { private ProcedureWALFormat() {} + /** + * Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if + * needed, i.e, the {@code tracker} is a partial one. + * <p/>
+ * The method in the given {@code loader} will be called at the end after we load all the + * procedures and construct the hierarchy. + * <p/>
+ * And we will call the {@link ProcedureStoreTracker#resetModified()} method for the given + * {@code tracker} before returning, as it will be used to track the next proc wal file's modified + * procedures. + */ public static void load(Iterator logs, ProcedureStoreTracker tracker, Loader loader) throws IOException { ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 2e1e06ce054..1b19abbeb92 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -134,9 +134,8 @@ public class ProcedureWALFormatReader { } procedureMap.merge(localProcedureMap); } - if (localTracker.isPartial()) { - localTracker.setPartialFlag(false); - } + // Do not reset the partial flag for local tracker, as here the local tracker only knows the + // procedures which are modified in this file.
} public void finish() throws IOException { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 4bc668e30e9..39ad939b2da 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -97,7 +97,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it * with the tracker of every newer wal files, using the - * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker, boolean)}. + * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. * If we find out * that all the modified procedures for the oldest wal file are modified or deleted in newer wal * files, then we can delete it. This is because that, every time we call @@ -1173,27 +1173,26 @@ public class WALProcedureStore extends ProcedureStoreBase { } // compute the holding tracker. - // - the first WAL is used for the 'updates' - // - the global tracker is passed in first to decide which procedures are not - // exist anymore, so we can mark them as deleted in holdingCleanupTracker. - // Only global tracker have the whole picture here. - // - the other WALs are scanned to remove procs already updated in a newer wal. - // If it is updated in a newer wal, we can mark it as delelted in holdingCleanupTracker - // But, we can not delete it if it was shown deleted in the newer wal, as said - // above. 
- // TODO: exit early if holdingCleanupTracker.isEmpty() + // - the first WAL is used for the 'updates' + // - the global tracker will be used to determine whether a procedure has been deleted + // - other trackers will be used to determine whether a procedure has been updated, as a deleted + // procedure can always be detected by checking the global tracker, we can save the deleted + // checks when applying other trackers holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); - //Passing in the global tracker, we can delete the procedures not in the global - //tracker, because they are deleted in the later logs - holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker, true); - for (int i = 1, size = logs.size() - 1; i < size; ++i) { - // Set deleteIfNotExists to false since a single log's tracker is passed in. - // Since a specific procedure may not show up in the log at all(not executed or - // updated during the time), we can not delete the procedure just because this log - // don't have the info of the procedure. We can delete the procedure only if - // in this log's tracker, it was cleanly showed that the procedure is modified or deleted - // in the corresponding BitSetNode. - holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker(), false); + holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker); + // the logs is a linked list, so avoid calling get(index) on it. + Iterator<ProcedureWALFile> iter = logs.iterator(); + // skip the tracker for the first file when creating the iterator. + iter.next(); + ProcedureStoreTracker tracker = iter.next().getTracker(); + // testing iter.hasNext after calling iter.next to skip applying the tracker for last file, + // which is just the storeTracker above.
+ while (iter.hasNext()) { + holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker); + if (holdingCleanupTracker.isEmpty()) { + break; + } + tracker = iter.next().getTracker(); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java index e06fdc53482..82917ea5315 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java @@ -19,8 +19,13 @@ package org.apache.hadoop.hbase.procedure2; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.CountDownLatch; - +import java.util.concurrent.Exchanger; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -28,28 +33,32 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; -@Category({MasterTests.class, SmallTests.class}) +@Category({ MasterTests.class, SmallTests.class }) public class TestProcedureCleanup { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule - .forClass(TestProcedureCleanup.class); + + @ClassRule +
public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestProcedureCleanup.class); private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class); - private static final int PROCEDURE_EXECUTOR_SLOTS = 1; - private static TestProcEnv procEnv; + private static final int PROCEDURE_EXECUTOR_SLOTS = 2; + private static WALProcedureStore procStore; - private static ProcedureExecutor procExecutor; + private static ProcedureExecutor procExecutor; private static HBaseCommonTestingUtility htu; @@ -57,43 +66,35 @@ public class TestProcedureCleanup { private static Path testDir; private static Path logDir; - private static class TestProcEnv { + @Rule + public final TestName name = new TestName(); - } - - private void createProcExecutor(String dir) throws Exception { - logDir = new Path(testDir, dir); + private void createProcExecutor() throws Exception { + logDir = new Path(testDir, name.getMethodName()); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, - procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - ProcedureTestingUtility - .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true); } @BeforeClass public static void setUp() throws Exception { htu = new HBaseCommonTestingUtility(); - + htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); // NOTE: The executor will be created by each test - procEnv = new TestProcEnv(); testDir = htu.getDataTestDir(); fs = testDir.getFileSystem(htu.getConfiguration()); assertTrue(testDir.depth() > 1); - - } @Test public void testProcedureShouldNotCleanOnLoad() throws Exception { - createProcExecutor("testProcedureShouldNotCleanOnLoad"); + 
createProcExecutor(); final RootProcedure proc = new RootProcedure(); long rootProc = procExecutor.submitProcedure(proc); LOG.info("Begin to execute " + rootProc); // wait until the child procedure arrival - while(procExecutor.getProcedures().size() < 2) { - Thread.sleep(100); - } + htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2); SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor .getProcedures().get(1); // wait until the suspendProcedure executed @@ -107,17 +108,17 @@ public class TestProcedureCleanup { LOG.info("begin to restart1 "); ProcedureTestingUtility.restart(procExecutor, true); LOG.info("finish to restart1 "); - Assert.assertTrue(procExecutor.getProcedure(rootProc) != null); + assertTrue(procExecutor.getProcedure(rootProc) != null); Thread.sleep(500); LOG.info("begin to restart2 "); ProcedureTestingUtility.restart(procExecutor, true); LOG.info("finish to restart2 "); - Assert.assertTrue(procExecutor.getProcedure(rootProc) != null); + assertTrue(procExecutor.getProcedure(rootProc) != null); } @Test public void testProcedureUpdatedShouldClean() throws Exception { - createProcExecutor("testProcedureUpdatedShouldClean"); + createProcExecutor(); SuspendProcedure suspendProcedure = new SuspendProcedure(); long suspendProc = procExecutor.submitProcedure(suspendProcedure); LOG.info("Begin to execute " + suspendProc); @@ -126,15 +127,13 @@ public class TestProcedureCleanup { LOG.info("begin to restart1 "); ProcedureTestingUtility.restart(procExecutor, true); LOG.info("finish to restart1 "); - while(procExecutor.getProcedure(suspendProc) == null) { - Thread.sleep(100); - } + htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null); // Wait until the suspendProc executed after restart suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc); suspendProcedure.latch.countDown(); Thread.sleep(500); // Should be 1 log since the suspendProcedure is updated in the new log - 
Assert.assertTrue(procStore.getActiveLogs().size() == 1); + assertTrue(procStore.getActiveLogs().size() == 1); // restart procExecutor LOG.info("begin to restart2"); // Restart the executor but do not start the workers. @@ -143,14 +142,14 @@ public class TestProcedureCleanup { ProcedureTestingUtility.restart(procExecutor, true, false); LOG.info("finish to restart2"); // There should be two active logs - Assert.assertTrue(procStore.getActiveLogs().size() == 2); + assertTrue(procStore.getActiveLogs().size() == 2); procExecutor.startWorkers(); } @Test public void testProcedureDeletedShouldClean() throws Exception { - createProcExecutor("testProcedureDeletedShouldClean"); + createProcExecutor(); WaitProcedure waitProcedure = new WaitProcedure(); long waitProce = procExecutor.submitProcedure(waitProcedure); LOG.info("Begin to execute " + waitProce); @@ -158,15 +157,13 @@ public class TestProcedureCleanup { LOG.info("begin to restart1 "); ProcedureTestingUtility.restart(procExecutor, true); LOG.info("finish to restart1 "); - while(procExecutor.getProcedure(waitProce) == null) { - Thread.sleep(100); - } + htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null); // Wait until the suspendProc executed after restart waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce); waitProcedure.latch.countDown(); Thread.sleep(500); // Should be 1 log since the suspendProcedure is updated in the new log - Assert.assertTrue(procStore.getActiveLogs().size() == 1); + assertTrue(procStore.getActiveLogs().size() == 1); // restart procExecutor LOG.info("begin to restart2"); // Restart the executor but do not start the workers. 
@@ -175,12 +172,64 @@ public class TestProcedureCleanup { ProcedureTestingUtility.restart(procExecutor, true, false); LOG.info("finish to restart2"); // There should be two active logs - Assert.assertTrue(procStore.getActiveLogs().size() == 2); + assertTrue(procStore.getActiveLogs().size() == 2); procExecutor.startWorkers(); } - public static class WaitProcedure - extends ProcedureTestingUtility.NoopProcedure { + private void corrupt(FileStatus file) throws IOException { + LOG.info("Corrupt " + file); + Path tmpFile = file.getPath().suffix(".tmp"); + // remove the last byte to make the trailer corrupted + try (FSDataInputStream in = fs.open(file.getPath()); + FSDataOutputStream out = fs.create(tmpFile)) { + ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out); + } + fs.delete(file.getPath(), false); + fs.rename(tmpFile, file.getPath()); + } + + + public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure { + + private final Exchanger exchanger = new Exchanger<>(); + + @SuppressWarnings("unchecked") + @Override + protected Procedure[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (exchanger.exchange(Boolean.TRUE)) { + return new Procedure[] { this }; + } else { + return null; + } + } + } + + @Test + public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception { + createProcExecutor(); + ExchangeProcedure proc1 = new ExchangeProcedure(); + ExchangeProcedure proc2 = new ExchangeProcedure(); + procExecutor.submitProcedure(proc1); + long procId2 = procExecutor.submitProcedure(proc2); + Thread.sleep(500); + procStore.rollWriterForTesting(); + proc1.exchanger.exchange(Boolean.TRUE); + Thread.sleep(500); + + FileStatus[] walFiles = fs.listStatus(logDir); + Arrays.sort(walFiles, (f1, f2) -> f1.getPath().getName().compareTo(f2.getPath().getName())); + // corrupt the first proc wal file, so we will have a partial tracker for it after restarting + 
corrupt(walFiles[0]); + ProcedureTestingUtility.restart(procExecutor, false, true); + // also update proc2, which means that all the procedures in the first proc wal have been + // updated and it should be deleted. + proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2); + proc2.exchanger.exchange(Boolean.TRUE); + htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath())); + } + + public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure { public WaitProcedure() { super(); } @@ -188,8 +237,7 @@ public class TestProcedureCleanup { private CountDownLatch latch = new CountDownLatch(1); @Override - protected Procedure[] execute(final TestProcEnv env) - throws ProcedureSuspendedException { + protected Procedure[] execute(Void env) throws ProcedureSuspendedException { // Always wait here LOG.info("wait here"); try { @@ -202,7 +250,7 @@ public class TestProcedureCleanup { } } - public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure { + public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure { public SuspendProcedure() { super(); } @@ -210,8 +258,7 @@ public class TestProcedureCleanup { private CountDownLatch latch = new CountDownLatch(1); @Override - protected Procedure[] execute(final TestProcEnv env) - throws ProcedureSuspendedException { + protected Procedure[] execute(Void env) throws ProcedureSuspendedException { // Always suspend the procedure LOG.info("suspend here"); latch.countDown(); @@ -219,7 +266,7 @@ public class TestProcedureCleanup { } } - public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure { + public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure { private boolean childSpwaned = false; public RootProcedure() { @@ -227,16 +274,13 @@ public class TestProcedureCleanup { } @Override - protected Procedure[] execute(final TestProcEnv env) - throws ProcedureSuspendedException { + protected Procedure[] execute(Void env) 
throws ProcedureSuspendedException { if (!childSpwaned) { childSpwaned = true; - return new Procedure[] {new SuspendProcedure()}; + return new Procedure[] { new SuspendProcedure() }; } else { return null; } } } - - }