diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
index 269bf8382fa..414cc66fd8f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
  * Allows multiple concurrent clients to lock on a numeric id with a minimal
@@ -97,6 +98,56 @@ public class IdLock {
     return entry;
   }
 
+  /**
+   * Tries to acquire the lock corresponding to the given id, blocking for at most
+   * {@code time} milliseconds while another caller holds it.
+   *
+   * @param id an arbitrary number to lock on
+   * @param time maximum time to wait in ms, must not be negative; 0 means do not wait
+   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
+   *         the lock, or null if the lock could not be acquired in time
+   * @throws IOException if interrupted
+   */
+  public Entry tryLockEntry(long id, long time) throws IOException {
+    Preconditions.checkArgument(time >= 0);
+    Entry entry = new Entry(id);
+    Entry existing;
+    long startTS = System.currentTimeMillis();
+    // Saturate instead of overflowing when the caller passes a huge timeout.
+    long waitUntilTS = time > Long.MAX_VALUE - startTS ? Long.MAX_VALUE : startTS + time;
+    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
+      synchronized (existing) {
+        if (existing.locked) {
+          ++existing.numWaiters;  // Add ourselves to waiters.
+          try {
+            while (existing.locked) {
+              // Recompute the remaining time before every wait: a wait may return while
+              // the lock is still taken. Never call wait(0), which would block without
+              // any timeout at all.
+              long remaining = waitUntilTS - System.currentTimeMillis();
+              if (remaining <= 0) {
+                // time is up
+                return null;
+              }
+              existing.wait(remaining);
+            }
+          } catch (InterruptedException e) {
+            throw new InterruptedIOException(
+                "Interrupted waiting to acquire sparse lock");
+          } finally {
+            --existing.numWaiters;  // Remove ourselves from waiters.
+          }
+          existing.locked = true;
+          return existing;
+        }
+        // If the entry is not locked, it might already be deleted from the
+        // map, so we cannot return it. We need to get our entry into the map
+        // or get someone else's locked entry.
+      }
+    }
+    return entry;
+  }
+
   /**
    * Must be called in a finally block to decrease the internal counter and
    * remove the monitor object for the given id if the caller is the last
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 2d303882fba..b2685f611dc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -144,6 +144,32 @@ public abstract class Procedure implements Comparable implements Comparable implements Comparable implements Comparable {
     return submitProcedure(proc, null);
   }
+  /**
+   * Bypass a procedure. If the procedure is set to bypass, all the logic in
+   * execute/rollback will be ignored and it will return success, whatever.
+   * It is used to recover buggy stuck procedures, releasing the lock resources
+   * and letting other procedures to run. Bypassing one procedure (and its ancestors will
+   * be bypassed automatically) may leave the cluster in a middle state, e.g. region
+   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
+   * the operators may have to do some clean up on hdfs or schedule some assign procedures
+   * to let region online. DO AT YOUR OWN RISK.
+   * <p>
+   * A procedure can be bypassed only if
+   * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
+   * or it is a root procedure without any child.
+   * 2. No other worker thread is executing it
+   * 3. No child procedure has been submitted
+   *
+   * <p>
+   * If all the requirements are met, the procedure and its ancestors will be
+   * bypassed and persisted to WAL.
+   *
+   * <p>
+   * If the procedure is in WAITING state, it is set to RUNNABLE and added to the run queue.
+   * TODO: What about WAITING_TIMEOUT?
+   * @param id the procedure id
+   * @param lockWait time to wait for the procedure's execution lock, in ms
+   * @param force if force set to true, we will bypass the procedure even if it is executing.
+   *              This is for procedures which can't break out during executing (due to bug,
+   *              mostly). In this case, bypassing the procedure is not enough, since it is
+   *              already stuck there. We need to restart the master after bypassing, and let
+   *              the problematic procedure execute with bypass=true, so in that condition,
+   *              the procedure can be successfully bypassed.
+   * @return true if bypass success
+   * @throws IOException if interrupted while waiting for the execution lock
+   */
+  public boolean bypassProcedure(long id, long lockWait, boolean force) throws IOException {
+    Procedure procedure = getProcedure(id);
+    if (procedure == null) {
+      LOG.debug("Procedure with id={} does not exist, skipping bypass", id);
+      return false;
+    }
+
+    LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force);
+
+    // A null entry means the lock could not be taken within lockWait ms.
+    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
+    if (lockEntry == null && !force) {
+      LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
+          lockWait, procedure, force);
+      return false;
+    } else if (lockEntry == null) {
+      LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}",
+          lockWait, procedure, force);
+    }
+    try {
+      // check whether the procedure is already finished
+      if (procedure.isFinished()) {
+        LOG.debug("{} is already finished, skipping bypass", procedure);
+        return false;
+      }
+
+      if (procedure.hasChildren()) {
+        LOG.debug("{} has children, skipping bypass", procedure);
+        return false;
+      }
+
+      // A root procedure (no parent) without children is safe to bypass in whatever state;
+      // a sub-procedure must be in RUNNABLE, WAITING or WAITING_TIMEOUT state.
+      if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
+          && procedure.getState() != ProcedureState.WAITING
+          && procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
+        LOG.debug("Only procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
+            + "(or with no parent) can be bypassed, skipping bypass of {}",
+            procedure);
+        return false;
+      }
+
+      // Now, the procedure is not finished, and no one can execute it since we take the lock now
+      // And we can be sure that its ancestor is not running too, since their child has not
+      // finished yet
+      Procedure current = procedure;
+      while (current != null) {
+        LOG.debug("Bypassing {}", current);
+        current.bypass();
+        // Persist the procedure that was just flagged, i.e. each ancestor in turn and not
+        // only the procedure the caller asked for.
+        store.update(current);
+        long parentID = current.getParentProcId();
+        current = getProcedure(parentID);
+      }
+
+      //wake up waiting procedure, already checked there is no child
+      if (procedure.getState() == ProcedureState.WAITING) {
+        procedure.setState(ProcedureState.RUNNABLE);
+        store.update(procedure);
+      }
+
+      // If we don't have the lock, we can't re-submit the queue,
+      // since it is already executing. To get rid of the stuck situation, we
+      // need to restart the master. With the procedure set to bypass, the procedureExecutor
+      // will bypass it and won't get stuck again.
+      if (lockEntry != null) {
+        // add the procedure to run queue,
+        scheduler.addFront(procedure);
+        LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
+      } else {
+        LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
+            + "skipping add to queue", procedure);
+      }
+      return true;
+
+    } finally {
+      // Only release what was actually acquired; with force=true we may hold no lock.
+      if (lockEntry != null) {
+        procExecutionLock.releaseLockEntry(lockEntry);
+      }
+    }
+  }
+
   /**
    * Add a new root-procedure to the executor.
    * @param proc the new procedure to execute.
@@ -1280,6 +1393,10 @@ public class ProcedureExecutor { // Executions // ========================================================================== private void executeProcedure(Procedure proc) { + if (proc.isFinished()) { + LOG.debug("{} is already finished, skipping execution", proc); + return; + } final Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback @@ -1433,7 +1550,8 @@ public class ProcedureExecutor { subprocStack.remove(stackTail); // if the procedure is kind enough to pass the slot to someone else, yield - if (proc.isYieldAfterExecutionStep(getEnvironment())) { + // if the proc is already finished, do not yield + if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { return LockState.LOCK_YIELD_WAIT; } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index 1215008cdf7..8a438d4c8c5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -205,6 +205,10 @@ public final class ProcedureUtil { if (proc.hasLock()) { builder.setLocked(true); } + + if (proc.isBypass()) { + builder.setBypass(true); + } return builder.build(); } @@ -262,6 +266,10 @@ public final class ProcedureUtil { proc.lockedWhenLoading(); } + if (proto.getBypass()) { + proc.bypass(); + } + ProcedureStateSerializer serializer = null; if (proto.getStateMessageCount() > 0) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java new file mode 100644 index 00000000000..d58d57e76e8 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java 
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link ProcedureExecutor#bypassProcedure(long, long, boolean)}.
+ */
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureBypass {
+
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
+      .forClass(TestProcedureBypass.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class);
+
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+
+  private static TestProcEnv procEnv;
+  private static ProcedureStore procStore;
+
+  private static ProcedureExecutor<TestProcEnv> procExecutor;
+
+  private static HBaseCommonTestingUtility htu;
+
+  private static FileSystem fs;
+  private static Path testDir;
+  private static Path logDir;
+
+  private static class TestProcEnv {
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    htu = new HBaseCommonTestingUtility();
+
+    // NOTE: a single executor (backed by a WAL store) is created here and shared by all tests
+    procEnv = new TestProcEnv();
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+    logDir = new Path(testDir, "proc-logs");
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
+        procStore);
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    ProcedureTestingUtility
+        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
+  }
+
+  /** A suspended procedure can be bypassed without force and then reports success. */
+  @Test
+  public void testBypassSuspendProcedure() throws Exception {
+    final SuspendProcedure proc = new SuspendProcedure();
+    long id = procExecutor.submitProcedure(proc);
+    Thread.sleep(500);
+    //bypass the procedure
+    assertTrue(procExecutor.bypassProcedure(id, 30000, false));
+    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
+    LOG.info("{} finished", proc);
+  }
+
+  /** A procedure stuck inside execute() needs a forced bypass plus an executor restart. */
+  @Test
+  public void testStuckProcedure() throws Exception {
+    final StuckProcedure proc = new StuckProcedure();
+    long id = procExecutor.submitProcedure(proc);
+    Thread.sleep(500);
+    //bypass the procedure
+    assertTrue(procExecutor.bypassProcedure(id, 1000, true));
+    //Since the procedure is stuck there, we need to restart the executor to recover.
+    ProcedureTestingUtility.restart(procExecutor);
+    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
+    LOG.info("{} finished", proc);
+  }
+
+  /** Bypassing a child procedure also bypasses its parent, which then reports success. */
+  @Test
+  public void testBypassingProcedureWithParent() throws Exception {
+    final RootProcedure proc = new RootProcedure();
+    long rootId = procExecutor.submitProcedure(proc);
+    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
+      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList())
+      .size() > 0);
+    SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
+        .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
+    assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false));
+    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
+    LOG.info("{} finished", proc);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    procExecutor.stop();
+    procStore.stop(false);
+    procExecutor.join();
+  }
+
+  /** Procedure whose execute() always suspends. */
+  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+
+    public SuspendProcedure() {
+      super();
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env)
+        throws ProcedureSuspendedException {
+      // Always suspend the procedure
+      throw new ProcedureSuspendedException();
+    }
+  }
+
+  /** Procedure whose execute() sleeps (practically) forever, so it never returns by itself. */
+  public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+
+    public StuckProcedure() {
+      super();
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env) {
+      try {
+        Thread.sleep(Long.MAX_VALUE);
+      } catch (Throwable t) {
+        LOG.debug("Sleep is interrupted.", t);
+      }
+      return null;
+    }
+  }
+
+  /** Root procedure that returns one {@link SuspendProcedure} child on its first execution. */
+  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+    private boolean childSpawned = false;
+
+    public RootProcedure() {
+      super();
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env)
+        throws ProcedureSuspendedException {
+      if (!childSpawned) {
+        childSpawned = true;
+        return new Procedure[] {new SuspendProcedure()};
+      } else {
+        return null;
+      }
+    }
+  }
+}
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
index 18d92ea19f0..b5137b0823b 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
@@ -117,9 +117,9 @@ public class TestYieldProcedures {
     // check runnable queue stats
     assertEquals(0, procRunnables.size());
     assertEquals(0, procRunnables.addFrontCalls);
-    assertEquals(18, procRunnables.addBackCalls);
-    assertEquals(15, procRunnables.yieldCalls);
-    assertEquals(19, procRunnables.pollCalls);
+    assertEquals(15, procRunnables.addBackCalls);
+    assertEquals(12, procRunnables.yieldCalls);
+    assertEquals(16, procRunnables.pollCalls);
     assertEquals(3, procRunnables.completionCalls);
   }
 
@@ -159,9 +159,9 @@ public class TestYieldProcedures {
     // check runnable queue stats
     assertEquals(0, procRunnables.size());
     assertEquals(0, procRunnables.addFrontCalls);
-    assertEquals(12, procRunnables.addBackCalls);
-    assertEquals(11, procRunnables.yieldCalls);
-    assertEquals(13, procRunnables.pollCalls);
+    assertEquals(11, procRunnables.addBackCalls);
+    assertEquals(10, procRunnables.yieldCalls);
+    assertEquals(12, procRunnables.pollCalls);
     assertEquals(1, procRunnables.completionCalls);
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto
index b4a31074308..c41330729f4 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto
@@ -66,6 +66,9 @@ message Procedure {
 
   // whether the procedure has held the lock
   optional bool locked = 16 [default = false];
+
+  // whether the procedure need to be bypassed
+  optional bool bypass = 17 [default = false];
 }
 
 /**
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
index f6172375d86..c4adcd34908 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
@@ -85,7 +85,7 @@
 <%= proc.getProcId() %>
 <%= proc.hasParent() ? proc.getParentProcId() : "" %>
- <%= escapeXml(proc.getState().toString()) %>
+ <%= escapeXml(proc.getState().toString() + (proc.isBypass() ? "(Bypass)" : "")) %>
 <%= proc.hasOwner() ? escapeXml(proc.getOwner()) : "" %>
 <%= escapeXml(proc.getProcName()) %>
 <%= new Date(proc.getSubmittedTime()) %>