From 391790ddb05f0db0ffd67e0c2f2f961157af7be3 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 19 Feb 2018 08:08:06 +0800 Subject: [PATCH] HBASE-19978 The keepalive logic is incomplete in ProcedureExecutor --- .../hbase/procedure2/DelayedProcedure.java | 28 ++ .../hadoop/hbase/procedure2/InlineChore.java | 42 +++ .../hbase/procedure2/ProcedureExecutor.java | 284 +++++------------- .../hbase/procedure2/StoppableThread.java | 54 ++++ .../procedure2/TimeoutExecutorThread.java | 140 +++++++++ .../apache/hadoop/hbase/master/HMaster.java | 11 +- .../procedure/TestProcedurePriority.java | 180 +++++++++++ 7 files changed, 522 insertions(+), 217 deletions(-) create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/InlineChore.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StoppableThread.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java new file mode 100644 index 00000000000..fcec0b7d228 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp> { + public DelayedProcedure(Procedure procedure) { + super(procedure, procedure.getTimeoutTimestamp()); + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/InlineChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/InlineChore.java new file mode 100644 index 00000000000..32b4922a0b1 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/InlineChore.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Inline Chores (executors internal chores). + */ +@InterfaceAudience.Private +abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable { + + private long timeout; + + public abstract int getTimeoutInterval(); + + protected void refreshTimeout() { + this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval(); + } + + @Override + public long getTimeout() { + return timeout; + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index b1ff4261b76..af0a61bf1d2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hbase.procedure2; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -30,35 +27,35 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.procedure2.Procedure.LockState; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; -import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; -import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; /** * Thread Pool that executes the submitted procedures. @@ -83,7 +80,7 @@ public class ProcedureExecutor { public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = "hbase.procedure.worker.keep.alive.time.msec"; - private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = Long.MAX_VALUE; + private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); Testing testing = null; public static class Testing { @@ -272,8 +269,9 @@ public class ProcedureExecutor { private CopyOnWriteArrayList workerThreads; private TimeoutExecutorThread timeoutExecutor; private int corePoolSize; + private int maxPoolSize; - private volatile long keepAliveTime = Long.MAX_VALUE; + private volatile long keepAliveTime; /** * Scheduler/Queue that contains runnable procedures. @@ -501,7 +499,7 @@ public class ProcedureExecutor { * a corrupted procedure is found on replay. otherwise false. */ public void start(int numThreads, boolean abortOnCorruption) throws IOException { - if (running.getAndSet(true)) { + if (!running.compareAndSet(false, true)) { LOG.warn("Already running"); return; } @@ -509,13 +507,15 @@ public class ProcedureExecutor { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. this.corePoolSize = numThreads; - LOG.info("Starting {} Workers (bigger of cpus/4 or 16)", corePoolSize); + this.maxPoolSize = 10 * numThreads; + LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}", + corePoolSize, maxPoolSize); // Create the Thread Group for the executors threadGroup = new ThreadGroup("PEWorkerGroup"); // Create the timeout executor - timeoutExecutor = new TimeoutExecutorThread(threadGroup); + timeoutExecutor = new TimeoutExecutorThread(this, threadGroup); // Create the workers workerId.set(0); @@ -530,8 +530,8 @@ public class ProcedureExecutor { st = EnvironmentEdgeManager.currentTime(); store.recoverLease(); et = EnvironmentEdgeManager.currentTime(); - LOG.info(String.format("Recovered %s lease in %s", - store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); + LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(), + StringUtils.humanTimeDiff(et - st)); // start the procedure scheduler scheduler.start(); @@ -544,13 +544,11 @@ public class ProcedureExecutor { st = EnvironmentEdgeManager.currentTime(); load(abortOnCorruption); et = EnvironmentEdgeManager.currentTime(); - LOG.info(String.format("Loaded %s in %s, um pid=", - store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); + LOG.info("Loaded {} in {}", store.getClass().getSimpleName(), + StringUtils.humanTimeDiff(et - st)); // Start the executors. Here we must have the lastProcId set. - if (LOG.isTraceEnabled()) { - LOG.trace("Start workers " + workerThreads.size()); - } + LOG.trace("Start workers {}", workerThreads.size()); timeoutExecutor.start(); for (WorkerThread worker: workerThreads) { worker.start(); @@ -645,7 +643,7 @@ public class ProcedureExecutor { return this.store; } - protected ProcedureScheduler getScheduler() { + ProcedureScheduler getScheduler() { return scheduler; } @@ -1152,7 +1150,7 @@ public class ProcedureExecutor { return procedures.keySet(); } - private Long getRootProcedureId(Procedure proc) { + Long getRootProcedureId(Procedure proc) { return Procedure.getRootProcedureId(procedures, proc); } @@ -1699,15 +1697,23 @@ public class ProcedureExecutor { sendProcedureFinishedNotification(proc.getProcId()); } + RootProcedureState getProcStack(long rootProcId) { + return rollbackStack.get(rootProcId); + } + // ========================================================================== // Worker Thread // ========================================================================== - private final class WorkerThread extends StoppableThread { + private class WorkerThread extends StoppableThread { private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); - private Procedure activeProcedure; + private volatile Procedure activeProcedure; - public WorkerThread(final ThreadGroup group) { - super(group, "PEWorker-" + workerId.incrementAndGet()); + public WorkerThread(ThreadGroup group) { + this(group, "PEWorker-"); + } + + protected WorkerThread(ThreadGroup group, String prefix) { + super(group, prefix + workerId.incrementAndGet()); setDaemon(true); } @@ -1721,34 +1727,33 @@ public class ProcedureExecutor { long lastUpdate = EnvironmentEdgeManager.currentTime(); try { while (isRunning() && keepAlive(lastUpdate)) { - this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); - if (this.activeProcedure == null) continue; + Procedure proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + if (proc == null) { + continue; + } + this.activeProcedure = proc; int activeCount = activeExecutorCount.incrementAndGet(); int runningCount = store.setRunningProcedureCount(activeCount); - if (LOG.isTraceEnabled()) { - LOG.trace("Execute pid=" + this.activeProcedure.getProcId() + - " runningCount=" + runningCount + ", activeCount=" + activeCount); - } + LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), + runningCount, activeCount); executionStartTime.set(EnvironmentEdgeManager.currentTime()); try { - executeProcedure(this.activeProcedure); + executeProcedure(proc); } catch (AssertionError e) { - LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e); + LOG.info("ASSERT pid=" + proc.getProcId(), e); throw e; } finally { activeCount = activeExecutorCount.decrementAndGet(); runningCount = store.setRunningProcedureCount(activeCount); - if (LOG.isTraceEnabled()) { - LOG.trace("Halt pid=" + this.activeProcedure.getProcId() + - " runningCount=" + runningCount + ", activeCount=" + activeCount); - } + LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), + runningCount, activeCount); this.activeProcedure = null; lastUpdate = EnvironmentEdgeManager.currentTime(); executionStartTime.set(Long.MAX_VALUE); } } } catch (Throwable t) { - LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t); + LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); } finally { LOG.trace("Worker terminated."); } @@ -1768,169 +1773,23 @@ public class ProcedureExecutor { return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); } - private boolean keepAlive(final long lastUpdate) { - if (workerThreads.size() <= corePoolSize) return true; - return (EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime; + // core worker never timeout + protected boolean keepAlive(long lastUpdate) { + return true; } } - /** - * Runs task on a period such as check for stuck workers. - * @see InlineChore - */ - private final class TimeoutExecutorThread extends StoppableThread { - private final DelayQueue queue = new DelayQueue<>(); + // A worker thread which can be added when core workers are stuck. Will timeout after + // keepAliveTime if there is no procedure to run. + private final class KeepAliveWorkerThread extends WorkerThread { - public TimeoutExecutorThread(final ThreadGroup group) { - super(group, "ProcExecTimeout"); - setDaemon(true); + public KeepAliveWorkerThread(ThreadGroup group) { + super(group, "KeepAlivePEWorker-"); } @Override - public void sendStopSignal() { - queue.add(DelayedUtil.DELAYED_POISON); - } - - @Override - public void run() { - final boolean traceEnabled = LOG.isTraceEnabled(); - while (isRunning()) { - final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); - if (task == null || task == DelayedUtil.DELAYED_POISON) { - // the executor may be shutting down, - // and the task is just the shutdown request - continue; - } - - if (traceEnabled) { - LOG.trace("Executing " + task); - } - - // execute the task - if (task instanceof InlineChore) { - execInlineChore((InlineChore)task); - } else if (task instanceof DelayedProcedure) { - execDelayedProcedure((DelayedProcedure)task); - } else { - LOG.error("CODE-BUG unknown timeout task type " + task); - } - } - } - - public void add(final InlineChore chore) { - chore.refreshTimeout(); - queue.add(chore); - } - - public void add(final Procedure procedure) { - assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; - LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() + - ", timestamp=" + procedure.getTimeoutTimestamp()); - queue.add(new DelayedProcedure(procedure)); - } - - public boolean remove(final Procedure procedure) { - return queue.remove(new DelayedProcedure(procedure)); - } - - private void execInlineChore(final InlineChore chore) { - chore.run(); - add(chore); - } - - private void execDelayedProcedure(final DelayedProcedure delayed) { - // TODO: treat this as a normal procedure, add it to the scheduler and - // let one of the workers handle it. - // Today we consider ProcedureInMemoryChore as InlineChores - final Procedure procedure = delayed.getObject(); - if (procedure instanceof ProcedureInMemoryChore) { - executeInMemoryChore((ProcedureInMemoryChore)procedure); - // if the procedure is in a waiting state again, put it back in the queue - procedure.updateTimestamp(); - if (procedure.isWaiting()) { - delayed.setTimeout(procedure.getTimeoutTimestamp()); - queue.add(delayed); - } - } else { - executeTimedoutProcedure(procedure); - } - } - - private void executeInMemoryChore(final ProcedureInMemoryChore chore) { - if (!chore.isWaiting()) return; - - // The ProcedureInMemoryChore is a special case, and it acts as a chore. - // instead of bringing the Chore class in, we reuse this timeout thread for - // this special case. - try { - chore.periodicExecute(getEnvironment()); - } catch (Throwable e) { - LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e); - } - } - - private void executeTimedoutProcedure(final Procedure proc) { - // The procedure received a timeout. if the procedure itself does not handle it, - // call abort() and add the procedure back in the queue for rollback. - if (proc.setTimeoutFailure(getEnvironment())) { - long rootProcId = Procedure.getRootProcedureId(procedures, proc); - RootProcedureState procStack = rollbackStack.get(rootProcId); - procStack.abort(); - store.update(proc); - scheduler.addFront(proc); - } - } - } - - private static final class DelayedProcedure - extends DelayedUtil.DelayedContainerWithTimestamp { - public DelayedProcedure(final Procedure procedure) { - super(procedure, procedure.getTimeoutTimestamp()); - } - } - - private static abstract class StoppableThread extends Thread { - public StoppableThread(final ThreadGroup group, final String name) { - super(group, name); - } - - public abstract void sendStopSignal(); - - public void awaitTermination() { - try { - final long startTime = EnvironmentEdgeManager.currentTime(); - for (int i = 0; isAlive(); ++i) { - sendStopSignal(); - join(250); - // Log every two seconds; send interrupt too. - if (i > 0 && (i % 8) == 0) { - LOG.warn("Waiting termination of thread " + getName() + ", " + - StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime) + - "; sending interrupt"); - interrupt(); - } - } - } catch (InterruptedException e) { - LOG.warn(getName() + " join wait got interrupted", e); - } - } - } - - // ========================================================================== - // Inline Chores (executors internal chores) - // ========================================================================== - private static abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable { - private long timeout; - - public abstract int getTimeoutInterval(); - - protected void refreshTimeout() { - this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval(); - } - - @Override - public long getTimeout() { - return timeout; + protected boolean keepAlive(long lastUpdate) { + return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; } } @@ -1975,32 +1834,35 @@ public class ProcedureExecutor { private int checkForStuckWorkers() { // check if any of the worker is stuck int stuckCount = 0; - for (WorkerThread worker: workerThreads) { + for (WorkerThread worker : workerThreads) { if (worker.getCurrentRunTime() < stuckThreshold) { continue; } // WARN the worker is stuck stuckCount++; - LOG.warn("Worker stuck " + worker + - " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime())); + LOG.warn("Worker stuck {} run time {}", worker, + StringUtils.humanTimeDiff(worker.getCurrentRunTime())); } return stuckCount; } private void checkThreadCount(final int stuckCount) { // nothing to do if there are no runnable tasks - if (stuckCount < 1 || !scheduler.hasRunnables()) return; + if (stuckCount < 1 || !scheduler.hasRunnables()) { + return; + } // add a new thread if the worker stuck percentage exceed the threshold limit // and every handler is active. - final float stuckPerc = ((float)stuckCount) / workerThreads.size(); - if (stuckPerc >= addWorkerStuckPercentage && - activeExecutorCount.get() == workerThreads.size()) { - final WorkerThread worker = new WorkerThread(threadGroup); + final float stuckPerc = ((float) stuckCount) / workerThreads.size(); + // let's add new worker thread more aggressively, as they will timeout finally if there is no + // work to do. + if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) { + final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup); workerThreads.add(worker); worker.start(); - LOG.debug("Added new worker thread " + worker); + LOG.debug("Added new worker thread {}", worker); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StoppableThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StoppableThread.java new file mode 100644 index 00000000000..b58b571a934 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StoppableThread.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +abstract class StoppableThread extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(StoppableThread.class); + + public StoppableThread(final ThreadGroup group, final String name) { + super(group, name); + } + + public abstract void sendStopSignal(); + + public void awaitTermination() { + try { + final long startTime = EnvironmentEdgeManager.currentTime(); + for (int i = 0; isAlive(); ++i) { + sendStopSignal(); + join(250); + // Log every two seconds; send interrupt too. + if (i > 0 && (i % 8) == 0) { + LOG.warn("Waiting termination of thread {}, {}; sending interrupt", getName(), + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); + interrupt(); + } + } + } catch (InterruptedException e) { + LOG.warn("{} join wait got interrupted", getName(), e); + } + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java new file mode 100644 index 00000000000..e5e32309aeb --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import java.util.concurrent.DelayQueue; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +/** + * Runs task on a period such as check for stuck workers. + * @see InlineChore + */ +@InterfaceAudience.Private +class TimeoutExecutorThread extends StoppableThread { + + private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class); + + private final ProcedureExecutor executor; + + private final DelayQueue queue = new DelayQueue<>(); + + public TimeoutExecutorThread(ProcedureExecutor executor, ThreadGroup group) { + super(group, "ProcExecTimeout"); + setDaemon(true); + this.executor = executor; + } + + @Override + public void sendStopSignal() { + queue.add(DelayedUtil.DELAYED_POISON); + } + + @Override + public void run() { + while (executor.isRunning()) { + final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); + if (task == null || task == DelayedUtil.DELAYED_POISON) { + // the executor may be shutting down, + // and the task is just the shutdown request + continue; + } + LOG.trace("Executing {}", task); + + // execute the task + if (task instanceof InlineChore) { + execInlineChore((InlineChore) task); + } else if (task instanceof DelayedProcedure) { + execDelayedProcedure((DelayedProcedure) task); + } else { + LOG.error("CODE-BUG unknown timeout task type {}", task); + } + } + } + + public void add(InlineChore chore) { + chore.refreshTimeout(); + queue.add(chore); + } + + public void add(Procedure procedure) { + assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), + procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure(procedure)); + } + + public boolean remove(Procedure procedure) { + return queue.remove(new DelayedProcedure(procedure)); + } + + private void execInlineChore(InlineChore chore) { + chore.run(); + add(chore); + } + + private void execDelayedProcedure(DelayedProcedure delayed) { + // TODO: treat this as a normal procedure, add it to the scheduler and + // let one of the workers handle it. + // Today we consider ProcedureInMemoryChore as InlineChores + Procedure procedure = delayed.getObject(); + if (procedure instanceof ProcedureInMemoryChore) { + executeInMemoryChore((ProcedureInMemoryChore) procedure); + // if the procedure is in a waiting state again, put it back in the queue + procedure.updateTimestamp(); + if (procedure.isWaiting()) { + delayed.setTimeout(procedure.getTimeoutTimestamp()); + queue.add(delayed); + } + } else { + executeTimedoutProcedure(procedure); + } + } + + private void executeInMemoryChore(ProcedureInMemoryChore chore) { + if (!chore.isWaiting()) { + return; + } + + // The ProcedureInMemoryChore is a special case, and it acts as a chore. + // instead of bringing the Chore class in, we reuse this timeout thread for + // this special case. + try { + chore.periodicExecute(executor.getEnvironment()); + } catch (Throwable e) { + LOG.error("Ignoring {} exception: {}", chore, e.getMessage(), e); + } + } + + private void executeTimedoutProcedure(Procedure proc) { + // The procedure received a timeout. if the procedure itself does not handle it, + // call abort() and add the procedure back in the queue for rollback. + if (proc.setTimeoutFailure(executor.getEnvironment())) { + long rootProcId = executor.getRootProcedureId(proc); + RootProcedureState procStack = executor.getProcStack(rootProcId); + procStack.abort(); + executor.getStore().update(proc); + executor.getScheduler().addFront(proc); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index b949aa98ae0..b639503c59b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1213,11 +1213,10 @@ public class HMaster extends HRegionServer implements MasterServices { configurationManager.registerObserver(procEnv); int cpus = Runtime.getRuntime().availableProcessors(); - final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, - Math.max((cpus > 0? cpus/4: 0), - MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); - final boolean abortOnCorruption = conf.getBoolean( - MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, + final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max( + (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); + final boolean abortOnCorruption = + conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); procedureStore.start(numThreads); procedureExecutor.start(numThreads, abortOnCorruption); @@ -3522,7 +3521,7 @@ public class HMaster extends HRegionServer implements MasterServices { public boolean recoverMeta() throws IOException { ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); LOG.info("Running RecoverMetaProcedure to ensure proper hbase:meta deploy."); - long procId = procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch)); + procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch)); latch.await(); LOG.info("hbase:meta deployed at=" + getMetaTableLocator().getMetaRegionLocation(getZooKeeper())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java new file mode 100644 index 00000000000..05d8976a0f1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test to ensure that the priority for procedures and stuck checker can partially solve the problem + * describe in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain + * period of time. + */ +@Category({ MasterTests.class, LargeTests.class }) +public class TestProcedurePriority { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestProcedurePriority.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String TABLE_NAME_PREFIX = "TestProcedurePriority-"; + + private static byte[] CF = Bytes.toBytes("cf"); + + private static byte[] CQ = Bytes.toBytes("cq"); + + private static int CORE_POOL_SIZE; + + private static int TABLE_COUNT; + + private static volatile boolean FAIL = false; + + public static final class MyCP implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) { + throw new IOException("Inject error"); + } + } + + @Override + public void prePut(ObserverContext c, Put put, WALEdit edit, + Durability durability) throws IOException { + if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) { + throw new IOException("Inject error"); + } + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000); + UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4); + UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName()); + UTIL.startMiniCluster(3); + CORE_POOL_SIZE = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize(); + TABLE_COUNT = 50 * CORE_POOL_SIZE; + List> futures = new ArrayList<>(); + for (int i = 0; i < TABLE_COUNT; i++) { + futures.add(UTIL.getAdmin().createTableAsync( + TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX + i)) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build(), + null)); + } + for (Future future : futures) { + future.get(1, TimeUnit.MINUTES); + } + UTIL.getAdmin().balance(true); + UTIL.waitUntilNoRegionsInTransition(); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + RegionServerThread rsWithMetaThread = UTIL.getMiniHBaseCluster().getRegionServerThreads() + .stream().filter(t -> !t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty()) + .findAny().get(); + HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer()); + FAIL = true; + UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName()); + // wait until all the worker thread are stuck, which means that the stuck checker will start to + // add new worker thread. + ProcedureExecutor executor = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + UTIL.waitFor(60000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return executor.getWorkerThreadCount() > CORE_POOL_SIZE; + } + + @Override + public String explainFailure() throws Exception { + return "Stuck checker does not add new worker thread"; + } + }); + UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName()); + rsWithMetaThread.join(); + FAIL = false; + // verify that the cluster is back + UTIL.waitUntilNoRegionsInTransition(60000); + for (int i = 0; i < TABLE_COUNT; i++) { + try (Table table = UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + UTIL.waitFor(30000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return executor.getWorkerThreadCount() == CORE_POOL_SIZE; + } + + @Override + public String explainFailure() throws Exception { + return "The new workers do not timeout"; + } + }); + } +}