HBASE-21083 Introduce a mechanism to bypass the execution of a stuck procedure

This commit is contained in:
Allan Yang 2018-08-28 20:17:23 -07:00 committed by Michael Stack
parent 4340930c71
commit e33591515c
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
8 changed files with 419 additions and 8 deletions

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/** /**
* Allows multiple concurrent clients to lock on a numeric id with a minimal * Allows multiple concurrent clients to lock on a numeric id with a minimal
@ -97,6 +98,59 @@ public class IdLock {
return entry; return entry;
} }
/**
* Blocks until the lock corresponding to the given id is acquired.
*
* @param id an arbitrary number to lock on
* @param time time to wait in ms
* @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
* the lock
* @throws IOException if interrupted
*/
public Entry tryLockEntry(long id, long time) throws IOException {
Preconditions.checkArgument(time >= 0);
Entry entry = new Entry(id);
Entry existing;
long waitUtilTS = System.currentTimeMillis() + time;
long remaining = time;
while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
synchronized (existing) {
if (existing.locked) {
++existing.numWaiters; // Add ourselves to waiters.
try {
while (existing.locked) {
existing.wait(remaining);
if (existing.locked) {
long currentTS = System.currentTimeMillis();
if (currentTS >= waitUtilTS) {
// time is up
return null;
} else {
// our wait is waken, but the lock is still taken, this can happen
// due to JDK Object's wait/notify mechanism.
// Calculate the new remaining time to wait
remaining = waitUtilTS - currentTS;
}
}
}
} catch (InterruptedException e) {
throw new InterruptedIOException(
"Interrupted waiting to acquire sparse lock");
} finally {
--existing.numWaiters; // Remove ourselves from waiters.
}
existing.locked = true;
return existing;
}
// If the entry is not locked, it might already be deleted from the
// map, so we cannot return it. We need to get our entry into the map
// or get someone else's locked entry.
}
}
return entry;
}
/** /**
* Must be called in a finally block to decrease the internal counter and * Must be called in a finally block to decrease the internal counter and
* remove the monitor object for the given id if the caller is the last * remove the monitor object for the given id if the caller is the last

View File

@ -144,6 +144,32 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
private boolean lockedWhenLoading = false; private boolean lockedWhenLoading = false;
/**
* Used for force complete of the procedure without
* actually doing any logic in the procedure.
* If bypass is set to true, when executing it will return null when
* {@link #doExecute(Object)} to finish the procedure and releasing any locks
* it may currently hold.
* Bypassing a procedure is not like aborting. Aborting a procedure will trigger
* a rollback. And since the {@link #abort(Object)} method is overrideable
* Some procedures may have chosen to ignore the aborting.
*/
private volatile boolean bypass = false;
public boolean isBypass() {
return bypass;
}
/**
* set the bypass to true
* Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean)} for now,
* DO NOT use this method alone, since we can't just bypass
* one single procedure. We need to bypass its ancestor too. So making it package private
*/
void bypass() {
this.bypass = true;
}
/** /**
* The main code of the procedure. It must be idempotent since execute() * The main code of the procedure. It must be idempotent since execute()
* may be called multiple times in case of machine failure in the middle * may be called multiple times in case of machine failure in the middle
@ -426,6 +452,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
sb.append(", locked=").append(locked); sb.append(", locked=").append(locked);
} }
if (bypass) {
sb.append(", bypass=").append(bypass);
}
if (hasException()) { if (hasException()) {
sb.append(", exception=" + getException()); sb.append(", exception=" + getException());
} }
@ -873,6 +903,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try { try {
updateTimestamp(); updateTimestamp();
if (bypass) {
LOG.info("{} bypassed, returning null to finish it", this);
return null;
}
return execute(env); return execute(env);
} finally { } finally {
updateTimestamp(); updateTimestamp();
@ -886,6 +920,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
throws IOException, InterruptedException { throws IOException, InterruptedException {
try { try {
updateTimestamp(); updateTimestamp();
if (bypass) {
LOG.info("{} bypassed, skipping rollback", this);
return;
}
rollback(env); rollback(env);
} finally { } finally {
updateTimestamp(); updateTimestamp();
@ -903,6 +941,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
return; return;
} }
if (isBypass()) {
LOG.debug("{} is already bypassed, skip acquiring lock.", this);
return;
}
LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this); LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
LockState state = acquireLock(env); LockState state = acquireLock(env);
assert state == LockState.LOCK_ACQUIRED; assert state == LockState.LOCK_ACQUIRED;

View File

@ -963,6 +963,119 @@ public class ProcedureExecutor<TEnvironment> {
return submitProcedure(proc, null); return submitProcedure(proc, null);
} }
/**
* Bypass a procedure. If the procedure is set to bypass, all the logic in
* execute/rollback will be ignored and it will return success, whatever.
* It is used to recover buggy stuck procedures, releasing the lock resources
* and letting other procedures to run. Bypassing one procedure (and its ancestors will
* be bypassed automatically) may leave the cluster in a middle state, e.g. region
* not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
* the operators may have to do some clean up on hdfs or schedule some assign procedures
* to let region online. DO AT YOUR OWN RISK.
* <p>
* A procedure can be bypassed only if
* 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
* or it is a root procedure without any child.
* 2. No other worker thread is executing it
* 3. No child procedure has been submitted
*
* <p>
* If all the requirements are meet, the procedure and its ancestors will be
* bypassed and persisted to WAL.
*
* <p>
* If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
* TODO: What about WAITING_TIMEOUT?
* @param id the procedure id
* @param lockWait time to wait lock
* @param force if force set to true, we will bypass the procedure even if it is executing.
* This is for procedures which can't break out during executing(due to bug, mostly)
* In this case, bypassing the procedure is not enough, since it is already stuck
* there. We need to restart the master after bypassing, and letting the problematic
* procedure to execute wth bypass=true, so in that condition, the procedure can be
* successfully bypassed.
* @return true if bypass success
* @throws IOException IOException
*/
public boolean bypassProcedure(long id, long lockWait, boolean force) throws IOException {
Procedure<TEnvironment> procedure = getProcedure(id);
if (procedure == null) {
LOG.debug("Procedure with id={} does not exist, skipping bypass", id);
return false;
}
LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force);
IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
if (lockEntry == null && !force) {
LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
lockWait, procedure, force);
return false;
} else if (lockEntry == null) {
LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}",
lockWait, procedure, force);
}
try {
// check whether the procedure is already finished
if (procedure.isFinished()) {
LOG.debug("{} is already finished, skipping bypass", procedure);
return false;
}
if (procedure.hasChildren()) {
LOG.debug("{} has children, skipping bypass", procedure);
return false;
}
// If the procedure has no parent or no child, we are safe to bypass it in whatever state
if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
&& procedure.getState() != ProcedureState.WAITING
&& procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
+ "(with no parent), {}",
procedure);
return false;
}
// Now, the procedure is not finished, and no one can execute it since we take the lock now
// And we can be sure that its ancestor is not running too, since their child has not
// finished yet
Procedure current = procedure;
while (current != null) {
LOG.debug("Bypassing {}", current);
current.bypass();
store.update(procedure);
long parentID = current.getParentProcId();
current = getProcedure(parentID);
}
//wake up waiting procedure, already checked there is no child
if (procedure.getState() == ProcedureState.WAITING) {
procedure.setState(ProcedureState.RUNNABLE);
store.update(procedure);
}
// If we don't have the lock, we can't re-submit the queue,
// since it is already executing. To get rid of the stuck situation, we
// need to restart the master. With the procedure set to bypass, the procedureExecutor
// will bypass it and won't get stuck again.
if (lockEntry != null) {
// add the procedure to run queue,
scheduler.addFront(procedure);
LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
} else {
LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
+ "skipping add to queue", procedure);
}
return true;
} finally {
if (lockEntry != null) {
procExecutionLock.releaseLockEntry(lockEntry);
}
}
}
/** /**
* Add a new root-procedure to the executor. * Add a new root-procedure to the executor.
* @param proc the new procedure to execute. * @param proc the new procedure to execute.
@ -1280,6 +1393,10 @@ public class ProcedureExecutor<TEnvironment> {
// Executions // Executions
// ========================================================================== // ==========================================================================
private void executeProcedure(Procedure<TEnvironment> proc) { private void executeProcedure(Procedure<TEnvironment> proc) {
if (proc.isFinished()) {
LOG.debug("{} is already finished, skipping execution", proc);
return;
}
final Long rootProcId = getRootProcedureId(proc); final Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) { if (rootProcId == null) {
// The 'proc' was ready to run but the root procedure was rolledback // The 'proc' was ready to run but the root procedure was rolledback
@ -1433,7 +1550,8 @@ public class ProcedureExecutor<TEnvironment> {
subprocStack.remove(stackTail); subprocStack.remove(stackTail);
// if the procedure is kind enough to pass the slot to someone else, yield // if the procedure is kind enough to pass the slot to someone else, yield
if (proc.isYieldAfterExecutionStep(getEnvironment())) { // if the proc is already finished, do not yield
if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
return LockState.LOCK_YIELD_WAIT; return LockState.LOCK_YIELD_WAIT;
} }

View File

@ -205,6 +205,10 @@ public final class ProcedureUtil {
if (proc.hasLock()) { if (proc.hasLock()) {
builder.setLocked(true); builder.setLocked(true);
} }
if (proc.isBypass()) {
builder.setBypass(true);
}
return builder.build(); return builder.build();
} }
@ -262,6 +266,10 @@ public final class ProcedureUtil {
proc.lockedWhenLoading(); proc.lockedWhenLoading();
} }
if (proto.getBypass()) {
proc.bypass();
}
ProcedureStateSerializer serializer = null; ProcedureStateSerializer serializer = null;
if (proto.getStateMessageCount() > 0) { if (proto.getStateMessageCount() > 0) {

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import static org.junit.Assert.assertTrue;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureBypass {
@ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
.forClass(TestProcedureBypass.class);
private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class);
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
private static TestProcEnv procEnv;
private static ProcedureStore procStore;
private static ProcedureExecutor<TestProcEnv> procExecutor;
private static HBaseCommonTestingUtility htu;
private static FileSystem fs;
private static Path testDir;
private static Path logDir;
private static class TestProcEnv {
}
@BeforeClass
public static void setUp() throws Exception {
htu = new HBaseCommonTestingUtility();
// NOTE: The executor will be created by each test
procEnv = new TestProcEnv();
testDir = htu.getDataTestDir();
fs = testDir.getFileSystem(htu.getConfiguration());
assertTrue(testDir.depth() > 1);
logDir = new Path(testDir, "proc-logs");
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
ProcedureTestingUtility
.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@Test
public void testBypassSuspendProcedure() throws Exception {
final SuspendProcedure proc = new SuspendProcedure();
long id = procExecutor.submitProcedure(proc);
Thread.sleep(500);
//bypass the procedure
assertTrue(procExecutor.bypassProcedure(id, 30000, false));
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
LOG.info("{} finished", proc);
}
@Test
public void testStuckProcedure() throws Exception {
final StuckProcedure proc = new StuckProcedure();
long id = procExecutor.submitProcedure(proc);
Thread.sleep(500);
//bypass the procedure
assertTrue(procExecutor.bypassProcedure(id, 1000, true));
//Since the procedure is stuck there, we need to restart the executor to recovery.
ProcedureTestingUtility.restart(procExecutor);
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
LOG.info("{} finished", proc);
}
@Test
public void testBypassingProcedureWithParent() throws Exception {
final RootProcedure proc = new RootProcedure();
long rootId = procExecutor.submitProcedure(proc);
htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
.filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList())
.size() > 0);
SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
.filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false));
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
LOG.info("{} finished", proc);
}
@AfterClass
public static void tearDown() throws Exception {
procExecutor.stop();
procStore.stop(false);
procExecutor.join();
}
public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
public SuspendProcedure() {
super();
}
@Override
protected Procedure[] execute(final TestProcEnv env)
throws ProcedureSuspendedException {
// Always suspend the procedure
throw new ProcedureSuspendedException();
}
}
public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
public StuckProcedure() {
super();
}
@Override
protected Procedure[] execute(final TestProcEnv env) {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (Throwable t) {
LOG.debug("Sleep is interrupted.", t);
}
return null;
}
}
public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
private boolean childSpwaned = false;
public RootProcedure() {
super();
}
@Override
protected Procedure[] execute(final TestProcEnv env)
throws ProcedureSuspendedException {
if (!childSpwaned) {
childSpwaned = true;
return new Procedure[] {new SuspendProcedure()};
} else {
return null;
}
}
}
}

View File

@ -117,9 +117,9 @@ public class TestYieldProcedures {
// check runnable queue stats // check runnable queue stats
assertEquals(0, procRunnables.size()); assertEquals(0, procRunnables.size());
assertEquals(0, procRunnables.addFrontCalls); assertEquals(0, procRunnables.addFrontCalls);
assertEquals(18, procRunnables.addBackCalls); assertEquals(15, procRunnables.addBackCalls);
assertEquals(15, procRunnables.yieldCalls); assertEquals(12, procRunnables.yieldCalls);
assertEquals(19, procRunnables.pollCalls); assertEquals(16, procRunnables.pollCalls);
assertEquals(3, procRunnables.completionCalls); assertEquals(3, procRunnables.completionCalls);
} }
@ -159,9 +159,9 @@ public class TestYieldProcedures {
// check runnable queue stats // check runnable queue stats
assertEquals(0, procRunnables.size()); assertEquals(0, procRunnables.size());
assertEquals(0, procRunnables.addFrontCalls); assertEquals(0, procRunnables.addFrontCalls);
assertEquals(12, procRunnables.addBackCalls); assertEquals(11, procRunnables.addBackCalls);
assertEquals(11, procRunnables.yieldCalls); assertEquals(10, procRunnables.yieldCalls);
assertEquals(13, procRunnables.pollCalls); assertEquals(12, procRunnables.pollCalls);
assertEquals(1, procRunnables.completionCalls); assertEquals(1, procRunnables.completionCalls);
} }

View File

@ -66,6 +66,9 @@ message Procedure {
// whether the procedure has held the lock // whether the procedure has held the lock
optional bool locked = 16 [default = false]; optional bool locked = 16 [default = false];
// whether the procedure need to be bypassed
optional bool bypass = 17 [default = false];
} }
/** /**

View File

@ -85,7 +85,7 @@
<tr> <tr>
<td><%= proc.getProcId() %></td> <td><%= proc.getProcId() %></td>
<td><%= proc.hasParent() ? proc.getParentProcId() : "" %></td> <td><%= proc.hasParent() ? proc.getParentProcId() : "" %></td>
<td><%= escapeXml(proc.getState().toString()) %></td> <td><%= escapeXml(proc.getState().toString() + (proc.isBypass() ? "(Bypass)" : "")) %></td>
<td><%= proc.hasOwner() ? escapeXml(proc.getOwner()) : "" %></td> <td><%= proc.hasOwner() ? escapeXml(proc.getOwner()) : "" %></td>
<td><%= escapeXml(proc.getProcName()) %></td> <td><%= escapeXml(proc.getProcName()) %></td>
<td><%= new Date(proc.getSubmittedTime()) %></td> <td><%= new Date(proc.getSubmittedTime()) %></td>