From 6a64811f447a1440663c179641d337c8dc1dfd2d Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Mon, 19 Nov 2018 11:03:52 +0800
Subject: [PATCH] HBASE-21490 WALProcedure may remove proc wal files still with
 active procedures

Signed-off-by: Allan Yang
---
 .../store/ProcedureStoreTracker.java          |   3 +-
 .../store/wal/ProcedureWALFormat.java         |  35 ++--
 .../store/wal/WALProcedureStore.java          |  40 +++--
 .../hbase/master/TestLoadProcedureError.java  | 150 ++++++++++++++++++
 4 files changed, 193 insertions(+), 35 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java

diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 25c94272f08..7d430d66b61 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -274,7 +274,8 @@ public class ProcedureStoreTracker {
     this.keepDeletes = false;
     this.partial = false;
     this.map.clear();
-    resetModified();
+    minModifiedProcId = Long.MAX_VALUE;
+    maxModifiedProcId = Long.MIN_VALUE;
   }
 
   public boolean isModified(long procId) {
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 179c7404d95..9686593049b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -88,27 +88,24 @@ public final class ProcedureWALFormat {
       Loader loader) throws IOException {
     ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
     tracker.setKeepDeletes(true);
-    try {
-      // Ignore the last log which is current active log.
-      while (logs.hasNext()) {
-        ProcedureWALFile log = logs.next();
-        log.open();
-        try {
-          reader.read(log);
-        } finally {
-          log.close();
-        }
+    // Ignore the last log which is current active log.
+    while (logs.hasNext()) {
+      ProcedureWALFile log = logs.next();
+      log.open();
+      try {
+        reader.read(log);
+      } finally {
+        log.close();
       }
-      reader.finish();
-
-      // The tracker is now updated with all the procedures read from the logs
-      if (tracker.isPartial()) {
-        tracker.setPartialFlag(false);
-      }
-      tracker.resetModified();
-    } finally {
-      tracker.setKeepDeletes(false);
     }
+    reader.finish();
+
+    // The tracker is now updated with all the procedures read from the logs
+    if (tracker.isPartial()) {
+      tracker.setPartialFlag(false);
+    }
+    tracker.resetModified();
+    tracker.setKeepDeletes(false);
   }
 
   public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 1b1018e82cf..e8a6d97a314 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -448,13 +448,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
     lock.lock();
     try {
       if (logs.isEmpty()) {
-        throw new RuntimeException("recoverLease() must be called before loading data");
+        throw new IllegalStateException("recoverLease() must be called before loading data");
       }
 
       // Nothing to do, If we have only the current log.
       if (logs.size() == 1) {
         LOG.debug("No state logs to replay.");
         loader.setMaxProcId(0);
+        loading.set(false);
         return;
       }
 
@@ -488,15 +489,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
           // TODO: sideline corrupted log
         }
       });
+      // If we fail during loading, we should prevent persisting the storeTracker later in the stop
+      // method. It may happen that we have finished constructing the modified and deleted bits,
+      // but fail before calling resetModified; if we then persist the storeTracker, on restart we
+      // will consider that all procedures have been included in this file and delete all the
+      // previous files. Obviously this is not correct. So here we only set loading to false after
+      // we have successfully loaded all the procedures, and when closing we will skip persisting
+      // the store tracker. This also prevents the sync thread from doing a periodicRoll, where we
+      // may also clean old logs.
+      loading.set(false);
+      // try to cleanup inactive wals and complete the operation
+      buildHoldingCleanupTracker();
+      tryCleanupLogsOnLoad();
     } finally {
-      try {
-        // try to cleanup inactive wals and complete the operation
-        buildHoldingCleanupTracker();
-        tryCleanupLogsOnLoad();
-        loading.set(false);
-      } finally {
-        lock.unlock();
-      }
+      lock.unlock();
     }
   }
 
@@ -1133,11 +1139,15 @@
 
     try {
       ProcedureWALFile log = logs.getLast();
-      log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
-      log.updateLocalTracker(storeTracker);
-      if (!abort) {
-        long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
-        log.addToSize(trailerSize);
+      // If the loading flag is true, it usually means that we failed while loading procedures,
+      // so we should not persist the store tracker, as its state may not be correct.
+      if (!loading.get()) {
+        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
+        log.updateLocalTracker(storeTracker);
+        if (!abort) {
+          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
+          log.addToSize(trailerSize);
+        }
       }
     } catch (IOException e) {
       LOG.warn("Unable to write the trailer", e);
@@ -1193,7 +1203,7 @@
       if (holdingCleanupTracker.isEmpty()) {
         break;
       }
-      iter.next();
+      tracker = iter.next().getTracker();
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
new file mode 100644
index 00000000000..0a57dba911e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertFalse;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Testcase for HBASE-21490.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestLoadProcedureError {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestLoadProcedureError.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName NAME = TableName.valueOf("Load");
+
+  private static volatile CountDownLatch ARRIVE;
+
+  private static volatile boolean FINISH_PROC;
+
+  private static volatile boolean FAIL_LOAD;
+
+  public static final class TestProcedure extends NoopProcedure<MasterProcedureEnv>
+      implements TableProcedureInterface {
+
+    @Override
+    protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      if (ARRIVE != null) {
+        ARRIVE.countDown();
+        ARRIVE = null;
+      }
+      if (FINISH_PROC) {
+        return null;
+      }
+      setTimeout(1000);
+      setState(ProcedureState.WAITING_TIMEOUT);
+      throw new ProcedureSuspendedException();
+    }
+
+    @Override
+    protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+      setState(ProcedureState.RUNNABLE);
+      env.getProcedureScheduler().addBack(this);
+      return false;
+    }
+
+    @Override
+    protected void afterReplay(MasterProcedureEnv env) {
+      if (FAIL_LOAD) {
+        throw new RuntimeException("Inject error");
+      }
+    }
+
+    @Override
+    public TableName getTableName() {
+      return NAME;
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return TableOperationType.READ;
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private void waitNoMaster() {
+    UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getLiveMasterThreads().isEmpty());
+  }
+
+  @Test
+  public void testLoadError() throws Exception {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    ARRIVE = new CountDownLatch(1);
+    long procId = procExec.submitProcedure(new TestProcedure());
+    ARRIVE.await();
+    FAIL_LOAD = true;
+    // do not persist the store tracker
+    UTIL.getMiniHBaseCluster().getMaster().getWalProcedureStore().stop(true);
+    UTIL.getMiniHBaseCluster().getMaster().abort("for testing");
+    waitNoMaster();
+    // Restart twice, and it should fail twice, as we throw an exception in afterReplay above. In
+    // order to reproduce the problem in HBASE-21490 stably, we wait until a master is fully done
+    // before starting the new master, otherwise the new master may start too early, call
+    // recoverLease on the proc wal files, and cause us to fail to persist the store tracker when
+    // shutting down.
+    UTIL.getMiniHBaseCluster().startMaster();
+    waitNoMaster();
+    UTIL.getMiniHBaseCluster().startMaster();
+    waitNoMaster();
+    FAIL_LOAD = false;
+    HMaster master = UTIL.getMiniHBaseCluster().startMaster().getMaster();
+    UTIL.waitFor(30000, () -> master.isActiveMaster() && master.isInitialized());
+    // assert the procedure is still there and not finished yet
+    TestProcedure proc = (TestProcedure) master.getMasterProcedureExecutor().getProcedure(procId);
+    assertFalse(proc.isFinished());
+    FINISH_PROC = true;
+    UTIL.waitFor(30000, () -> proc.isFinished());
+  }
+}
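
Editor's note: the sketch below is a minimal, self-contained illustration of the invariant this patch enforces, under hypothetical names (LoadingGuardSketch, replay, persistTracker, cleanupOldWals); it is not the real WALProcedureStore API. It shows the shape of the guard added above: the loading flag starts true, is cleared only after every wal has been replayed successfully, and stop() skips persisting tracker state while loading is still set, so a failed load can never persist a tracker that would later justify deleting proc wal files which still contain active procedures.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Minimal sketch (hypothetical class, not HBase's WALProcedureStore) of the guard this patch
 * introduces: tracker state is persisted on close only if loading completed successfully, so a
 * crash during load can never persist a half-built tracker that would later justify deleting
 * proc wal files which still hold active procedures.
 */
public class LoadingGuardSketch {

  // Starts as true; flipped to false only once load() has fully succeeded.
  private final AtomicBoolean loading = new AtomicBoolean(true);

  // Stand-in for the modified/deleted bitmaps kept by the real store tracker.
  private final StringBuilder trackerState = new StringBuilder();

  public void load(Iterable<String> walFiles) throws IOException {
    for (String wal : walFiles) {
      replay(wal); // may throw; loading then stays true
    }
    // Only after every wal has been replayed do we trust the tracker, allow it to be
    // persisted, and use it to clean up older wal files.
    loading.set(false);
    cleanupOldWals();
  }

  public void stop(boolean abort) {
    // As in the patched stop(): if loading never finished, the tracker may not describe all
    // still-active procedures, so do not persist it at all.
    if (loading.get()) {
      return;
    }
    if (!abort) {
      persistTracker(trackerState.toString());
    }
  }

  private void replay(String wal) throws IOException {
    trackerState.append(wal).append(';');
  }

  private void cleanupOldWals() {
    // Safe only now: the tracker really covers everything that was replayed.
  }

  private void persistTracker(String state) {
    System.out.println("persisting tracker: " + state);
  }
}

The flag is flipped exactly once, at the end of load(), so any exception thrown while procedures are being replayed leaves loading true and stop() will not write trailer state for an incomplete tracker. The other half of the patch, tracker = iter.next().getTracker() in the cleanup loop, makes each iteration actually pick up the next wal file's tracker instead of discarding the iterator's return value, so the decision about which old logs can be removed takes every remaining file into account.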