From 9da4c1393d293835608e88ef44f9de505a33b4b8 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 11 Oct 2018 15:43:11 +0800 Subject: [PATCH] HBASE-21254 Need to find a way to limit the number of proc wal files --- .../hbase/procedure2/ProcedureExecutor.java | 41 +++- .../hbase/procedure2/store/BitSetNode.java | 23 ++ .../procedure2/store/ProcedureStore.java | 26 ++- .../procedure2/store/ProcedureStoreBase.java | 22 +- .../store/ProcedureStoreTracker.java | 13 +- .../store/wal/WALProcedureStore.java | 40 +++- .../store/TestProcedureStoreTracker.java | 64 ++---- .../store/wal/TestForceUpdateProcedure.java | 217 ++++++++++++++++++ .../apache/hadoop/hbase/master/HMaster.java | 9 +- .../master/procedure/MasterProcedureEnv.java | 21 -- .../master/assignment/MockMasterServices.java | 9 +- 11 files changed, 382 insertions(+), 103 deletions(-) create mode 100644 hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 12520d6a675..b7c1ac8cdac 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -31,6 +31,8 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.procedure2.Procedure.LockState; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -56,6 +59,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; @@ -347,6 +351,9 @@ public class ProcedureExecutor { */ private final ProcedureScheduler scheduler; + private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build()); + private final AtomicLong lastProcId = new AtomicLong(-1); private final AtomicLong workerId = new AtomicLong(0); private final AtomicInteger activeExecutorCount = new AtomicInteger(0); @@ -369,6 +376,25 @@ public class ProcedureExecutor { this(conf, environment, store, new SimpleProcedureScheduler()); } + private void forceUpdateProcedure(long procId) throws IOException { + IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); + try { + Procedure proc = procedures.get(procId); + if (proc == null) { + LOG.debug("No pending procedure with id = {}, skip force updating.", procId); + return; + } + if (proc.isFinished()) { + LOG.debug("Procedure {} has already been finished, skip force updating.", proc); + return; + } + LOG.debug("Force update procedure {}", proc); + store.update(proc); + } finally { + procExecutionLock.releaseLockEntry(lockEntry); + } + } + public ProcedureExecutor(final Configuration conf, final TEnvironment environment, final ProcedureStore store, final 
ProcedureScheduler scheduler) { this.environment = environment; @@ -377,7 +403,19 @@ public class ProcedureExecutor { this.conf = conf; this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); refreshConfiguration(conf); + store.registerListener(new ProcedureStoreListener() { + @Override + public void forceUpdate(long[] procIds) { + Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> { + try { + forceUpdateProcedure(procId); + } catch (IOException e) { + LOG.warn("Failed to force update procedure with pid={}", procId); + } + })); + } + }); } private void load(final boolean abortOnCorruption) throws IOException { @@ -1057,7 +1095,7 @@ public class ProcedureExecutor { // Now, the procedure is not finished, and no one can execute it since we take the lock now // And we can be sure that its ancestor is not running too, since their child has not // finished yet - Procedure current = procedure; + Procedure current = procedure; while (current != null) { LOG.debug("Bypassing {}", current); current.bypass(); @@ -1989,7 +2027,6 @@ public class ProcedureExecutor { public void sendStopSignal() { scheduler.signalAll(); } - @Override public void run() { long lastUpdate = EnvironmentEdgeManager.currentTime(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java index b76c026d01d..efb806ffbf8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.procedure2.store; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState; import org.apache.yetus.audience.InterfaceAudience; @@ -196,6 +198,27 @@ class 
BitSetNode { return true; } + /** + * @return all the active procedure ids in this bit set. + */ + public long[] getActiveProcIds() { + List procIds = new ArrayList<>(); + for (int wordIndex = 0; wordIndex < modified.length; wordIndex++) { + if (deleted[wordIndex] == WORD_MASK || modified[wordIndex] == 0) { + // This should be the common case, where most procedures has been deleted. + continue; + } + long baseProcId = getStart() + (wordIndex << ADDRESS_BITS_PER_WORD); + for (int i = 0; i < (1 << ADDRESS_BITS_PER_WORD); i++) { + long mask = 1L << i; + if ((deleted[wordIndex] & mask) == 0 && (modified[wordIndex] & mask) != 0) { + procIds.add(baseProcId + i); + } + } + } + return procIds.stream().mapToLong(Long::longValue).toArray(); + } + /** * @return true, if there are no active procedures in this BitSetNode, else false. */ diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 8063b125ba5..0599acfcc85 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -34,19 +34,37 @@ import org.apache.hadoop.hbase.procedure2.Procedure; public interface ProcedureStore { /** * Store listener interface. + *

* The main process should register a listener and respond to the store events. */ public interface ProcedureStoreListener { + /** * triggered when the store sync is completed. */ - void postSync(); + default void postSync() { + } /** - * triggered when the store is not able to write out data. - * the main process should abort. + * triggered when the store is not able to write out data. the main process should abort. */ - void abortProcess(); + default void abortProcess() { + } + + /** + * Suggest that the upper layer should update the state of some procedures. Ignoring this call + * will not affect correctness, only performance. + *

+ * For a WAL based ProcedureStore implementation, if all the procedures stored in a WAL file + * have been deleted, or updated later in another WAL file, then we can delete the WAL file. If + * there are old procedures in a WAL file which are never deleted or updated, then we can not + * delete the WAL file, which causes us to hold lots of WAL files and slows down master + * restarts. So here we introduce this method to ask the upper layer to update the + * states of these procedures so that we can delete the old WAL file. + * @param procIds the ids of the procedures whose state should be updated + */ + default void forceUpdate(long[] procIds) { + } } /** diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java index 90da9331f04..b1a8d3d1a2e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java @@ -15,12 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ - package org.apache.hadoop.hbase.procedure2.store; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.yetus.audience.InterfaceAudience; /** @@ -58,19 +56,15 @@ public abstract class ProcedureStoreBase implements ProcedureStore { return listeners.remove(listener); } - protected void sendPostSyncSignal() { - if (!this.listeners.isEmpty()) { - for (ProcedureStoreListener listener : this.listeners) { - listener.postSync(); - } - } + protected final void sendPostSyncSignal() { + listeners.forEach(ProcedureStoreListener::postSync); } - protected void sendAbortProcessSignal() { - if (!this.listeners.isEmpty()) { - for (ProcedureStoreListener listener : this.listeners) { - listener.abortProcess(); - } - } + protected final void sendAbortProcessSignal() { + listeners.forEach(ProcedureStoreListener::abortProcess); + } + + protected final void sendForceUpdateSignal(long[] procIds) { + listeners.forEach(l -> l.forceUpdate(procIds)); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 361419ab48f..f98c766d779 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -23,9 +23,10 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; - +import java.util.stream.LongStream; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** @@ -335,6 +336,16 @@ public class ProcedureStoreTracker { return true; } + /** + * Will be used when there are too many proc wal files. 
We will rewrite the states of the active + * procedures in the oldest proc wal file so that we can delete it. + * @return all the active procedure ids in this tracker. + */ + public long[] getAllActiveProcIds() { + return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0) + .flatMapToLong(LongStream::of).toArray(); + } + /** * Clears the list of updated procedure ids. This doesn't affect global list of active * procedure ids. diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index b3f5d10f83f..1aee86d80e6 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; @@ -98,7 +99,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * with the tracker of every newer wal files, using the * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out * that all the modified procedures for the oldest wal file are modified or deleted in newer wal - * files, then we can delete it. + * files, then we can delete it. 
This is because that, every time we call + * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we will + * persist the full state of a Procedure, so the earlier wal records for this procedure can all be + * deleted. * @see ProcedureWALPrettyPrinter for printing content of a single WAL. * @see #main(String[]) to parse a directory of MasterWALProcs. */ @@ -116,7 +120,7 @@ public class WALProcedureStore extends ProcedureStoreBase { public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.warn.threshold"; - private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 64; + private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10; public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = "hbase.procedure.store.wal.exec.cleanup.on.load"; @@ -496,7 +500,9 @@ public class WALProcedureStore extends ProcedureStoreBase { private void tryCleanupLogsOnLoad() { // nothing to cleanup. - if (logs.size() <= 1) return; + if (logs.size() <= 1) { + return; + } // the config says to not cleanup wals on load. 
if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, @@ -967,7 +973,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @VisibleForTesting - protected void periodicRollForTesting() throws IOException { + void periodicRollForTesting() throws IOException { lock.lock(); try { periodicRoll(); @@ -977,7 +983,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @VisibleForTesting - protected boolean rollWriterForTesting() throws IOException { + boolean rollWriterForTesting() throws IOException { lock.lock(); try { return rollWriter(); @@ -987,7 +993,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @VisibleForTesting - protected void removeInactiveLogsForTesting() throws Exception { + void removeInactiveLogsForTesting() throws Exception { lock.lock(); try { removeInactiveLogs(); @@ -1041,7 +1047,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @VisibleForTesting - boolean rollWriter(final long logId) throws IOException { + boolean rollWriter(long logId) throws IOException { assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; assert lock.isHeldByCurrentThread() : "expected to be the lock owner. 
" + lock.isLocked(); @@ -1059,10 +1065,10 @@ public class WALProcedureStore extends ProcedureStoreBase { try { newStream = CommonFSUtils.createForWal(fs, newLogFile, false); } catch (FileAlreadyExistsException e) { - LOG.error("Log file with id=" + logId + " already exists", e); + LOG.error("Log file with id={} already exists", logId, e); return false; } catch (RemoteException re) { - LOG.warn("failed to create log file with id=" + logId, re); + LOG.warn("failed to create log file with id={}", logId, re); return false; } // After we create the stream but before we attempt to use it at all @@ -1099,9 +1105,19 @@ public class WALProcedureStore extends ProcedureStoreBase { if (logs.size() == 2) { buildHoldingCleanupTracker(); } else if (logs.size() > walCountWarnThreshold) { - LOG.warn("procedure WALs count=" + logs.size() + - " above the warning threshold " + walCountWarnThreshold + - ". check running procedures to see if something is stuck."); + LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" + + " to see if something is stuck.", logs.size(), walCountWarnThreshold); + // This is just like what we have done at RS side when there are too many wal files. For RS, + // if there are too many wal files, we will find out the wal entries in the oldest file, and + // tell the upper layer to flush these regions so the wal entries will be useless and then we + // can delete the wal file. For WALProcedureStore, the assumption is that, if all the + // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then + // we are safe to delete it. So here if there are too many proc wal files, we will find out + // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc + // wal files, and tell upper layer to update the state of these procedures to the newest proc + // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal + // file. 
+ sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds()); } LOG.info("Rolled new Procedure Store WAL, id={}", logId); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index d6b58d0b33f..93fcbe97fa4 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -217,54 +217,8 @@ public class TestProcedureStoreTracker { } } - boolean isDeleted(ProcedureStoreTracker n, long procId) { - return n.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; - } - - boolean isDeleted(BitSetNode n, long procId) { - return n.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; - } - - /** - * @param active list of active proc ids. To mark them as non-deleted, since by default a proc - * id is always marked deleted. - */ - ProcedureStoreTracker buildTracker(long[] active, long[] updated, long[] deleted) { - ProcedureStoreTracker tracker = new ProcedureStoreTracker(); - for (long i : active) { - tracker.insert(i); - } - tracker.resetModified(); - for (long i : updated) { - tracker.update(i); - } - for (long i : deleted) { - tracker.delete(i); - } - return tracker; - } - - /** - * @param active list of active proc ids. To mark them as non-deleted, since by default a proc - * id is always marked deleted. 
- */ - BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) { - BitSetNode bitSetNode = new BitSetNode(0L, false); - for (long i : active) { - bitSetNode.insertOrUpdate(i); - } - bitSetNode.resetModified(); - for (long i : updated) { - bitSetNode.insertOrUpdate(i); - } - for (long i : deleted) { - bitSetNode.delete(i); - } - return bitSetNode; - } - @Test - public void testSetDeletedIfSet() { + public void testSetDeletedIfModified() { final ProcedureStoreTracker tracker = new ProcedureStoreTracker(); final long[] procIds = new long[] { 1, 3, 7, 152, 512, 1024, 1025 }; @@ -291,4 +245,20 @@ public class TestProcedureStoreTracker { tracker.setDeletedIfModified(procIds); assertEquals(true, tracker.isEmpty()); } + + @Test + public void testGetActiveProcIds() { + ProcedureStoreTracker tracker = new ProcedureStoreTracker(); + for (int i = 0; i < 10000; i++) { + tracker.insert(i * 10); + } + for (int i = 0; i < 10000; i += 2) { + tracker.delete(i * 10); + } + long[] activeProcIds = tracker.getAllActiveProcIds(); + assertEquals(5000, activeProcIds.length); + for (int i = 0; i < 5000; i++) { + assertEquals((2 * i + 1) * 10, activeProcIds[i]); + } + } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java new file mode 100644 index 00000000000..0d37b77c669 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2.store.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Exchanger; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestForceUpdateProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestForceUpdateProcedure.class); + + private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + private static WALProcedureStore STORE; + + private static 
ProcedureExecutor EXEC; + + private static Exchanger EXCHANGER = new Exchanger<>(); + + private static int WAL_COUNT = 5; + + private static void createStoreAndExecutor() throws IOException { + Path logDir = UTIL.getDataTestDir("proc-wals"); + STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir); + STORE.start(1); + EXEC = new ProcedureExecutor(UTIL.getConfiguration(), null, STORE); + ProcedureTestingUtility.initAndStartWorkers(EXEC, 1, true); + } + + @BeforeClass + public static void setUp() throws IOException { + UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT); + createStoreAndExecutor(); + } + + private static void stopStoreAndExecutor() { + EXEC.stop(); + STORE.stop(false); + EXEC = null; + STORE = null; + } + + @AfterClass + public static void tearDown() throws IOException { + stopStoreAndExecutor(); + UTIL.cleanupTestDir(); + } + + public static final class WaitingProcedure extends Procedure { + + @Override + protected Procedure[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + EXCHANGER.exchange(Boolean.TRUE); + setState(ProcedureState.WAITING_TIMEOUT); + setTimeout(Integer.MAX_VALUE); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(Void env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(Void env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + } + + public static final class ParentProcedure extends Procedure { + + @SuppressWarnings("unchecked") + @Override + protected Procedure[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return new Procedure[] 
{ new WaitingProcedure() }; + } + + @Override + protected void rollback(Void env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(Void env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + } + + public static final class ExchangeProcedure extends Procedure { + + @SuppressWarnings("unchecked") + @Override + protected Procedure[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (EXCHANGER.exchange(Boolean.TRUE)) { + return new Procedure[] { this }; + } else { + return null; + } + } + + @Override + protected void rollback(Void env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(Void env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + } + + @Test + public void test() throws IOException, InterruptedException { + EXEC.submitProcedure(new ParentProcedure()); + EXCHANGER.exchange(Boolean.TRUE); + UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0); + // The above operations are used to make sure that we have persist the states of the two + // procedures. 
+ long procId = EXEC.submitProcedure(new ExchangeProcedure()); + assertEquals(1, STORE.getActiveLogs().size()); + for (int i = 0; i < WAL_COUNT - 1; i++) { + assertTrue(STORE.rollWriterForTesting()); + // The WaitingProcedure never gets updated so we can not delete the oldest wal file, so the + // number of wal files will increase + assertEquals(2 + i, STORE.getActiveLogs().size()); + EXCHANGER.exchange(Boolean.TRUE); + Thread.sleep(1000); + } + STORE.rollWriterForTesting(); + // Finish the ExchangeProcedure + EXCHANGER.exchange(Boolean.FALSE); + // Make sure that we can delete several wal files because we force update the state of + // WaitingProcedure. Notice that the last closed wal files can not be deleted, as when rolling + // the newest wal file does not have anything in it, and in the closed file we still have the + // state for the ExchangeProcedure so it can not be deleted + UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 2); + UTIL.waitFor(10000, () -> EXEC.isFinished(procId)); + // Make sure that after the force update we could still load the procedures + stopStoreAndExecutor(); + createStoreAndExecutor(); + Map, Procedure> procMap = new HashMap<>(); + EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p)); + assertEquals(2, procMap.size()); + ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class); + assertEquals(ProcedureState.WAITING, parentProc.getState()); + WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class); + assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0678bfe9477..43a1a15d906 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -159,6 +159,7 @@ import
org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.quotas.MasterQuotasObserver; @@ -1466,7 +1467,13 @@ public class HMaster extends HRegionServer implements MasterServices { MasterProcedureEnv procEnv = new MasterProcedureEnv(this); procedureStore = new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); - procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); + procedureStore.registerListener(new ProcedureStoreListener() { + + @Override + public void abortProcess() { + abort("The Procedure Store lost the lease", null); + } + }); MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler); configurationManager.registerObserver(procEnv); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 0ec932ce47d..1eec72ab862 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; -import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; @@ -72,26 +71,6 @@ public class MasterProcedureEnv implements ConfigurationObserver { } } - @InterfaceAudience.Private - public static class MasterProcedureStoreListener - implements ProcedureStore.ProcedureStoreListener { - private final MasterServices master; - - public MasterProcedureStoreListener(final MasterServices master) { - this.master = master; - } - - @Override - public void postSync() { - // no-op - } - - @Override - public void abortProcess() { - master.abort("The Procedure Store lost the lease", null); - } - } - private final RSProcedureDispatcher remoteDispatcher; private final MasterProcedureScheduler procSched; private final MasterServices master; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java index 320687756b3..c0dc72c39a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.zookeeper.KeeperException; @@ -220,7 +221,13 @@ public class MockMasterServices extends MockNoopMasterServices { throws IOException { final Configuration conf = getConfiguration(); 
this.procedureStore = new NoopProcedureStore(); - this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); + this.procedureStore.registerListener(new ProcedureStoreListener() { + + @Override + public void abortProcess() { + abort("The Procedure Store lost the lease", null); + } + }); this.procedureEnv = new MasterProcedureEnv(this, remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));