HBASE-21083 Introduce a mechanism to bypass the execution of a stuck procedure
This commit is contained in:
parent
98600f1860
commit
7c1fad4992
|
@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Allows multiple concurrent clients to lock on a numeric id with a minimal
|
||||
|
@ -97,6 +98,59 @@ public class IdLock {
|
|||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks until the lock corresponding to the given id is acquired.
|
||||
*
|
||||
* @param id an arbitrary number to lock on
|
||||
* @param time time to wait in ms
|
||||
* @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
|
||||
* the lock
|
||||
* @throws IOException if interrupted
|
||||
*/
|
||||
public Entry tryLockEntry(long id, long time) throws IOException {
|
||||
Preconditions.checkArgument(time >= 0);
|
||||
Entry entry = new Entry(id);
|
||||
Entry existing;
|
||||
long waitUtilTS = System.currentTimeMillis() + time;
|
||||
long remaining = time;
|
||||
while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
|
||||
synchronized (existing) {
|
||||
if (existing.locked) {
|
||||
++existing.numWaiters; // Add ourselves to waiters.
|
||||
try {
|
||||
while (existing.locked) {
|
||||
existing.wait(remaining);
|
||||
if (existing.locked) {
|
||||
long currentTS = System.currentTimeMillis();
|
||||
if (currentTS >= waitUtilTS) {
|
||||
// time is up
|
||||
return null;
|
||||
} else {
|
||||
// our wait is waken, but the lock is still taken, this can happen
|
||||
// due to JDK Object's wait/notify mechanism.
|
||||
// Calculate the new remaining time to wait
|
||||
remaining = waitUtilTS - currentTS;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException(
|
||||
"Interrupted waiting to acquire sparse lock");
|
||||
} finally {
|
||||
--existing.numWaiters; // Remove ourselves from waiters.
|
||||
}
|
||||
existing.locked = true;
|
||||
return existing;
|
||||
}
|
||||
// If the entry is not locked, it might already be deleted from the
|
||||
// map, so we cannot return it. We need to get our entry into the map
|
||||
// or get someone else's locked entry.
|
||||
}
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Must be called in a finally block to decrease the internal counter and
|
||||
* remove the monitor object for the given id if the caller is the last
|
||||
|
|
|
@ -144,6 +144,32 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
|
||||
private boolean lockedWhenLoading = false;
|
||||
|
||||
/**
|
||||
* Used for force complete of the procedure without
|
||||
* actually doing any logic in the procedure.
|
||||
* If bypass is set to true, when executing it will return null when
|
||||
* {@link #doExecute(Object)} to finish the procedure and releasing any locks
|
||||
* it may currently hold.
|
||||
* Bypassing a procedure is not like aborting. Aborting a procedure will trigger
|
||||
* a rollback. And since the {@link #abort(Object)} method is overrideable
|
||||
* Some procedures may have chosen to ignore the aborting.
|
||||
*/
|
||||
private volatile boolean bypass = false;
|
||||
|
||||
public boolean isBypass() {
|
||||
return bypass;
|
||||
}
|
||||
|
||||
/**
|
||||
* set the bypass to true
|
||||
* Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean)} for now,
|
||||
* DO NOT use this method alone, since we can't just bypass
|
||||
* one single procedure. We need to bypass its ancestor too. So making it package private
|
||||
*/
|
||||
void bypass() {
|
||||
this.bypass = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* The main code of the procedure. It must be idempotent since execute()
|
||||
* may be called multiple times in case of machine failure in the middle
|
||||
|
@ -423,6 +449,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
|
||||
sb.append(", hasLock=").append(locked);
|
||||
|
||||
if (bypass) {
|
||||
sb.append(", bypass=").append(bypass);
|
||||
}
|
||||
|
||||
if (hasException()) {
|
||||
sb.append(", exception=" + getException());
|
||||
}
|
||||
|
@ -870,6 +900,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
try {
|
||||
updateTimestamp();
|
||||
if (bypass) {
|
||||
LOG.info("{} bypassed, returning null to finish it", this);
|
||||
return null;
|
||||
}
|
||||
return execute(env);
|
||||
} finally {
|
||||
updateTimestamp();
|
||||
|
@ -883,6 +917,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
throws IOException, InterruptedException {
|
||||
try {
|
||||
updateTimestamp();
|
||||
if (bypass) {
|
||||
LOG.info("{} bypassed, skipping rollback", this);
|
||||
return;
|
||||
}
|
||||
rollback(env);
|
||||
} finally {
|
||||
updateTimestamp();
|
||||
|
@ -900,6 +938,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
return;
|
||||
}
|
||||
|
||||
if (isBypass()) {
|
||||
LOG.debug("{} is already bypassed, skip acquiring lock.", this);
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
|
||||
LockState state = acquireLock(env);
|
||||
assert state == LockState.LOCK_ACQUIRED;
|
||||
|
|
|
@ -973,6 +973,119 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
return submitProcedure(proc, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Bypass a procedure. If the procedure is set to bypass, all the logic in
|
||||
* execute/rollback will be ignored and it will return success, whatever.
|
||||
* It is used to recover buggy stuck procedures, releasing the lock resources
|
||||
* and letting other procedures to run. Bypassing one procedure (and its ancestors will
|
||||
* be bypassed automatically) may leave the cluster in a middle state, e.g. region
|
||||
* not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
|
||||
* the operators may have to do some clean up on hdfs or schedule some assign procedures
|
||||
* to let region online. DO AT YOUR OWN RISK.
|
||||
* <p>
|
||||
* A procedure can be bypassed only if
|
||||
* 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
|
||||
* or it is a root procedure without any child.
|
||||
* 2. No other worker thread is executing it
|
||||
* 3. No child procedure has been submitted
|
||||
*
|
||||
* <p>
|
||||
* If all the requirements are meet, the procedure and its ancestors will be
|
||||
* bypassed and persisted to WAL.
|
||||
*
|
||||
* <p>
|
||||
* If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
|
||||
* TODO: What about WAITING_TIMEOUT?
|
||||
* @param id the procedure id
|
||||
* @param lockWait time to wait lock
|
||||
* @param force if force set to true, we will bypass the procedure even if it is executing.
|
||||
* This is for procedures which can't break out during executing(due to bug, mostly)
|
||||
* In this case, bypassing the procedure is not enough, since it is already stuck
|
||||
* there. We need to restart the master after bypassing, and letting the problematic
|
||||
* procedure to execute wth bypass=true, so in that condition, the procedure can be
|
||||
* successfully bypassed.
|
||||
* @return true if bypass success
|
||||
* @throws IOException IOException
|
||||
*/
|
||||
public boolean bypassProcedure(long id, long lockWait, boolean force) throws IOException {
|
||||
Procedure<TEnvironment> procedure = getProcedure(id);
|
||||
if (procedure == null) {
|
||||
LOG.debug("Procedure with id={} does not exist, skipping bypass", id);
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force);
|
||||
|
||||
IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
|
||||
if (lockEntry == null && !force) {
|
||||
LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
|
||||
lockWait, procedure, force);
|
||||
return false;
|
||||
} else if (lockEntry == null) {
|
||||
LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}",
|
||||
lockWait, procedure, force);
|
||||
}
|
||||
try {
|
||||
// check whether the procedure is already finished
|
||||
if (procedure.isFinished()) {
|
||||
LOG.debug("{} is already finished, skipping bypass", procedure);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (procedure.hasChildren()) {
|
||||
LOG.debug("{} has children, skipping bypass", procedure);
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the procedure has no parent or no child, we are safe to bypass it in whatever state
|
||||
if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
|
||||
&& procedure.getState() != ProcedureState.WAITING
|
||||
&& procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
|
||||
LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
|
||||
+ "(with no parent), {}",
|
||||
procedure);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Now, the procedure is not finished, and no one can execute it since we take the lock now
|
||||
// And we can be sure that its ancestor is not running too, since their child has not
|
||||
// finished yet
|
||||
Procedure current = procedure;
|
||||
while (current != null) {
|
||||
LOG.debug("Bypassing {}", current);
|
||||
current.bypass();
|
||||
store.update(procedure);
|
||||
long parentID = current.getParentProcId();
|
||||
current = getProcedure(parentID);
|
||||
}
|
||||
|
||||
//wake up waiting procedure, already checked there is no child
|
||||
if (procedure.getState() == ProcedureState.WAITING) {
|
||||
procedure.setState(ProcedureState.RUNNABLE);
|
||||
store.update(procedure);
|
||||
}
|
||||
|
||||
// If we don't have the lock, we can't re-submit the queue,
|
||||
// since it is already executing. To get rid of the stuck situation, we
|
||||
// need to restart the master. With the procedure set to bypass, the procedureExecutor
|
||||
// will bypass it and won't get stuck again.
|
||||
if (lockEntry != null) {
|
||||
// add the procedure to run queue,
|
||||
scheduler.addFront(procedure);
|
||||
LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
|
||||
} else {
|
||||
LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
|
||||
+ "skipping add to queue", procedure);
|
||||
}
|
||||
return true;
|
||||
|
||||
} finally {
|
||||
if (lockEntry != null) {
|
||||
procExecutionLock.releaseLockEntry(lockEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new root-procedure to the executor.
|
||||
* @param proc the new procedure to execute.
|
||||
|
@ -1301,6 +1414,10 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// Executions
|
||||
// ==========================================================================
|
||||
private void executeProcedure(Procedure<TEnvironment> proc) {
|
||||
if (proc.isFinished()) {
|
||||
LOG.debug("{} is already finished, skipping execution", proc);
|
||||
return;
|
||||
}
|
||||
final Long rootProcId = getRootProcedureId(proc);
|
||||
if (rootProcId == null) {
|
||||
// The 'proc' was ready to run but the root procedure was rolledback
|
||||
|
@ -1454,7 +1571,8 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
subprocStack.remove(stackTail);
|
||||
|
||||
// if the procedure is kind enough to pass the slot to someone else, yield
|
||||
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
// if the proc is already finished, do not yield
|
||||
if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
return LockState.LOCK_YIELD_WAIT;
|
||||
}
|
||||
|
||||
|
|
|
@ -205,6 +205,10 @@ public final class ProcedureUtil {
|
|||
if (proc.hasLock()) {
|
||||
builder.setLocked(true);
|
||||
}
|
||||
|
||||
if (proc.isBypass()) {
|
||||
builder.setBypass(true);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
@ -262,6 +266,10 @@ public final class ProcedureUtil {
|
|||
proc.lockedWhenLoading();
|
||||
}
|
||||
|
||||
if (proto.getBypass()) {
|
||||
proc.bypass();
|
||||
}
|
||||
|
||||
ProcedureStateSerializer serializer = null;
|
||||
|
||||
if (proto.getStateMessageCount() > 0) {
|
||||
|
|
|
@ -0,0 +1,185 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestProcedureBypass {
|
||||
|
||||
@ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
|
||||
.forClass(TestProcedureBypass.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class);
|
||||
|
||||
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
|
||||
|
||||
private static TestProcEnv procEnv;
|
||||
private static ProcedureStore procStore;
|
||||
|
||||
private static ProcedureExecutor<TestProcEnv> procExecutor;
|
||||
|
||||
private static HBaseCommonTestingUtility htu;
|
||||
|
||||
private static FileSystem fs;
|
||||
private static Path testDir;
|
||||
private static Path logDir;
|
||||
|
||||
private static class TestProcEnv {
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
htu = new HBaseCommonTestingUtility();
|
||||
|
||||
// NOTE: The executor will be created by each test
|
||||
procEnv = new TestProcEnv();
|
||||
testDir = htu.getDataTestDir();
|
||||
fs = testDir.getFileSystem(htu.getConfiguration());
|
||||
assertTrue(testDir.depth() > 1);
|
||||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
|
||||
procStore);
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
ProcedureTestingUtility
|
||||
.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBypassSuspendProcedure() throws Exception {
|
||||
final SuspendProcedure proc = new SuspendProcedure();
|
||||
long id = procExecutor.submitProcedure(proc);
|
||||
Thread.sleep(500);
|
||||
//bypass the procedure
|
||||
assertTrue(procExecutor.bypassProcedure(id, 30000, false));
|
||||
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStuckProcedure() throws Exception {
|
||||
final StuckProcedure proc = new StuckProcedure();
|
||||
long id = procExecutor.submitProcedure(proc);
|
||||
Thread.sleep(500);
|
||||
//bypass the procedure
|
||||
assertTrue(procExecutor.bypassProcedure(id, 1000, true));
|
||||
//Since the procedure is stuck there, we need to restart the executor to recovery.
|
||||
ProcedureTestingUtility.restart(procExecutor);
|
||||
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBypassingProcedureWithParent() throws Exception {
|
||||
final RootProcedure proc = new RootProcedure();
|
||||
long rootId = procExecutor.submitProcedure(proc);
|
||||
htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
|
||||
.filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList())
|
||||
.size() > 0);
|
||||
SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
|
||||
.filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
|
||||
assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false));
|
||||
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
procExecutor.stop();
|
||||
procStore.stop(false);
|
||||
procExecutor.join();
|
||||
}
|
||||
|
||||
public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||
|
||||
public SuspendProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final TestProcEnv env)
|
||||
throws ProcedureSuspendedException {
|
||||
// Always suspend the procedure
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
}
|
||||
|
||||
public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||
|
||||
public StuckProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final TestProcEnv env) {
|
||||
try {
|
||||
Thread.sleep(Long.MAX_VALUE);
|
||||
} catch (Throwable t) {
|
||||
LOG.debug("Sleep is interrupted.", t);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||
private boolean childSpwaned = false;
|
||||
|
||||
public RootProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final TestProcEnv env)
|
||||
throws ProcedureSuspendedException {
|
||||
if (!childSpwaned) {
|
||||
childSpwaned = true;
|
||||
return new Procedure[] {new SuspendProcedure()};
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -117,9 +117,9 @@ public class TestYieldProcedures {
|
|||
// check runnable queue stats
|
||||
assertEquals(0, procRunnables.size());
|
||||
assertEquals(0, procRunnables.addFrontCalls);
|
||||
assertEquals(18, procRunnables.addBackCalls);
|
||||
assertEquals(15, procRunnables.yieldCalls);
|
||||
assertEquals(19, procRunnables.pollCalls);
|
||||
assertEquals(15, procRunnables.addBackCalls);
|
||||
assertEquals(12, procRunnables.yieldCalls);
|
||||
assertEquals(16, procRunnables.pollCalls);
|
||||
assertEquals(3, procRunnables.completionCalls);
|
||||
}
|
||||
|
||||
|
@ -159,9 +159,9 @@ public class TestYieldProcedures {
|
|||
// check runnable queue stats
|
||||
assertEquals(0, procRunnables.size());
|
||||
assertEquals(0, procRunnables.addFrontCalls);
|
||||
assertEquals(12, procRunnables.addBackCalls);
|
||||
assertEquals(11, procRunnables.yieldCalls);
|
||||
assertEquals(13, procRunnables.pollCalls);
|
||||
assertEquals(11, procRunnables.addBackCalls);
|
||||
assertEquals(10, procRunnables.yieldCalls);
|
||||
assertEquals(12, procRunnables.pollCalls);
|
||||
assertEquals(1, procRunnables.completionCalls);
|
||||
}
|
||||
|
||||
|
|
|
@ -66,6 +66,9 @@ message Procedure {
|
|||
|
||||
// whether the procedure has held the lock
|
||||
optional bool locked = 16 [default = false];
|
||||
|
||||
// whether the procedure need to be bypassed
|
||||
optional bool bypass = 17 [default = false];
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -85,7 +85,7 @@
|
|||
<tr>
|
||||
<td><%= proc.getProcId() %></td>
|
||||
<td><%= proc.hasParent() ? proc.getParentProcId() : "" %></td>
|
||||
<td><%= escapeXml(proc.getState().toString()) %></td>
|
||||
<td><%= escapeXml(proc.getState().toString() + (proc.isBypass() ? "(Bypass)" : "")) %></td>
|
||||
<td><%= proc.hasOwner() ? escapeXml(proc.getOwner()) : "" %></td>
|
||||
<td><%= escapeXml(proc.getProcName()) %></td>
|
||||
<td><%= new Date(proc.getSubmittedTime()) %></td>
|
||||
|
|
Loading…
Reference in New Issue