HBASE-19978 The keepalive logic is incomplete in ProcedureExecutor

zhangduo 2018-02-19 08:08:06 +08:00 committed by Michael Stack
parent 4ef6319af0
commit c1fe9f441c
7 changed files with 522 additions and 217 deletions
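In short: the old executor could add worker threads when the stuck checker fired, but nothing ever retired them, and the default keepAliveTime of Long.MAX_VALUE meant the keep-alive check never triggered. After this commit the pool keeps corePoolSize permanent workers and can burst up to maxPoolSize (10x core) using KeepAliveWorkerThreads that exit after keepAliveTime (now defaulting to one minute) of idleness. A minimal sketch of the resulting loop shape, with plain JDK types standing in for ProcedureScheduler and Procedure (a sketch, not the committed code):

// Sketch only: BlockingQueue stands in for ProcedureScheduler.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class WorkerLoopSketch {
  static void run(BlockingQueue<Runnable> scheduler, long keepAliveMs, boolean coreWorker)
      throws InterruptedException {
    long lastUpdate = System.currentTimeMillis();
    // Core workers loop forever; burst workers exit once idle for keepAliveMs.
    while (coreWorker || System.currentTimeMillis() - lastUpdate < keepAliveMs) {
      Runnable proc = scheduler.poll(keepAliveMs, TimeUnit.MILLISECONDS);
      if (proc == null) {
        continue; // nothing polled; the loop condition decides whether to stay alive
      }
      proc.run();
      lastUpdate = System.currentTimeMillis();
    }
  }
}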

org/apache/hadoop/hbase/procedure2/DelayedProcedure.java (new file)

@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<?>> {
public DelayedProcedure(Procedure<?> procedure) {
super(procedure, procedure.getTimeoutTimestamp());
}
}
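DelayedProcedure pairs a procedure with its absolute timeout timestamp so the timeout executor can park it in a java.util.concurrent.DelayQueue. For context, a self-contained sketch of the container pattern that DelayedUtil.DelayedContainerWithTimestamp provides (simplified; the real helper also supplies the equals/hashCode that queue removal relies on):

// Reduced sketch of the delayed-container pattern, plain JDK only.
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedContainerSketch<T> implements Delayed {
  private final T object;
  private long timeoutMs; // absolute deadline, e.g. Procedure#getTimeoutTimestamp()

  DelayedContainerSketch(T object, long timeoutMs) {
    this.object = object;
    this.timeoutMs = timeoutMs;
  }

  T getObject() {
    return object;
  }

  void setTimeout(long timeoutMs) {
    this.timeoutMs = timeoutMs; // lets a chore be re-queued with a fresh deadline
  }

  @Override
  public long getDelay(TimeUnit unit) {
    return unit.convert(timeoutMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(Delayed other) {
    return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
  }
}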

org/apache/hadoop/hbase/procedure2/InlineChore.java (new file)

@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Inline Chores (executors' internal chores).
*/
@InterfaceAudience.Private
abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable {
private long timeout;
public abstract int getTimeoutInterval();
protected void refreshTimeout() {
this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval();
}
@Override
public long getTimeout() {
return timeout;
}
}
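Subclasses supply the interval and the work. The timeout executor runs the chore and immediately re-queues it via add(), which calls refreshTimeout() to push the deadline out another interval. A hypothetical subclass for illustration (assumes the same package, since InlineChore is package-private; not part of the commit):

// Hypothetical chore: fires roughly every 10 seconds.
class HeartbeatChoreSketch extends InlineChore {
  @Override
  public int getTimeoutInterval() {
    return 10_000; // milliseconds, added to the current time by refreshTimeout()
  }

  @Override
  public void run() {
    // TimeoutExecutorThread#execInlineChore calls run() and then add(this),
    // which refreshes the timeout and re-queues the chore.
    System.out.println("heartbeat at " + System.currentTimeMillis());
  }
}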

org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java

@@ -18,9 +18,6 @@
package org.apache.hadoop.hbase.procedure2;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,35 +27,35 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
-import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
* Thread Pool that executes the submitted procedures.
@@ -83,7 +80,7 @@ public class ProcedureExecutor<TEnvironment> {
public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
"hbase.procedure.worker.keep.alive.time.msec";
-private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = Long.MAX_VALUE;
+private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
Testing testing = null;
public static class Testing {
@@ -272,8 +269,9 @@ public class ProcedureExecutor<TEnvironment> {
private CopyOnWriteArrayList<WorkerThread> workerThreads;
private TimeoutExecutorThread timeoutExecutor;
private int corePoolSize;
+private int maxPoolSize;
-private volatile long keepAliveTime = Long.MAX_VALUE;
+private volatile long keepAliveTime;
/**
* Scheduler/Queue that contains runnable procedures.
@@ -501,7 +499,7 @@ public class ProcedureExecutor<TEnvironment> {
* a corrupted procedure is found on replay. otherwise false.
*/
public void start(int numThreads, boolean abortOnCorruption) throws IOException {
-if (running.getAndSet(true)) {
+if (!running.compareAndSet(false, true)) {
LOG.warn("Already running");
return;
}
@@ -509,13 +507,15 @@
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
LOG.info("Starting {} Workers (bigger of cpus/4 or 16)", corePoolSize);
this.maxPoolSize = 10 * numThreads;
LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
corePoolSize, maxPoolSize);
// Create the Thread Group for the executors
threadGroup = new ThreadGroup("PEWorkerGroup");
// Create the timeout executor
-timeoutExecutor = new TimeoutExecutorThread(threadGroup);
+timeoutExecutor = new TimeoutExecutorThread(this, threadGroup);
// Create the workers
workerId.set(0);
@@ -530,8 +530,8 @@
st = EnvironmentEdgeManager.currentTime();
store.recoverLease();
et = EnvironmentEdgeManager.currentTime();
LOG.info(String.format("Recovered %s lease in %s",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
StringUtils.humanTimeDiff(et - st));
// start the procedure scheduler
scheduler.start();
@@ -544,13 +544,11 @@
st = EnvironmentEdgeManager.currentTime();
load(abortOnCorruption);
et = EnvironmentEdgeManager.currentTime();
LOG.info(String.format("Loaded %s in %s, um pid=",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
StringUtils.humanTimeDiff(et - st));
// Start the executors. Here we must have the lastProcId set.
-if (LOG.isTraceEnabled()) {
-LOG.trace("Start workers " + workerThreads.size());
-}
+LOG.trace("Start workers {}", workerThreads.size());
timeoutExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
@@ -645,7 +643,7 @@ public class ProcedureExecutor<TEnvironment> {
return this.store;
}
-protected ProcedureScheduler getScheduler() {
+ProcedureScheduler getScheduler() {
return scheduler;
}
@@ -1152,7 +1150,7 @@ public class ProcedureExecutor<TEnvironment> {
return procedures.keySet();
}
-private Long getRootProcedureId(Procedure proc) {
+Long getRootProcedureId(Procedure proc) {
return Procedure.getRootProcedureId(procedures, proc);
}
@@ -1699,15 +1697,23 @@ public class ProcedureExecutor<TEnvironment> {
sendProcedureFinishedNotification(proc.getProcId());
}
+RootProcedureState getProcStack(long rootProcId) {
+return rollbackStack.get(rootProcId);
+}
// ==========================================================================
// Worker Thread
// ==========================================================================
-private final class WorkerThread extends StoppableThread {
+private class WorkerThread extends StoppableThread {
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
-private Procedure activeProcedure;
+private volatile Procedure<?> activeProcedure;
-public WorkerThread(final ThreadGroup group) {
-super(group, "PEWorker-" + workerId.incrementAndGet());
+public WorkerThread(ThreadGroup group) {
+this(group, "PEWorker-");
+}
+protected WorkerThread(ThreadGroup group, String prefix) {
+super(group, prefix + workerId.incrementAndGet());
setDaemon(true);
}
@@ -1721,34 +1727,33 @@ public class ProcedureExecutor<TEnvironment> {
long lastUpdate = EnvironmentEdgeManager.currentTime();
try {
while (isRunning() && keepAlive(lastUpdate)) {
-this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
-if (this.activeProcedure == null) continue;
+Procedure<?> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
+if (proc == null) {
+continue;
+}
+this.activeProcedure = proc;
int activeCount = activeExecutorCount.incrementAndGet();
int runningCount = store.setRunningProcedureCount(activeCount);
-if (LOG.isTraceEnabled()) {
-LOG.trace("Execute pid=" + this.activeProcedure.getProcId() +
-" runningCount=" + runningCount + ", activeCount=" + activeCount);
-}
+LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
+runningCount, activeCount);
executionStartTime.set(EnvironmentEdgeManager.currentTime());
try {
-executeProcedure(this.activeProcedure);
+executeProcedure(proc);
} catch (AssertionError e) {
LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e);
LOG.info("ASSERT pid=" + proc.getProcId(), e);
throw e;
} finally {
activeCount = activeExecutorCount.decrementAndGet();
runningCount = store.setRunningProcedureCount(activeCount);
-if (LOG.isTraceEnabled()) {
-LOG.trace("Halt pid=" + this.activeProcedure.getProcId() +
-" runningCount=" + runningCount + ", activeCount=" + activeCount);
-}
+LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
+runningCount, activeCount);
this.activeProcedure = null;
lastUpdate = EnvironmentEdgeManager.currentTime();
executionStartTime.set(Long.MAX_VALUE);
}
}
} catch (Throwable t) {
LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t);
LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
} finally {
LOG.trace("Worker terminated.");
}
@@ -1768,169 +1773,23 @@ public class ProcedureExecutor<TEnvironment> {
return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
}
-private boolean keepAlive(final long lastUpdate) {
-if (workerThreads.size() <= corePoolSize) return true;
-return (EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime;
+// core worker never timeout
+protected boolean keepAlive(long lastUpdate) {
+return true;
}
}
-/**
-* Runs task on a period such as check for stuck workers.
-* @see InlineChore
-*/
-private final class TimeoutExecutorThread extends StoppableThread {
-private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
+// A worker thread which can be added when core workers are stuck. Will timeout after
+// keepAliveTime if there is no procedure to run.
+private final class KeepAliveWorkerThread extends WorkerThread {
-public TimeoutExecutorThread(final ThreadGroup group) {
-super(group, "ProcExecTimeout");
-setDaemon(true);
+public KeepAliveWorkerThread(ThreadGroup group) {
+super(group, "KeepAlivePEWorker-");
}
-@Override
-public void sendStopSignal() {
-queue.add(DelayedUtil.DELAYED_POISON);
-}
-@Override
-public void run() {
-final boolean traceEnabled = LOG.isTraceEnabled();
-while (isRunning()) {
-final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
-if (task == null || task == DelayedUtil.DELAYED_POISON) {
-// the executor may be shutting down,
-// and the task is just the shutdown request
-continue;
-}
-if (traceEnabled) {
-LOG.trace("Executing " + task);
-}
-// execute the task
-if (task instanceof InlineChore) {
-execInlineChore((InlineChore)task);
-} else if (task instanceof DelayedProcedure) {
-execDelayedProcedure((DelayedProcedure)task);
-} else {
-LOG.error("CODE-BUG unknown timeout task type " + task);
-}
-}
-}
-public void add(final InlineChore chore) {
-chore.refreshTimeout();
-queue.add(chore);
-}
-public void add(final Procedure procedure) {
-assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
-LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +
-", timestamp=" + procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure(procedure));
-}
-public boolean remove(final Procedure procedure) {
-return queue.remove(new DelayedProcedure(procedure));
-}
-private void execInlineChore(final InlineChore chore) {
-chore.run();
-add(chore);
-}
-private void execDelayedProcedure(final DelayedProcedure delayed) {
-// TODO: treat this as a normal procedure, add it to the scheduler and
-// let one of the workers handle it.
-// Today we consider ProcedureInMemoryChore as InlineChores
-final Procedure procedure = delayed.getObject();
-if (procedure instanceof ProcedureInMemoryChore) {
-executeInMemoryChore((ProcedureInMemoryChore)procedure);
-// if the procedure is in a waiting state again, put it back in the queue
-procedure.updateTimestamp();
-if (procedure.isWaiting()) {
-delayed.setTimeout(procedure.getTimeoutTimestamp());
-queue.add(delayed);
-}
-} else {
-executeTimedoutProcedure(procedure);
-}
-}
-private void executeInMemoryChore(final ProcedureInMemoryChore chore) {
-if (!chore.isWaiting()) return;
-// The ProcedureInMemoryChore is a special case, and it acts as a chore.
-// instead of bringing the Chore class in, we reuse this timeout thread for
-// this special case.
-try {
-chore.periodicExecute(getEnvironment());
-} catch (Throwable e) {
-LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e);
-}
-}
-private void executeTimedoutProcedure(final Procedure proc) {
-// The procedure received a timeout. if the procedure itself does not handle it,
-// call abort() and add the procedure back in the queue for rollback.
-if (proc.setTimeoutFailure(getEnvironment())) {
-long rootProcId = Procedure.getRootProcedureId(procedures, proc);
-RootProcedureState procStack = rollbackStack.get(rootProcId);
-procStack.abort();
-store.update(proc);
-scheduler.addFront(proc);
-}
-}
-}
-private static final class DelayedProcedure
-extends DelayedUtil.DelayedContainerWithTimestamp<Procedure> {
-public DelayedProcedure(final Procedure procedure) {
-super(procedure, procedure.getTimeoutTimestamp());
-}
-}
-private static abstract class StoppableThread extends Thread {
-public StoppableThread(final ThreadGroup group, final String name) {
-super(group, name);
-}
-public abstract void sendStopSignal();
-public void awaitTermination() {
-try {
-final long startTime = EnvironmentEdgeManager.currentTime();
-for (int i = 0; isAlive(); ++i) {
-sendStopSignal();
-join(250);
-// Log every two seconds; send interrupt too.
-if (i > 0 && (i % 8) == 0) {
-LOG.warn("Waiting termination of thread " + getName() + ", " +
-StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime) +
-"; sending interrupt");
-interrupt();
-}
-}
-} catch (InterruptedException e) {
-LOG.warn(getName() + " join wait got interrupted", e);
-}
-}
-}
-// ==========================================================================
-// Inline Chores (executors internal chores)
-// ==========================================================================
-private static abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable {
-private long timeout;
-public abstract int getTimeoutInterval();
-protected void refreshTimeout() {
-this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval();
-}
@Override
-public long getTimeout() {
-return timeout;
+protected boolean keepAlive(long lastUpdate) {
+return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
}
}
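The policy split, then: WorkerThread.keepAlive() always returns true, so the corePoolSize workers are permanent, while a KeepAliveWorkerThread retires once it has been idle for keepAliveTime. That timeout is read from hbase.procedure.worker.keep.alive.time.msec; a usage sketch (TestProcedurePriority below shortens it to 5 seconds the same way):

// Usage sketch: shorten the burst-worker idle timeout via configuration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;

class KeepAliveConfigSketch {
  static Configuration withShortKeepAlive() {
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000L); // 5s instead of 1min
    return conf;
  }
}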
@@ -1975,32 +1834,35 @@ public class ProcedureExecutor<TEnvironment> {
private int checkForStuckWorkers() {
// check if any of the worker is stuck
int stuckCount = 0;
-for (WorkerThread worker: workerThreads) {
+for (WorkerThread worker : workerThreads) {
if (worker.getCurrentRunTime() < stuckThreshold) {
continue;
}
// WARN the worker is stuck
stuckCount++;
LOG.warn("Worker stuck " + worker +
" run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
LOG.warn("Worker stuck {} run time {}", worker,
StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
}
return stuckCount;
}
private void checkThreadCount(final int stuckCount) {
// nothing to do if there are no runnable tasks
-if (stuckCount < 1 || !scheduler.hasRunnables()) return;
+if (stuckCount < 1 || !scheduler.hasRunnables()) {
+return;
+}
// add a new thread if the worker stuck percentage exceeds the threshold limit
-// and every handler is active.
-final float stuckPerc = ((float)stuckCount) / workerThreads.size();
-if (stuckPerc >= addWorkerStuckPercentage &&
-activeExecutorCount.get() == workerThreads.size()) {
-final WorkerThread worker = new WorkerThread(threadGroup);
+final float stuckPerc = ((float) stuckCount) / workerThreads.size();
+// let's add new worker threads more aggressively, as they will eventually time out
+// if there is no work to do.
+if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
+final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
LOG.debug("Added new worker thread " + worker);
LOG.debug("Added new worker thread {}", worker);
}
}
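Condensed, the new grow rule drops the old requirement that every worker be active and instead caps growth at maxPoolSize, since over-provisioned workers now retire themselves. A sketch of the decision with simplified names (not the committed code):

// Sketch of the checkForStuckWorkers + checkThreadCount grow decision.
class GrowDecisionSketch {
  static boolean shouldAddBurstWorker(int stuckCount, boolean hasRunnables, int poolSize,
      int maxPoolSize, float addWorkerStuckPercentage) {
    if (stuckCount < 1 || !hasRunnables) {
      return false; // no queued work, so an extra worker could not help
    }
    float stuckPerc = (float) stuckCount / poolSize;
    return stuckPerc >= addWorkerStuckPercentage && poolSize < maxPoolSize;
  }
}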

org/apache/hadoop/hbase/procedure2/StoppableThread.java (new file)

@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
abstract class StoppableThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(StoppableThread.class);
public StoppableThread(final ThreadGroup group, final String name) {
super(group, name);
}
public abstract void sendStopSignal();
public void awaitTermination() {
try {
final long startTime = EnvironmentEdgeManager.currentTime();
for (int i = 0; isAlive(); ++i) {
sendStopSignal();
join(250);
// Log every two seconds; send interrupt too.
if (i > 0 && (i % 8) == 0) {
LOG.warn("Waiting termination of thread {}, {}; sending interrupt", getName(),
StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
interrupt();
}
}
} catch (InterruptedException e) {
LOG.warn("{} join wait got interrupted", getName(), e);
}
}
}
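The contract here: sendStopSignal() must wake the thread from whatever it blocks on, because awaitTermination() only joins 250 ms at a time and escalates to interrupt() roughly every two seconds (every 8th iteration). A hypothetical subclass using a poison pill, mirroring what TimeoutExecutorThread does with DelayedUtil.DELAYED_POISON (assumes the same package, since StoppableThread is package-private):

// Hypothetical subclass: a queue drainer stopped via a poison pill.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueDrainerSketch extends StoppableThread {
  private static final Runnable POISON = () -> { };
  private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

  QueueDrainerSketch(ThreadGroup group) {
    super(group, "QueueDrainerSketch");
  }

  @Override
  public void sendStopSignal() {
    queue.add(POISON); // wakes take() below so awaitTermination() can complete
  }

  @Override
  public void run() {
    try {
      for (Runnable task; (task = queue.take()) != POISON;) {
        task.run();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // interrupt sent by awaitTermination()
    }
  }
}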

org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java (new file)

@@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.concurrent.DelayQueue;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
* Runs tasks on a period, such as checking for stuck workers.
* @see InlineChore
*/
@InterfaceAudience.Private
class TimeoutExecutorThread extends StoppableThread {
private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class);
private final ProcedureExecutor<?> executor;
private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
public TimeoutExecutorThread(ProcedureExecutor<?> executor, ThreadGroup group) {
super(group, "ProcExecTimeout");
setDaemon(true);
this.executor = executor;
}
@Override
public void sendStopSignal() {
queue.add(DelayedUtil.DELAYED_POISON);
}
@Override
public void run() {
while (executor.isRunning()) {
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
if (task == null || task == DelayedUtil.DELAYED_POISON) {
// the executor may be shutting down,
// and the task is just the shutdown request
continue;
}
LOG.trace("Executing {}", task);
// execute the task
if (task instanceof InlineChore) {
execInlineChore((InlineChore) task);
} else if (task instanceof DelayedProcedure) {
execDelayedProcedure((DelayedProcedure) task);
} else {
LOG.error("CODE-BUG unknown timeout task type {}", task);
}
}
}
public void add(InlineChore chore) {
chore.refreshTimeout();
queue.add(chore);
}
public void add(Procedure<?> procedure) {
assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure(procedure));
}
public boolean remove(Procedure<?> procedure) {
return queue.remove(new DelayedProcedure(procedure));
}
private void execInlineChore(InlineChore chore) {
chore.run();
add(chore);
}
private void execDelayedProcedure(DelayedProcedure delayed) {
// TODO: treat this as a normal procedure, add it to the scheduler and
// let one of the workers handle it.
// Today we consider ProcedureInMemoryChore as InlineChores
Procedure<?> procedure = delayed.getObject();
if (procedure instanceof ProcedureInMemoryChore) {
executeInMemoryChore((ProcedureInMemoryChore) procedure);
// if the procedure is in a waiting state again, put it back in the queue
procedure.updateTimestamp();
if (procedure.isWaiting()) {
delayed.setTimeout(procedure.getTimeoutTimestamp());
queue.add(delayed);
}
} else {
executeTimedoutProcedure(procedure);
}
}
private void executeInMemoryChore(ProcedureInMemoryChore chore) {
if (!chore.isWaiting()) {
return;
}
// The ProcedureInMemoryChore is a special case, and it acts as a chore.
// Instead of bringing the Chore class in, we reuse this timeout thread for
// this special case.
try {
chore.periodicExecute(executor.getEnvironment());
} catch (Throwable e) {
LOG.error("Ignoring {} exception: {}", chore, e.getMessage(), e);
}
}
private void executeTimedoutProcedure(Procedure proc) {
// The procedure received a timeout. If the procedure itself does not handle it,
// call abort() and add the procedure back in the queue for rollback.
if (proc.setTimeoutFailure(executor.getEnvironment())) {
long rootProcId = executor.getRootProcedureId(proc);
RootProcedureState procStack = executor.getProcStack(rootProcId);
procStack.abort();
executor.getStore().update(proc);
executor.getScheduler().addFront(proc);
}
}
}

org/apache/hadoop/hbase/master/HMaster.java

@@ -1221,11 +1221,10 @@ public class HMaster extends HRegionServer implements MasterServices {
configurationManager.registerObserver(procEnv);
int cpus = Runtime.getRuntime().availableProcessors();
-final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
-Math.max((cpus > 0? cpus/4: 0),
-MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
-final boolean abortOnCorruption = conf.getBoolean(
-MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
+final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
+(cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
+final boolean abortOnCorruption =
+conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
procedureStore.start(numThreads);
procedureExecutor.start(numThreads, abortOnCorruption);
@@ -3556,7 +3555,7 @@ public class HMaster extends HRegionServer implements MasterServices {
public boolean recoverMeta() throws IOException {
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
LOG.info("Running RecoverMetaProcedure to ensure proper hbase:meta deploy.");
-long procId = procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch));
+procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch));
latch.await();
LOG.info("hbase:meta deployed at=" +
getMetaTableLocator().getMetaRegionLocation(getZooKeeper()));
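For concreteness, the sizing above works out as follows, assuming DEFAULT_MIN_MASTER_PROCEDURE_THREADS is the 16 named in ProcedureExecutor's "bigger of cpus/4 or 16" log line:

// Worked example of master procedure pool sizing (constants inlined as assumptions).
class PoolSizingSketch {
  public static void main(String[] args) {
    int cpus = 32; // e.g. Runtime.getRuntime().availableProcessors()
    int numThreads = Math.max(cpus > 0 ? cpus / 4 : 0, 16); // max(8, 16) -> 16 core workers
    int maxPoolSize = 10 * numThreads; // ProcedureExecutor's burst cap -> 160
    System.out.println(numThreads + " core workers, up to " + maxPoolSize + " with bursts");
  }
}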

org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java (new file)

@@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test to ensure that the priority for procedures and stuck checker can partially solve the problem
* described in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain
* period of time.
*/
@Category({ MasterTests.class, LargeTests.class })
public class TestProcedurePriority {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestProcedurePriority.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String TABLE_NAME_PREFIX = "TestProcedurePriority-";
private static byte[] CF = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq");
private static int CORE_POOL_SIZE;
private static int TABLE_COUNT;
private static volatile boolean FAIL = false;
public static final class MyCP implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
List<Cell> result) throws IOException {
if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
throw new IOException("Inject error");
}
}
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
Durability durability) throws IOException {
if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
throw new IOException("Inject error");
}
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000);
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4);
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName());
UTIL.startMiniCluster(3);
CORE_POOL_SIZE =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize();
TABLE_COUNT = 50 * CORE_POOL_SIZE;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < TABLE_COUNT; i++) {
futures.add(UTIL.getAdmin().createTableAsync(
TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX + i))
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build(),
null));
}
for (Future<?> future : futures) {
future.get(1, TimeUnit.MINUTES);
}
UTIL.getAdmin().balance(true);
UTIL.waitUntilNoRegionsInTransition();
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void test() throws Exception {
RegionServerThread rsWithMetaThread = UTIL.getMiniHBaseCluster().getRegionServerThreads()
.stream().filter(t -> !t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty())
.findAny().get();
HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer());
FAIL = true;
UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName());
// wait until all the worker threads are stuck, which means that the stuck checker will start to
// add new worker threads.
ProcedureExecutor<?> executor =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return executor.getWorkerThreadCount() > CORE_POOL_SIZE;
}
@Override
public String explainFailure() throws Exception {
return "Stuck checker does not add new worker thread";
}
});
UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName());
rsWithMetaThread.join();
FAIL = false;
// verify that the cluster is back
UTIL.waitUntilNoRegionsInTransition(60000);
for (int i = 0; i < TABLE_COUNT; i++) {
try (Table table = UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return executor.getWorkerThreadCount() == CORE_POOL_SIZE;
}
@Override
public String explainFailure() throws Exception {
return "The new workers do not timeout";
}
});
}
}