HBASE-19978 The keepalive logic is incomplete in ProcedureExecutor
This commit is contained in:
parent
a9a6eed372
commit
391790ddb0
|
@ -0,0 +1,28 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<?>> {
|
||||||
|
public DelayedProcedure(Procedure<?> procedure) {
|
||||||
|
super(procedure, procedure.getTimeoutTimestamp());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,42 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inline Chores (executors internal chores).
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable {
|
||||||
|
|
||||||
|
private long timeout;
|
||||||
|
|
||||||
|
public abstract int getTimeoutInterval();
|
||||||
|
|
||||||
|
protected void refreshTimeout() {
|
||||||
|
this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTimeout() {
|
||||||
|
return timeout;
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,9 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -30,35 +27,35 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
|
||||||
import java.util.concurrent.DelayQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
|
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
|
||||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
|
import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
||||||
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.NonceKey;
|
import org.apache.hadoop.hbase.util.NonceKey;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread Pool that executes the submitted procedures.
|
* Thread Pool that executes the submitted procedures.
|
||||||
|
@ -83,7 +80,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
|
|
||||||
public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
|
public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
|
||||||
"hbase.procedure.worker.keep.alive.time.msec";
|
"hbase.procedure.worker.keep.alive.time.msec";
|
||||||
private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = Long.MAX_VALUE;
|
private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
|
||||||
|
|
||||||
Testing testing = null;
|
Testing testing = null;
|
||||||
public static class Testing {
|
public static class Testing {
|
||||||
|
@ -272,8 +269,9 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
private CopyOnWriteArrayList<WorkerThread> workerThreads;
|
private CopyOnWriteArrayList<WorkerThread> workerThreads;
|
||||||
private TimeoutExecutorThread timeoutExecutor;
|
private TimeoutExecutorThread timeoutExecutor;
|
||||||
private int corePoolSize;
|
private int corePoolSize;
|
||||||
|
private int maxPoolSize;
|
||||||
|
|
||||||
private volatile long keepAliveTime = Long.MAX_VALUE;
|
private volatile long keepAliveTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scheduler/Queue that contains runnable procedures.
|
* Scheduler/Queue that contains runnable procedures.
|
||||||
|
@ -501,7 +499,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
* a corrupted procedure is found on replay. otherwise false.
|
* a corrupted procedure is found on replay. otherwise false.
|
||||||
*/
|
*/
|
||||||
public void start(int numThreads, boolean abortOnCorruption) throws IOException {
|
public void start(int numThreads, boolean abortOnCorruption) throws IOException {
|
||||||
if (running.getAndSet(true)) {
|
if (!running.compareAndSet(false, true)) {
|
||||||
LOG.warn("Already running");
|
LOG.warn("Already running");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -509,13 +507,15 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
// We have numThreads executor + one timer thread used for timing out
|
// We have numThreads executor + one timer thread used for timing out
|
||||||
// procedures and triggering periodic procedures.
|
// procedures and triggering periodic procedures.
|
||||||
this.corePoolSize = numThreads;
|
this.corePoolSize = numThreads;
|
||||||
LOG.info("Starting {} Workers (bigger of cpus/4 or 16)", corePoolSize);
|
this.maxPoolSize = 10 * numThreads;
|
||||||
|
LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
|
||||||
|
corePoolSize, maxPoolSize);
|
||||||
|
|
||||||
// Create the Thread Group for the executors
|
// Create the Thread Group for the executors
|
||||||
threadGroup = new ThreadGroup("PEWorkerGroup");
|
threadGroup = new ThreadGroup("PEWorkerGroup");
|
||||||
|
|
||||||
// Create the timeout executor
|
// Create the timeout executor
|
||||||
timeoutExecutor = new TimeoutExecutorThread(threadGroup);
|
timeoutExecutor = new TimeoutExecutorThread(this, threadGroup);
|
||||||
|
|
||||||
// Create the workers
|
// Create the workers
|
||||||
workerId.set(0);
|
workerId.set(0);
|
||||||
|
@ -530,8 +530,8 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
st = EnvironmentEdgeManager.currentTime();
|
st = EnvironmentEdgeManager.currentTime();
|
||||||
store.recoverLease();
|
store.recoverLease();
|
||||||
et = EnvironmentEdgeManager.currentTime();
|
et = EnvironmentEdgeManager.currentTime();
|
||||||
LOG.info(String.format("Recovered %s lease in %s",
|
LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
|
||||||
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
|
StringUtils.humanTimeDiff(et - st));
|
||||||
|
|
||||||
// start the procedure scheduler
|
// start the procedure scheduler
|
||||||
scheduler.start();
|
scheduler.start();
|
||||||
|
@ -544,13 +544,11 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
st = EnvironmentEdgeManager.currentTime();
|
st = EnvironmentEdgeManager.currentTime();
|
||||||
load(abortOnCorruption);
|
load(abortOnCorruption);
|
||||||
et = EnvironmentEdgeManager.currentTime();
|
et = EnvironmentEdgeManager.currentTime();
|
||||||
LOG.info(String.format("Loaded %s in %s, um pid=",
|
LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
|
||||||
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
|
StringUtils.humanTimeDiff(et - st));
|
||||||
|
|
||||||
// Start the executors. Here we must have the lastProcId set.
|
// Start the executors. Here we must have the lastProcId set.
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Start workers {}", workerThreads.size());
|
||||||
LOG.trace("Start workers " + workerThreads.size());
|
|
||||||
}
|
|
||||||
timeoutExecutor.start();
|
timeoutExecutor.start();
|
||||||
for (WorkerThread worker: workerThreads) {
|
for (WorkerThread worker: workerThreads) {
|
||||||
worker.start();
|
worker.start();
|
||||||
|
@ -645,7 +643,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
return this.store;
|
return this.store;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ProcedureScheduler getScheduler() {
|
ProcedureScheduler getScheduler() {
|
||||||
return scheduler;
|
return scheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1152,7 +1150,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
return procedures.keySet();
|
return procedures.keySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Long getRootProcedureId(Procedure proc) {
|
Long getRootProcedureId(Procedure proc) {
|
||||||
return Procedure.getRootProcedureId(procedures, proc);
|
return Procedure.getRootProcedureId(procedures, proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1699,15 +1697,23 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
sendProcedureFinishedNotification(proc.getProcId());
|
sendProcedureFinishedNotification(proc.getProcId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RootProcedureState getProcStack(long rootProcId) {
|
||||||
|
return rollbackStack.get(rootProcId);
|
||||||
|
}
|
||||||
|
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// Worker Thread
|
// Worker Thread
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
private final class WorkerThread extends StoppableThread {
|
private class WorkerThread extends StoppableThread {
|
||||||
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
|
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
|
||||||
private Procedure activeProcedure;
|
private volatile Procedure<?> activeProcedure;
|
||||||
|
|
||||||
public WorkerThread(final ThreadGroup group) {
|
public WorkerThread(ThreadGroup group) {
|
||||||
super(group, "PEWorker-" + workerId.incrementAndGet());
|
this(group, "PEWorker-");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected WorkerThread(ThreadGroup group, String prefix) {
|
||||||
|
super(group, prefix + workerId.incrementAndGet());
|
||||||
setDaemon(true);
|
setDaemon(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1721,34 +1727,33 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
long lastUpdate = EnvironmentEdgeManager.currentTime();
|
long lastUpdate = EnvironmentEdgeManager.currentTime();
|
||||||
try {
|
try {
|
||||||
while (isRunning() && keepAlive(lastUpdate)) {
|
while (isRunning() && keepAlive(lastUpdate)) {
|
||||||
this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
|
Procedure<?> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
|
||||||
if (this.activeProcedure == null) continue;
|
if (proc == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
this.activeProcedure = proc;
|
||||||
int activeCount = activeExecutorCount.incrementAndGet();
|
int activeCount = activeExecutorCount.incrementAndGet();
|
||||||
int runningCount = store.setRunningProcedureCount(activeCount);
|
int runningCount = store.setRunningProcedureCount(activeCount);
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
|
||||||
LOG.trace("Execute pid=" + this.activeProcedure.getProcId() +
|
runningCount, activeCount);
|
||||||
" runningCount=" + runningCount + ", activeCount=" + activeCount);
|
|
||||||
}
|
|
||||||
executionStartTime.set(EnvironmentEdgeManager.currentTime());
|
executionStartTime.set(EnvironmentEdgeManager.currentTime());
|
||||||
try {
|
try {
|
||||||
executeProcedure(this.activeProcedure);
|
executeProcedure(proc);
|
||||||
} catch (AssertionError e) {
|
} catch (AssertionError e) {
|
||||||
LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e);
|
LOG.info("ASSERT pid=" + proc.getProcId(), e);
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
activeCount = activeExecutorCount.decrementAndGet();
|
activeCount = activeExecutorCount.decrementAndGet();
|
||||||
runningCount = store.setRunningProcedureCount(activeCount);
|
runningCount = store.setRunningProcedureCount(activeCount);
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
|
||||||
LOG.trace("Halt pid=" + this.activeProcedure.getProcId() +
|
runningCount, activeCount);
|
||||||
" runningCount=" + runningCount + ", activeCount=" + activeCount);
|
|
||||||
}
|
|
||||||
this.activeProcedure = null;
|
this.activeProcedure = null;
|
||||||
lastUpdate = EnvironmentEdgeManager.currentTime();
|
lastUpdate = EnvironmentEdgeManager.currentTime();
|
||||||
executionStartTime.set(Long.MAX_VALUE);
|
executionStartTime.set(Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t);
|
LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
|
||||||
} finally {
|
} finally {
|
||||||
LOG.trace("Worker terminated.");
|
LOG.trace("Worker terminated.");
|
||||||
}
|
}
|
||||||
|
@ -1768,169 +1773,23 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
|
return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean keepAlive(final long lastUpdate) {
|
// core worker never timeout
|
||||||
if (workerThreads.size() <= corePoolSize) return true;
|
protected boolean keepAlive(long lastUpdate) {
|
||||||
return (EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// A worker thread which can be added when core workers are stuck. Will timeout after
|
||||||
* Runs task on a period such as check for stuck workers.
|
// keepAliveTime if there is no procedure to run.
|
||||||
* @see InlineChore
|
private final class KeepAliveWorkerThread extends WorkerThread {
|
||||||
*/
|
|
||||||
private final class TimeoutExecutorThread extends StoppableThread {
|
|
||||||
private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
|
|
||||||
|
|
||||||
public TimeoutExecutorThread(final ThreadGroup group) {
|
public KeepAliveWorkerThread(ThreadGroup group) {
|
||||||
super(group, "ProcExecTimeout");
|
super(group, "KeepAlivePEWorker-");
|
||||||
setDaemon(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendStopSignal() {
|
protected boolean keepAlive(long lastUpdate) {
|
||||||
queue.add(DelayedUtil.DELAYED_POISON);
|
return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
final boolean traceEnabled = LOG.isTraceEnabled();
|
|
||||||
while (isRunning()) {
|
|
||||||
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
|
|
||||||
if (task == null || task == DelayedUtil.DELAYED_POISON) {
|
|
||||||
// the executor may be shutting down,
|
|
||||||
// and the task is just the shutdown request
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (traceEnabled) {
|
|
||||||
LOG.trace("Executing " + task);
|
|
||||||
}
|
|
||||||
|
|
||||||
// execute the task
|
|
||||||
if (task instanceof InlineChore) {
|
|
||||||
execInlineChore((InlineChore)task);
|
|
||||||
} else if (task instanceof DelayedProcedure) {
|
|
||||||
execDelayedProcedure((DelayedProcedure)task);
|
|
||||||
} else {
|
|
||||||
LOG.error("CODE-BUG unknown timeout task type " + task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void add(final InlineChore chore) {
|
|
||||||
chore.refreshTimeout();
|
|
||||||
queue.add(chore);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void add(final Procedure procedure) {
|
|
||||||
assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
|
|
||||||
LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +
|
|
||||||
", timestamp=" + procedure.getTimeoutTimestamp());
|
|
||||||
queue.add(new DelayedProcedure(procedure));
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean remove(final Procedure procedure) {
|
|
||||||
return queue.remove(new DelayedProcedure(procedure));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void execInlineChore(final InlineChore chore) {
|
|
||||||
chore.run();
|
|
||||||
add(chore);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void execDelayedProcedure(final DelayedProcedure delayed) {
|
|
||||||
// TODO: treat this as a normal procedure, add it to the scheduler and
|
|
||||||
// let one of the workers handle it.
|
|
||||||
// Today we consider ProcedureInMemoryChore as InlineChores
|
|
||||||
final Procedure procedure = delayed.getObject();
|
|
||||||
if (procedure instanceof ProcedureInMemoryChore) {
|
|
||||||
executeInMemoryChore((ProcedureInMemoryChore)procedure);
|
|
||||||
// if the procedure is in a waiting state again, put it back in the queue
|
|
||||||
procedure.updateTimestamp();
|
|
||||||
if (procedure.isWaiting()) {
|
|
||||||
delayed.setTimeout(procedure.getTimeoutTimestamp());
|
|
||||||
queue.add(delayed);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
executeTimedoutProcedure(procedure);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeInMemoryChore(final ProcedureInMemoryChore chore) {
|
|
||||||
if (!chore.isWaiting()) return;
|
|
||||||
|
|
||||||
// The ProcedureInMemoryChore is a special case, and it acts as a chore.
|
|
||||||
// instead of bringing the Chore class in, we reuse this timeout thread for
|
|
||||||
// this special case.
|
|
||||||
try {
|
|
||||||
chore.periodicExecute(getEnvironment());
|
|
||||||
} catch (Throwable e) {
|
|
||||||
LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeTimedoutProcedure(final Procedure proc) {
|
|
||||||
// The procedure received a timeout. if the procedure itself does not handle it,
|
|
||||||
// call abort() and add the procedure back in the queue for rollback.
|
|
||||||
if (proc.setTimeoutFailure(getEnvironment())) {
|
|
||||||
long rootProcId = Procedure.getRootProcedureId(procedures, proc);
|
|
||||||
RootProcedureState procStack = rollbackStack.get(rootProcId);
|
|
||||||
procStack.abort();
|
|
||||||
store.update(proc);
|
|
||||||
scheduler.addFront(proc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class DelayedProcedure
|
|
||||||
extends DelayedUtil.DelayedContainerWithTimestamp<Procedure> {
|
|
||||||
public DelayedProcedure(final Procedure procedure) {
|
|
||||||
super(procedure, procedure.getTimeoutTimestamp());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static abstract class StoppableThread extends Thread {
|
|
||||||
public StoppableThread(final ThreadGroup group, final String name) {
|
|
||||||
super(group, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public abstract void sendStopSignal();
|
|
||||||
|
|
||||||
public void awaitTermination() {
|
|
||||||
try {
|
|
||||||
final long startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
for (int i = 0; isAlive(); ++i) {
|
|
||||||
sendStopSignal();
|
|
||||||
join(250);
|
|
||||||
// Log every two seconds; send interrupt too.
|
|
||||||
if (i > 0 && (i % 8) == 0) {
|
|
||||||
LOG.warn("Waiting termination of thread " + getName() + ", " +
|
|
||||||
StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime) +
|
|
||||||
"; sending interrupt");
|
|
||||||
interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn(getName() + " join wait got interrupted", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==========================================================================
|
|
||||||
// Inline Chores (executors internal chores)
|
|
||||||
// ==========================================================================
|
|
||||||
private static abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable {
|
|
||||||
private long timeout;
|
|
||||||
|
|
||||||
public abstract int getTimeoutInterval();
|
|
||||||
|
|
||||||
protected void refreshTimeout() {
|
|
||||||
this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getTimeout() {
|
|
||||||
return timeout;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1982,25 +1841,28 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
|
|
||||||
// WARN the worker is stuck
|
// WARN the worker is stuck
|
||||||
stuckCount++;
|
stuckCount++;
|
||||||
LOG.warn("Worker stuck " + worker +
|
LOG.warn("Worker stuck {} run time {}", worker,
|
||||||
" run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
|
StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
|
||||||
}
|
}
|
||||||
return stuckCount;
|
return stuckCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkThreadCount(final int stuckCount) {
|
private void checkThreadCount(final int stuckCount) {
|
||||||
// nothing to do if there are no runnable tasks
|
// nothing to do if there are no runnable tasks
|
||||||
if (stuckCount < 1 || !scheduler.hasRunnables()) return;
|
if (stuckCount < 1 || !scheduler.hasRunnables()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// add a new thread if the worker stuck percentage exceed the threshold limit
|
// add a new thread if the worker stuck percentage exceed the threshold limit
|
||||||
// and every handler is active.
|
// and every handler is active.
|
||||||
final float stuckPerc = ((float) stuckCount) / workerThreads.size();
|
final float stuckPerc = ((float) stuckCount) / workerThreads.size();
|
||||||
if (stuckPerc >= addWorkerStuckPercentage &&
|
// let's add new worker thread more aggressively, as they will timeout finally if there is no
|
||||||
activeExecutorCount.get() == workerThreads.size()) {
|
// work to do.
|
||||||
final WorkerThread worker = new WorkerThread(threadGroup);
|
if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
|
||||||
|
final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
|
||||||
workerThreads.add(worker);
|
workerThreads.add(worker);
|
||||||
worker.start();
|
worker.start();
|
||||||
LOG.debug("Added new worker thread " + worker);
|
LOG.debug("Added new worker thread {}", worker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
abstract class StoppableThread extends Thread {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(StoppableThread.class);
|
||||||
|
|
||||||
|
public StoppableThread(final ThreadGroup group, final String name) {
|
||||||
|
super(group, name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void sendStopSignal();
|
||||||
|
|
||||||
|
public void awaitTermination() {
|
||||||
|
try {
|
||||||
|
final long startTime = EnvironmentEdgeManager.currentTime();
|
||||||
|
for (int i = 0; isAlive(); ++i) {
|
||||||
|
sendStopSignal();
|
||||||
|
join(250);
|
||||||
|
// Log every two seconds; send interrupt too.
|
||||||
|
if (i > 0 && (i % 8) == 0) {
|
||||||
|
LOG.warn("Waiting termination of thread {}, {}; sending interrupt", getName(),
|
||||||
|
StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
|
||||||
|
interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn("{} join wait got interrupted", getName(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,140 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import java.util.concurrent.DelayQueue;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs task on a period such as check for stuck workers.
|
||||||
|
* @see InlineChore
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class TimeoutExecutorThread extends StoppableThread {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class);
|
||||||
|
|
||||||
|
private final ProcedureExecutor<?> executor;
|
||||||
|
|
||||||
|
private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
|
||||||
|
|
||||||
|
public TimeoutExecutorThread(ProcedureExecutor<?> executor, ThreadGroup group) {
|
||||||
|
super(group, "ProcExecTimeout");
|
||||||
|
setDaemon(true);
|
||||||
|
this.executor = executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendStopSignal() {
|
||||||
|
queue.add(DelayedUtil.DELAYED_POISON);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (executor.isRunning()) {
|
||||||
|
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
|
||||||
|
if (task == null || task == DelayedUtil.DELAYED_POISON) {
|
||||||
|
// the executor may be shutting down,
|
||||||
|
// and the task is just the shutdown request
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.trace("Executing {}", task);
|
||||||
|
|
||||||
|
// execute the task
|
||||||
|
if (task instanceof InlineChore) {
|
||||||
|
execInlineChore((InlineChore) task);
|
||||||
|
} else if (task instanceof DelayedProcedure) {
|
||||||
|
execDelayedProcedure((DelayedProcedure) task);
|
||||||
|
} else {
|
||||||
|
LOG.error("CODE-BUG unknown timeout task type {}", task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(InlineChore chore) {
|
||||||
|
chore.refreshTimeout();
|
||||||
|
queue.add(chore);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(Procedure<?> procedure) {
|
||||||
|
assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
|
||||||
|
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
|
||||||
|
procedure.getTimeoutTimestamp());
|
||||||
|
queue.add(new DelayedProcedure(procedure));
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean remove(Procedure<?> procedure) {
|
||||||
|
return queue.remove(new DelayedProcedure(procedure));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void execInlineChore(InlineChore chore) {
|
||||||
|
chore.run();
|
||||||
|
add(chore);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void execDelayedProcedure(DelayedProcedure delayed) {
|
||||||
|
// TODO: treat this as a normal procedure, add it to the scheduler and
|
||||||
|
// let one of the workers handle it.
|
||||||
|
// Today we consider ProcedureInMemoryChore as InlineChores
|
||||||
|
Procedure<?> procedure = delayed.getObject();
|
||||||
|
if (procedure instanceof ProcedureInMemoryChore) {
|
||||||
|
executeInMemoryChore((ProcedureInMemoryChore) procedure);
|
||||||
|
// if the procedure is in a waiting state again, put it back in the queue
|
||||||
|
procedure.updateTimestamp();
|
||||||
|
if (procedure.isWaiting()) {
|
||||||
|
delayed.setTimeout(procedure.getTimeoutTimestamp());
|
||||||
|
queue.add(delayed);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
executeTimedoutProcedure(procedure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeInMemoryChore(ProcedureInMemoryChore chore) {
|
||||||
|
if (!chore.isWaiting()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The ProcedureInMemoryChore is a special case, and it acts as a chore.
|
||||||
|
// instead of bringing the Chore class in, we reuse this timeout thread for
|
||||||
|
// this special case.
|
||||||
|
try {
|
||||||
|
chore.periodicExecute(executor.getEnvironment());
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.error("Ignoring {} exception: {}", chore, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeTimedoutProcedure(Procedure proc) {
|
||||||
|
// The procedure received a timeout. if the procedure itself does not handle it,
|
||||||
|
// call abort() and add the procedure back in the queue for rollback.
|
||||||
|
if (proc.setTimeoutFailure(executor.getEnvironment())) {
|
||||||
|
long rootProcId = executor.getRootProcedureId(proc);
|
||||||
|
RootProcedureState procStack = executor.getProcStack(rootProcId);
|
||||||
|
procStack.abort();
|
||||||
|
executor.getStore().update(proc);
|
||||||
|
executor.getScheduler().addFront(proc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1213,11 +1213,10 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
configurationManager.registerObserver(procEnv);
|
configurationManager.registerObserver(procEnv);
|
||||||
|
|
||||||
int cpus = Runtime.getRuntime().availableProcessors();
|
int cpus = Runtime.getRuntime().availableProcessors();
|
||||||
final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
|
final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
|
||||||
Math.max((cpus > 0? cpus/4: 0),
|
(cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
|
||||||
MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
|
final boolean abortOnCorruption =
|
||||||
final boolean abortOnCorruption = conf.getBoolean(
|
conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
|
||||||
MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
|
|
||||||
MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
|
MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
|
||||||
procedureStore.start(numThreads);
|
procedureStore.start(numThreads);
|
||||||
procedureExecutor.start(numThreads, abortOnCorruption);
|
procedureExecutor.start(numThreads, abortOnCorruption);
|
||||||
|
@ -3522,7 +3521,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
public boolean recoverMeta() throws IOException {
|
public boolean recoverMeta() throws IOException {
|
||||||
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
|
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
|
||||||
LOG.info("Running RecoverMetaProcedure to ensure proper hbase:meta deploy.");
|
LOG.info("Running RecoverMetaProcedure to ensure proper hbase:meta deploy.");
|
||||||
long procId = procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch));
|
procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch));
|
||||||
latch.await();
|
latch.await();
|
||||||
LOG.info("hbase:meta deployed at=" +
|
LOG.info("hbase:meta deployed at=" +
|
||||||
getMetaTableLocator().getMetaRegionLocation(getZooKeeper()));
|
getMetaTableLocator().getMetaRegionLocation(getZooKeeper()));
|
||||||
|
|
|
@ -0,0 +1,180 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to ensure that the priority for procedures and stuck checker can partially solve the problem
|
||||||
|
* describe in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain
|
||||||
|
* period of time.
|
||||||
|
*/
|
||||||
|
@Category({ MasterTests.class, LargeTests.class })
|
||||||
|
public class TestProcedurePriority {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestProcedurePriority.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static String TABLE_NAME_PREFIX = "TestProcedurePriority-";
|
||||||
|
|
||||||
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
private static byte[] CQ = Bytes.toBytes("cq");
|
||||||
|
|
||||||
|
private static int CORE_POOL_SIZE;
|
||||||
|
|
||||||
|
private static int TABLE_COUNT;
|
||||||
|
|
||||||
|
private static volatile boolean FAIL = false;
|
||||||
|
|
||||||
|
public static final class MyCP implements RegionObserver, RegionCoprocessor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<RegionObserver> getRegionObserver() {
|
||||||
|
return Optional.of(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
|
||||||
|
List<Cell> result) throws IOException {
|
||||||
|
if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
|
||||||
|
throw new IOException("Inject error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
|
||||||
|
Durability durability) throws IOException {
|
||||||
|
if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
|
||||||
|
throw new IOException("Inject error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000);
|
||||||
|
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4);
|
||||||
|
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName());
|
||||||
|
UTIL.startMiniCluster(3);
|
||||||
|
CORE_POOL_SIZE =
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize();
|
||||||
|
TABLE_COUNT = 50 * CORE_POOL_SIZE;
|
||||||
|
List<Future<?>> futures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < TABLE_COUNT; i++) {
|
||||||
|
futures.add(UTIL.getAdmin().createTableAsync(
|
||||||
|
TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX + i))
|
||||||
|
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build(),
|
||||||
|
null));
|
||||||
|
}
|
||||||
|
for (Future<?> future : futures) {
|
||||||
|
future.get(1, TimeUnit.MINUTES);
|
||||||
|
}
|
||||||
|
UTIL.getAdmin().balance(true);
|
||||||
|
UTIL.waitUntilNoRegionsInTransition();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
RegionServerThread rsWithMetaThread = UTIL.getMiniHBaseCluster().getRegionServerThreads()
|
||||||
|
.stream().filter(t -> !t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty())
|
||||||
|
.findAny().get();
|
||||||
|
HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer());
|
||||||
|
FAIL = true;
|
||||||
|
UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName());
|
||||||
|
// wait until all the worker thread are stuck, which means that the stuck checker will start to
|
||||||
|
// add new worker thread.
|
||||||
|
ProcedureExecutor<?> executor =
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||||
|
UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
return executor.getWorkerThreadCount() > CORE_POOL_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String explainFailure() throws Exception {
|
||||||
|
return "Stuck checker does not add new worker thread";
|
||||||
|
}
|
||||||
|
});
|
||||||
|
UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName());
|
||||||
|
rsWithMetaThread.join();
|
||||||
|
FAIL = false;
|
||||||
|
// verify that the cluster is back
|
||||||
|
UTIL.waitUntilNoRegionsInTransition(60000);
|
||||||
|
for (int i = 0; i < TABLE_COUNT; i++) {
|
||||||
|
try (Table table = UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) {
|
||||||
|
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
return executor.getWorkerThreadCount() == CORE_POOL_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String explainFailure() throws Exception {
|
||||||
|
return "The new workers do not timeout";
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue