HBASE-16846 Procedure v2 - executor cleanup

This commit is contained in:
Matteo Bertozzi 2016-10-17 10:23:33 -07:00
parent c8e9a295c1
commit c6e9dabe62
14 changed files with 1336 additions and 613 deletions

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.procedure2;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@ -123,17 +122,26 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
return poll(unit.toNanos(timeout));
}
public Procedure poll(long nanos) {
final boolean waitForever = (nanos < 0);
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Procedure poll(final long nanos) {
schedLock();
try {
while (!queueHasRunnables()) {
if (!running) return null;
if (waitForever) {
if (!running) {
LOG.debug("the scheduler is not running");
return null;
}
if (!queueHasRunnables()) {
// WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller
// to take decisions after a wake/interruption.
if (nanos < 0) {
schedWaitCond.await();
} else {
if (nanos <= 0) return null;
nanos = schedWaitCond.awaitNanos(nanos);
schedWaitCond.awaitNanos(nanos);
}
if (!queueHasRunnables()) {
nullPollCalls++;
return null;
}
}

View File

@ -57,27 +57,31 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
protected static final long NO_PROC_ID = -1;
protected static final int NO_TIMEOUT = -1;
// unchanged after initialization
private NonceKey nonceKey = null;
private String owner = null;
private Long parentProcId = null;
private Long procId = null;
private long parentProcId = NO_PROC_ID;
private long rootProcId = NO_PROC_ID;
private long procId = NO_PROC_ID;
private long startTime;
// runtime state, updated every operation
private ProcedureState state = ProcedureState.INITIALIZING;
private Integer timeout = null;
private RemoteProcedureException exception = null;
private int[] stackIndexes = null;
private int childrenLatch = 0;
private long lastUpdate;
private volatile int timeout = NO_TIMEOUT;
private volatile long lastUpdate;
private volatile byte[] result = null;
// TODO: it will be nice having pointers to allow the scheduler doing suspend/resume tricks
private boolean suspended = false;
private RemoteProcedureException exception = null;
private byte[] result = null;
private NonceKey nonceKey = null;
/**
* The main code of the procedure. It must be idempotent since execute()
* may be called multiple time in case of machine failure in the middle
@ -235,13 +239,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
* @return the StringBuilder
*/
protected StringBuilder toStringSimpleSB() {
StringBuilder sb = new StringBuilder();
final StringBuilder sb = new StringBuilder();
toStringClassDetails(sb);
if (procId != null) {
sb.append(" id=");
sb.append(getProcId());
}
sb.append(" id=");
sb.append(getProcId());
if (hasParent()) {
sb.append(" parent=");
@ -256,6 +258,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
sb.append(" state=");
toStringState(sb);
if (hasException()) {
sb.append(" failed=" + getException());
}
return sb;
}
@ -264,7 +270,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
* details
*/
public String toStringDetails() {
StringBuilder sb = toStringSimpleSB();
final StringBuilder sb = toStringSimpleSB();
sb.append(" startTime=");
sb.append(getStartTime());
@ -272,7 +278,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
sb.append(" lastUpdate=");
sb.append(getLastUpdate());
int[] stackIndices = getStackIndexes();
final int[] stackIndices = getStackIndexes();
if (stackIndices != null) {
sb.append("\n");
sb.append("stackIndexes=");
@ -285,7 +291,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
protected String toStringClass() {
StringBuilder sb = new StringBuilder();
toStringClassDetails(sb);
return sb.toString();
}
@ -309,6 +314,156 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
builder.append(getClass().getName());
}
// ==========================================================================
// Those fields are unchanged after initialization.
//
// Each procedure will get created from the user or during
// ProcedureExecutor.start() during the load() phase and then submitted
// to the executor. these fields will never be changed after initialization
// ==========================================================================
public long getProcId() {
return procId;
}
public boolean hasParent() {
return parentProcId != NO_PROC_ID;
}
public long getParentProcId() {
return parentProcId;
}
public long getRootProcId() {
return rootProcId;
}
public NonceKey getNonceKey() {
return nonceKey;
}
public long getStartTime() {
return startTime;
}
public String getOwner() {
return owner;
}
public boolean hasOwner() {
return owner != null;
}
/**
* Called by the ProcedureExecutor to assign the ID to the newly created procedure.
*/
@VisibleForTesting
@InterfaceAudience.Private
protected void setProcId(final long procId) {
this.procId = procId;
this.startTime = EnvironmentEdgeManager.currentTime();
setState(ProcedureState.RUNNABLE);
}
/**
* Called by the ProcedureExecutor to assign the parent to the newly created procedure.
*/
@InterfaceAudience.Private
protected void setParentProcId(final long parentProcId) {
this.parentProcId = parentProcId;
}
@InterfaceAudience.Private
protected void setRootProcId(final long rootProcId) {
this.rootProcId = rootProcId;
}
/**
* Called by the ProcedureExecutor to set the value to the newly created procedure.
*/
@VisibleForTesting
@InterfaceAudience.Private
protected void setNonceKey(final NonceKey nonceKey) {
this.nonceKey = nonceKey;
}
@VisibleForTesting
@InterfaceAudience.Private
public void setOwner(final String owner) {
this.owner = StringUtils.isEmpty(owner) ? null : owner;
}
/**
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
@InterfaceAudience.Private
protected void setStartTime(final long startTime) {
this.startTime = startTime;
}
// ==========================================================================
// runtime state - timeout related
// ==========================================================================
/**
* @param timeout timeout interval in msec
*/
protected void setTimeout(final int timeout) {
this.timeout = timeout;
}
public boolean hasTimeout() {
return timeout != NO_TIMEOUT;
}
/**
* @return the timeout in msec
*/
public int getTimeout() {
return timeout;
}
/**
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
@InterfaceAudience.Private
protected void setLastUpdate(final long lastUpdate) {
this.lastUpdate = lastUpdate;
}
/**
* Called by ProcedureExecutor after each time a procedure step is executed.
*/
@InterfaceAudience.Private
protected void updateTimestamp() {
this.lastUpdate = EnvironmentEdgeManager.currentTime();
}
public long getLastUpdate() {
return lastUpdate;
}
/**
* Timeout of the next timeout.
* Called by the ProcedureExecutor if the procedure has timeout set and
* the procedure is in the waiting queue.
* @return the timestamp of the next timeout.
*/
@InterfaceAudience.Private
protected long getTimeoutTimestamp() {
return getLastUpdate() + getTimeout();
}
// ==========================================================================
// runtime state
// ==========================================================================
/**
* @return the time elapsed between the last update and the start time of the procedure.
*/
public long elapsedTime() {
return getLastUpdate() - getStartTime();
}
/**
* @return the serialized result if any, otherwise null
*/
@ -324,28 +479,30 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
this.result = result;
}
public long getProcId() {
return procId;
// ==============================================================================================
// Runtime state, updated every operation by the ProcedureExecutor
//
// There is always 1 thread at the time operating on the state of the procedure.
// The ProcedureExecutor may check and set states, or some Procecedure may
// update its own state. but no concurrent updates. we use synchronized here
// just because the procedure can get scheduled on different executor threads on each step.
// ==============================================================================================
/**
* @return true if the procedure is in a suspended state,
* waiting for the resources required to execute the procedure will become available.
*/
public synchronized boolean isSuspended() {
return suspended;
}
public boolean hasParent() {
return parentProcId != null;
public synchronized void suspend() {
suspended = true;
}
public boolean hasException() {
return exception != null;
}
public boolean hasTimeout() {
return timeout != null;
}
public long getParentProcId() {
return parentProcId.longValue();
}
public NonceKey getNonceKey() {
return nonceKey;
public synchronized void resume() {
assert isSuspended() : this + " expected suspended state, got " + state;
suspended = false;
}
/**
@ -404,74 +561,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return false;
}
/**
* @return true if the procedure is in a suspended state,
* waiting for the resources required to execute the procedure will become available.
*/
public synchronized boolean isSuspended() {
return suspended;
}
public synchronized void suspend() {
suspended = true;
}
public synchronized void resume() {
assert isSuspended() : this + " expected suspended state, got " + state;
suspended = false;
}
public synchronized RemoteProcedureException getException() {
return exception;
}
public long getStartTime() {
return startTime;
}
public synchronized long getLastUpdate() {
return lastUpdate;
}
public synchronized long elapsedTime() {
return lastUpdate - startTime;
}
/**
* @param timeout timeout in msec
*/
protected void setTimeout(final int timeout) {
this.timeout = timeout;
}
/**
* @return the timeout in msec
*/
public int getTimeout() {
return timeout.intValue();
}
/**
* @return the remaining time before the timeout
*/
public long getTimeRemaining() {
return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
}
@VisibleForTesting
@InterfaceAudience.Private
public void setOwner(final String owner) {
this.owner = StringUtils.isEmpty(owner) ? null : owner;
}
public String getOwner() {
return owner;
}
public boolean hasOwner() {
return owner != null;
}
@VisibleForTesting
@InterfaceAudience.Private
protected synchronized void setState(final ProcedureState state) {
@ -514,101 +603,12 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return false;
}
/**
* Called by the ProcedureExecutor to assign the ID to the newly created procedure.
*/
@VisibleForTesting
@InterfaceAudience.Private
protected void setProcId(final long procId) {
this.procId = procId;
this.startTime = EnvironmentEdgeManager.currentTime();
setState(ProcedureState.RUNNABLE);
public synchronized boolean hasException() {
return exception != null;
}
/**
* Called by the ProcedureExecutor to assign the parent to the newly created procedure.
*/
@InterfaceAudience.Private
protected void setParentProcId(final long parentProcId) {
this.parentProcId = parentProcId;
}
/**
* Called by the ProcedureExecutor to set the value to the newly created procedure.
*/
@VisibleForTesting
@InterfaceAudience.Private
protected void setNonceKey(final NonceKey nonceKey) {
this.nonceKey = nonceKey;
}
/**
* Internal method called by the ProcedureExecutor that starts the
* user-level code execute().
*/
@InterfaceAudience.Private
protected Procedure[] doExecute(final TEnvironment env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try {
updateTimestamp();
return execute(env);
} finally {
updateTimestamp();
}
}
/**
* Internal method called by the ProcedureExecutor that starts the
* user-level code rollback().
*/
@InterfaceAudience.Private
protected void doRollback(final TEnvironment env)
throws IOException, InterruptedException {
try {
updateTimestamp();
rollback(env);
} finally {
updateTimestamp();
}
}
/**
* Internal method called by the ProcedureExecutor that starts the
* user-level code acquireLock().
*/
@InterfaceAudience.Private
protected boolean doAcquireLock(final TEnvironment env) {
return acquireLock(env);
}
/**
* Internal method called by the ProcedureExecutor that starts the
* user-level code releaseLock().
*/
@InterfaceAudience.Private
protected void doReleaseLock(final TEnvironment env) {
releaseLock(env);
}
/**
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
@InterfaceAudience.Private
protected void setStartTime(final long startTime) {
this.startTime = startTime;
}
/**
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
protected synchronized void setLastUpdate(final long lastUpdate) {
this.lastUpdate = lastUpdate;
}
protected synchronized void updateTimestamp() {
this.lastUpdate = EnvironmentEdgeManager.currentTime();
public synchronized RemoteProcedureException getException() {
return exception;
}
/**
@ -629,8 +629,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
}
/**
* Called by the ProcedureExecutor to notify that one of the sub-procedures
* has completed.
* Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
*/
@InterfaceAudience.Private
protected synchronized boolean childrenCountDown() {
@ -643,6 +642,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return childrenLatch > 0;
}
@InterfaceAudience.Private
protected synchronized int getChildrenLatch() {
return childrenLatch;
}
@ -695,12 +695,63 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return stackIndexes;
}
// ==========================================================================
// Internal methods - called by the ProcedureExecutor
// ==========================================================================
/**
* Internal method called by the ProcedureExecutor that starts the user-level code execute().
*/
@InterfaceAudience.Private
protected Procedure[] doExecute(final TEnvironment env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try {
updateTimestamp();
return execute(env);
} finally {
updateTimestamp();
}
}
/**
* Internal method called by the ProcedureExecutor that starts the user-level code rollback().
*/
@InterfaceAudience.Private
protected void doRollback(final TEnvironment env)
throws IOException, InterruptedException {
try {
updateTimestamp();
rollback(env);
} finally {
updateTimestamp();
}
}
/**
* Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
*/
@InterfaceAudience.Private
protected boolean doAcquireLock(final TEnvironment env) {
return acquireLock(env);
}
/**
* Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
*/
@InterfaceAudience.Private
protected void doReleaseLock(final TEnvironment env) {
releaseLock(env);
}
@Override
public int compareTo(final Procedure other) {
long diff = getProcId() - other.getProcId();
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
return Long.compare(getProcId(), other.getProcId());
}
// ==========================================================================
// misc utils
// ==========================================================================
/**
* Get an hashcode for the specified Procedure ID
* @return the hashcode for the specified procId

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.util;
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class DelayedUtil {
private DelayedUtil() { }
public interface DelayedWithTimeout extends Delayed {
long getTimeoutTimestamp();
}
public static final DelayedWithTimeout DELAYED_POISON = new DelayedWithTimeout() {
@Override
public long getTimeoutTimestamp() {
return 0;
}
@Override
public long getDelay(final TimeUnit unit) {
return 0;
}
@Override
public int compareTo(final Delayed o) {
return Long.compare(0, DelayedUtil.getTimeoutTimestamp(o));
}
@Override
public boolean equals(final Object other) {
return this == other;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(POISON)";
}
};
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
try {
return queue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
public static long getRemainingTime(final TimeUnit resultUnit, final long timeoutTime) {
final long currentTime = EnvironmentEdgeManager.currentTime();
if (currentTime >= timeoutTime) {
return 0;
}
return resultUnit.convert(timeoutTime - currentTime, TimeUnit.MILLISECONDS);
}
public static int compareDelayed(final Delayed o1, final Delayed o2) {
return Long.compare(getTimeoutTimestamp(o1), getTimeoutTimestamp(o2));
}
private static long getTimeoutTimestamp(final Delayed o) {
assert o instanceof DelayedWithTimeout : "expected DelayedWithTimeout instance, got " + o;
return ((DelayedWithTimeout)o).getTimeoutTimestamp();
}
public static abstract class DelayedObject implements DelayedWithTimeout {
@Override
public long getDelay(final TimeUnit unit) {
return DelayedUtil.getRemainingTime(unit, getTimeoutTimestamp());
}
@Override
public int compareTo(final Delayed other) {
return DelayedUtil.compareDelayed(this, other);
}
}
public static abstract class DelayedContainer<T> extends DelayedObject {
private final T object;
public DelayedContainer(final T object) {
this.object = object;
}
public T getObject() {
return this.object;
}
@Override
public boolean equals(final Object other) {
if (other == this) return true;
if (!(other instanceof DelayedContainer)) return false;
return Objects.equals(getObject(), ((DelayedContainer)other).getObject());
}
@Override
public int hashCode() {
return object != null ? object.hashCode() : 0;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(" + getObject() + ")";
}
}
public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> {
private long timeoutTimestamp;
public DelayedContainerWithTimestamp(final T object, final long timeoutTimestamp) {
super(object);
setTimeoutTimestamp(timeoutTimestamp);
}
@Override
public long getTimeoutTimestamp() {
return timeoutTimestamp;
}
public void setTimeoutTimestamp(final long timeoutTimestamp) {
this.timeoutTimestamp = timeoutTimestamp;
}
}
}

View File

@ -74,8 +74,8 @@ public class ProcedureTestingUtility {
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
Runnable beforeStartAction, boolean failOnCorrupted) throws Exception {
ProcedureStore procStore = procExecutor.getStore();
int storeThreads = procExecutor.getNumThreads();
int execThreads = procExecutor.getNumThreads();
int storeThreads = procExecutor.getCorePoolSize();
int execThreads = procExecutor.getCorePoolSize();
// stop
procExecutor.stop();
procExecutor.join();

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureExecutor {
private static final Log LOG = LogFactory.getLog(TestProcedureExecutor.class);
private TestProcEnv procEnv;
private NoopProcedureStore procStore;
private ProcedureExecutor<TestProcEnv> procExecutor;
private HBaseCommonTestingUtility htu;
@Before
public void setUp() throws Exception {
htu = new HBaseCommonTestingUtility();
// NOTE: The executor will be created by each test
procEnv = new TestProcEnv();
procStore = new NoopProcedureStore();
procStore.start(1);
}
@After
public void tearDown() throws Exception {
procExecutor.stop();
procStore.stop(false);
procExecutor.join();
}
private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception {
procExecutor = new ProcedureExecutor(conf, procEnv, procStore);
procExecutor.start(numThreads, true);
}
@Test(timeout=60000)
public void testWorkerStuck() throws Exception {
// replace the executor
final Configuration conf = new Configuration(htu.getConfiguration());
conf.setFloat("hbase.procedure.worker.add.stuck.percentage", 0.5f);
conf.setInt("hbase.procedure.worker.monitor.interval.msec", 500);
conf.setInt("hbase.procedure.worker.stuck.threshold.msec", 750);
final int NUM_THREADS = 2;
createNewExecutor(conf, NUM_THREADS);
Semaphore latch1 = new Semaphore(2);
latch1.acquire(2);
BusyWaitProcedure busyProc1 = new BusyWaitProcedure(latch1);
Semaphore latch2 = new Semaphore(2);
latch2.acquire(2);
BusyWaitProcedure busyProc2 = new BusyWaitProcedure(latch2);
long busyProcId1 = procExecutor.submitProcedure(busyProc1);
long busyProcId2 = procExecutor.submitProcedure(busyProc2);
long otherProcId = procExecutor.submitProcedure(new NoopProcedure());
// wait until a new worker is being created
int threads1 = waitThreadCount(NUM_THREADS + 1);
LOG.info("new threads got created: " + (threads1 - NUM_THREADS));
assertEquals(NUM_THREADS + 1, threads1);
ProcedureTestingUtility.waitProcedure(procExecutor, otherProcId);
assertEquals(true, procExecutor.isFinished(otherProcId));
ProcedureTestingUtility.assertProcNotFailed(procExecutor, otherProcId);
assertEquals(true, procExecutor.isRunning());
assertEquals(false, procExecutor.isFinished(busyProcId1));
assertEquals(false, procExecutor.isFinished(busyProcId2));
// terminate the busy procedures
latch1.release();
latch2.release();
LOG.info("set keep alive and wait threads being removed");
procExecutor.setKeepAliveTime(500L, TimeUnit.MILLISECONDS);
int threads2 = waitThreadCount(NUM_THREADS);
LOG.info("threads got removed: " + (threads1 - threads2));
assertEquals(NUM_THREADS, threads2);
// terminate the busy procedures
latch1.release();
latch2.release();
// wait for all procs to complete
ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId1);
ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId2);
ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId1);
ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2);
}
private int waitThreadCount(final int expectedThreads) {
while (procExecutor.isRunning()) {
if (procExecutor.getWorkerThreadCount() == expectedThreads) {
break;
}
LOG.debug("waiting for thread count=" + expectedThreads +
" current=" + procExecutor.getWorkerThreadCount());
Threads.sleepWithoutInterrupt(250);
}
return procExecutor.getWorkerThreadCount();
}
public static class BusyWaitProcedure extends NoopProcedure<TestProcEnv> {
private final Semaphore latch;
public BusyWaitProcedure(final Semaphore latch) {
this.latch = latch;
}
@Override
protected Procedure[] execute(final TestProcEnv env) {
try {
LOG.info("worker started " + this);
if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) {
throw new Exception("waited too long");
}
LOG.info("worker step 2 " + this);
if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) {
throw new Exception("waited too long");
}
} catch (Exception e) {
LOG.error("got unexpected exception", e);
setFailure("BusyWaitProcedure", e);
}
return null;
}
}
private class TestProcEnv { }
}

View File

@ -76,17 +76,17 @@ public class TestProcedureInMemoryChore {
CountDownLatch latch = new CountDownLatch(nCountDown);
TestLatchChore chore = new TestLatchChore(timeoutMSec, latch);
procExecutor.addChore(chore);
assertTrue(chore.isRunnable());
assertTrue(chore.isWaiting());
latch.await();
// remove the chore and verify it is no longer executed
assertTrue(chore.isRunnable());
assertTrue(chore.isWaiting());
procExecutor.removeChore(chore);
latch = new CountDownLatch(nCountDown);
chore.setLatch(latch);
latch.await(timeoutMSec * nCountDown, TimeUnit.MILLISECONDS);
LOG.info("chore latch count=" + latch.getCount());
assertFalse(chore.isRunnable());
assertFalse(chore.isWaiting());
assertTrue("latchCount=" + latch.getCount(), latch.getCount() > 0);
}
@ -104,6 +104,7 @@ public class TestProcedureInMemoryChore {
@Override
protected void periodicExecute(final TestProcEnv env) {
LOG.info("periodic execute " + this);
latch.countDown();
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
@Category({MasterTests.class, SmallTests.class})
public class TestDelayedUtil {
private static final Log LOG = LogFactory.getLog(TestDelayedUtil.class);
@Test
public void testDelayedContainerEquals() {
Object o1 = new Object();
Object o2 = new Object();
ZeroDelayContainer<Long> lnull = new ZeroDelayContainer(null);
ZeroDelayContainer<Long> l10a = new ZeroDelayContainer<Long>(10L);
ZeroDelayContainer<Long> l10b = new ZeroDelayContainer(10L);
ZeroDelayContainer<Long> l15 = new ZeroDelayContainer(15L);
ZeroDelayContainer<Object> onull = new ZeroDelayContainer<Object>(null);
ZeroDelayContainer<Object> o1ca = new ZeroDelayContainer<Object>(o1);
ZeroDelayContainer<Object> o1cb = new ZeroDelayContainer<Object>(o1);
ZeroDelayContainer<Object> o2c = new ZeroDelayContainer<Object>(o2);
ZeroDelayContainer[] items = new ZeroDelayContainer[] {
lnull, l10a, l10b, l15, onull, o1ca, o1cb, o2c,
};
assertContainersEquals(lnull, items, lnull, onull);
assertContainersEquals(l10a, items, l10a, l10b);
assertContainersEquals(l10b, items, l10a, l10b);
assertContainersEquals(l15, items, l15);
assertContainersEquals(onull, items, lnull, onull);
assertContainersEquals(o1ca, items, o1ca, o1cb);
assertContainersEquals(o1cb, items, o1ca, o1cb);
assertContainersEquals(o2c, items, o2c);
}
private void assertContainersEquals(final ZeroDelayContainer src,
final ZeroDelayContainer[] items, final ZeroDelayContainer... matches) {
for (int i = 0; i < items.length; ++i) {
boolean shouldMatch = false;
for (int j = 0; j < matches.length; ++j) {
if (items[i] == matches[j]) {
shouldMatch = true;
break;
}
}
boolean isMatching = src.equals(items[i]);
assertEquals(src.getObject() + " unexpectedly match " + items[i].getObject(),
shouldMatch, isMatching);
}
}
private static class ZeroDelayContainer<T> extends DelayedUtil.DelayedContainer<T> {
public ZeroDelayContainer(final T object) {
super(object);
}
@Override
public long getTimeoutTimestamp() {
return 0;
}
}
}

View File

@ -1035,6 +1035,7 @@ public class HMaster extends HRegionServer implements MasterServices {
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
procEnv.getProcedureQueue());
configurationManager.registerObserver(procEnv);
final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
Math.max(Runtime.getRuntime().availableProcessors(),
@ -1048,6 +1049,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private void stopProcedureExecutor() {
if (procedureExecutor != null) {
configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
procedureExecutor.stop();
}

View File

@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.RpcServer;
@ -42,7 +43,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterProcedureEnv {
public class MasterProcedureEnv implements ConfigurationObserver {
private static final Log LOG = LogFactory.getLog(MasterProcedureEnv.class);
@InterfaceAudience.Private
@ -161,4 +162,9 @@ public class MasterProcedureEnv {
procSched.suspendEvent(event);
}
}
@Override
public void onConfigurationChange(Configuration conf) {
master.getMasterProcedureExecutor().refreshConfiguration(conf);
}
}

View File

@ -191,6 +191,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
serverBuckets[i] = null;
}
// Remove Namespaces
clear(namespaceMap, null, NAMESPACE_QUEUE_KEY_COMPARATOR);
namespaceMap = null;
// Remove Tables
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null;
@ -204,12 +208,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
Queue<T> node = AvlTree.getFirst(treeMap);
assert !node.isSuspended() : "can't clear suspended " + node.getKey();
treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
removeFromRunQueue(fairq, node);
if (fairq != null) removeFromRunQueue(fairq, node);
}
}
@Override
public int queueSize() {
protected int queueSize() {
int count = 0;
// Server queues

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
@ -244,24 +245,29 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
protected int doWork() throws Exception {
procedureScheduler = new MasterProcedureScheduler(
UTIL.getConfiguration(), new TableLockManager.NullTableLockManager());
procedureScheduler.start();
setupOperations();
final Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; ++i) {
threads[i] = new AddProcsWorker();
}
final float addBackTime = runThreads(threads) / 1000.0f;
final long addBackTime = runThreads(threads);
System.out.println("Added " + numOps + " procedures to scheduler.");
for (int i = 0; i < numThreads; ++i) {
threads[i] = new PollAndLockWorker();
}
final float pollTime = runThreads(threads) / 1000.0f;
final long pollTime = runThreads(threads);
procedureScheduler.stop();
final float pollTimeSec = pollTime / 1000.0f;
final float addBackTimeSec = addBackTime / 1000.0f;
System.out.println("******************************************");
System.out.println("Time - addBack : " + addBackTime + "sec");
System.out.println("Ops/sec - addBack : " + ((float)numOps / addBackTime));
System.out.println("Time - poll : " + pollTime + "sec");
System.out.println("Ops/sec - poll : " + ((float)numOps / pollTime));
System.out.println("Time - addBack : " + StringUtils.humanTimeDiff(addBackTime));
System.out.println("Ops/sec - addBack : " + StringUtils.humanSize(numOps / addBackTimeSec));
System.out.println("Time - poll : " + StringUtils.humanTimeDiff(pollTime));
System.out.println("Ops/sec - poll : " + StringUtils.humanSize(numOps / pollTimeSec));
System.out.println("Num Operations : " + numOps);
System.out.println();
System.out.println("Completed : " + completed.get());

View File

@ -59,11 +59,13 @@ public class TestMasterProcedureScheduler {
public void setUp() throws IOException {
conf = HBaseConfiguration.create();
queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
queue.start();
}
@After
public void tearDown() throws IOException {
assertEquals("proc-queue expected to be empty", 0, queue.size());
queue.stop();
queue.clear();
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -55,11 +56,13 @@ public class TestMasterProcedureSchedulerConcurrency {
public void setUp() throws IOException {
conf = HBaseConfiguration.create();
queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
queue.start();
}
@After
public void tearDown() throws IOException {
assertEquals("proc-queue expected to be empty", 0, queue.size());
queue.stop();
queue.clear();
}
@ -207,6 +210,14 @@ public class TestMasterProcedureSchedulerConcurrency {
}
}
@Test(timeout=60000)
public void testMasterProcedureSchedulerPerformanceEvaluation() throws Exception {
// Make sure the tool does not get stuck
MasterProcedureSchedulerPerformanceEvaluation.main(new String[] {
"-num_ops", "1000"
});
}
public static class TestTableProcSet {
private final MasterProcedureScheduler queue;