HBASE-16813 Procedure v2 - Move ProcedureEvent to hbase-procedure module

This commit is contained in:
Matteo Bertozzi 2016-10-12 16:33:25 -07:00
parent dfb2a800c4
commit 92ef234486
18 changed files with 1073 additions and 903 deletions

View File

@ -0,0 +1,311 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class);
private final ReentrantLock schedLock = new ReentrantLock();
private final Condition schedWaitCond = schedLock.newCondition();
private boolean running = false;
// TODO: metrics
private long pollCalls = 0;
private long nullPollCalls = 0;
@Override
public void start() {
schedLock();
try {
running = true;
} finally {
schedUnlock();
}
}
@Override
public void stop() {
schedLock();
try {
running = false;
schedWaitCond.signalAll();
} finally {
schedUnlock();
}
}
@Override
public void signalAll() {
schedLock();
try {
schedWaitCond.signalAll();
} finally {
schedUnlock();
}
}
// ==========================================================================
// Add related
// ==========================================================================
/**
* Add the procedure to the queue.
* NOTE: this method is called with the sched lock held.
* @param procedure the Procedure to add
* @param addFront true if the item should be added to the front of the queue
*/
protected abstract void enqueue(Procedure procedure, boolean addFront);
public void addFront(final Procedure procedure) {
push(procedure, true, true);
}
public void addBack(final Procedure procedure) {
push(procedure, false, true);
}
protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
schedLock.lock();
try {
enqueue(procedure, addFront);
if (notify) {
schedWaitCond.signal();
}
} finally {
schedLock.unlock();
}
}
// ==========================================================================
// Poll related
// ==========================================================================
/**
* Fetch one Procedure from the queue
* NOTE: this method is called with the sched lock held.
* @return the Procedure to execute, or null if nothing is available.
*/
protected abstract Procedure dequeue();
@Override
public Procedure poll() {
return poll(-1);
}
@Override
public Procedure poll(long timeout, TimeUnit unit) {
return poll(unit.toNanos(timeout));
}
public Procedure poll(long nanos) {
final boolean waitForever = (nanos < 0);
schedLock();
try {
while (!queueHasRunnables()) {
if (!running) return null;
if (waitForever) {
schedWaitCond.await();
} else {
if (nanos <= 0) return null;
nanos = schedWaitCond.awaitNanos(nanos);
}
}
final Procedure pollResult = dequeue();
pollCalls++;
nullPollCalls += (pollResult == null) ? 1 : 0;
return pollResult;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
nullPollCalls++;
return null;
} finally {
schedUnlock();
}
}
// ==========================================================================
// Utils
// ==========================================================================
/**
* Removes all of the elements from the queue
* NOTE: this method is called with the sched lock held.
*/
protected abstract void clearQueue();
/**
* Returns the number of elements in this queue.
* NOTE: this method is called with the sched lock held.
* @return the number of elements in this queue.
*/
protected abstract int queueSize();
/**
* Returns true if there are procedures available to process.
* NOTE: this method is called with the sched lock held.
* @return true if there are procedures available to process, otherwise false.
*/
protected abstract boolean queueHasRunnables();
@Override
public void clear() {
// NOTE: USED ONLY FOR TESTING
schedLock();
try {
clearQueue();
} finally {
schedUnlock();
}
}
@Override
public int size() {
schedLock();
try {
return queueSize();
} finally {
schedUnlock();
}
}
@Override
public boolean hasRunnables() {
schedLock();
try {
return queueHasRunnables();
} finally {
schedUnlock();
}
}
// ============================================================================
// TODO: Metrics
// ============================================================================
public long getPollCalls() {
return pollCalls;
}
public long getNullPollCalls() {
return nullPollCalls;
}
// ==========================================================================
// Procedure Events
// ==========================================================================
@Override
public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) {
synchronized (event) {
if (event.isReady()) {
return false;
}
suspendProcedure(event, procedure);
return true;
}
}
@Override
public void suspendEvent(final ProcedureEvent event) {
final boolean isTraceEnabled = LOG.isTraceEnabled();
synchronized (event) {
event.setReady(false);
if (isTraceEnabled) {
LOG.trace("Suspend event " + event);
}
}
}
@Override
public void wakeEvent(final ProcedureEvent event) {
wakeEvents(1, event);
}
@Override
public void wakeEvents(final int count, final ProcedureEvent... events) {
final boolean isTraceEnabled = LOG.isTraceEnabled();
schedLock();
try {
int waitingCount = 0;
for (int i = 0; i < count; ++i) {
final ProcedureEvent event = events[i];
synchronized (event) {
event.setReady(true);
if (isTraceEnabled) {
LOG.trace("Wake event " + event);
}
waitingCount += popEventWaitingObjects(event);
}
}
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
protected int popEventWaitingObjects(final ProcedureEvent event) {
return popEventWaitingProcedures(event);
}
protected int popEventWaitingProcedures(final ProcedureEventQueue event) {
int count = 0;
while (event.hasWaitingProcedures()) {
wakeProcedure(event.popWaitingProcedure(false));
count++;
}
return count;
}
protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) {
procedure.suspend();
event.suspendProcedure(procedure);
}
protected void wakeProcedure(final Procedure procedure) {
procedure.resume();
push(procedure, /* addFront= */ true, /* notify= */false);
}
// ==========================================================================
// Internal helpers
// ==========================================================================
protected void schedLock() {
schedLock.lock();
}
protected void schedUnlock() {
schedLock.unlock();
}
protected void wakePollIfNeeded(final int waitingCount) {
if (waitingCount > 1) {
schedWaitCond.signalAll();
} else if (waitingCount > 0) {
schedWaitCond.signal();
}
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Basic ProcedureEvent that contains an "object", which can be a
* description or a reference to the resource to wait on, and a
* queue for suspended procedures.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureEvent<T> extends ProcedureEventQueue {
private final T object;
private boolean ready = false;
public ProcedureEvent(final T object) {
this.object = object;
}
public T getObject() {
return object;
}
public synchronized boolean isReady() {
return ready;
}
@InterfaceAudience.Private
protected synchronized void setReady(final boolean isReady) {
this.ready = isReady;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(" + object + ")";
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayDeque;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Basic queue to store suspended procedures.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureEventQueue {
private static final Log LOG = LogFactory.getLog(ProcedureEventQueue.class);
private ArrayDeque<Procedure> waitingProcedures = null;
public ProcedureEventQueue() {
}
@InterfaceAudience.Private
public synchronized void suspendProcedure(final Procedure proc) {
if (waitingProcedures == null) {
waitingProcedures = new ArrayDeque<Procedure>();
}
waitingProcedures.addLast(proc);
}
@InterfaceAudience.Private
public synchronized void removeProcedure(final Procedure proc) {
if (waitingProcedures != null) {
waitingProcedures.remove(proc);
}
}
@InterfaceAudience.Private
public synchronized boolean hasWaitingProcedures() {
return waitingProcedures != null;
}
@InterfaceAudience.Private
public synchronized Procedure popWaitingProcedure(final boolean popFront) {
// it will be nice to use IterableList on a procedure and avoid allocations...
Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast();
if (waitingProcedures.isEmpty()) {
waitingProcedures = null;
}
return proc;
}
@VisibleForTesting
public synchronized void clear() {
waitingProcedures = null;
}
@VisibleForTesting
public synchronized int size() {
if (waitingProcedures != null) {
return waitingProcedures.size();
}
return 0;
}
}

View File

@ -243,9 +243,9 @@ public class ProcedureExecutor<TEnvironment> {
new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
/**
* Queue that contains runnable procedures.
* Scheduler/Queue that contains runnable procedures.
*/
private final ProcedureRunnableSet runnables;
private final ProcedureScheduler scheduler;
// TODO
private final ReentrantLock submitLock = new ReentrantLock();
@ -267,13 +267,13 @@ public class ProcedureExecutor<TEnvironment> {
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
final ProcedureStore store) {
this(conf, environment, store, new ProcedureSimpleRunQueue());
this(conf, environment, store, new SimpleProcedureScheduler());
}
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
final ProcedureStore store, final ProcedureRunnableSet runqueue) {
final ProcedureStore store, final ProcedureScheduler scheduler) {
this.environment = environment;
this.runnables = runqueue;
this.scheduler = scheduler;
this.store = store;
this.conf = conf;
this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, true);
@ -284,7 +284,7 @@ public class ProcedureExecutor<TEnvironment> {
Preconditions.checkArgument(rollbackStack.isEmpty());
Preconditions.checkArgument(procedures.isEmpty());
Preconditions.checkArgument(waitingTimeout.isEmpty());
Preconditions.checkArgument(runnables.size() == 0);
Preconditions.checkArgument(scheduler.size() == 0);
store.load(new ProcedureStore.ProcedureLoader() {
@Override
@ -378,7 +378,7 @@ public class ProcedureExecutor<TEnvironment> {
Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
// The 'proc' was ready to run but the root procedure was rolledback?
runnables.addBack(proc);
scheduler.addBack(proc);
continue;
}
@ -410,8 +410,8 @@ public class ProcedureExecutor<TEnvironment> {
break;
case FINISHED:
if (proc.hasException()) {
// add the proc to the runnables to perform the rollback
runnables.addBack(proc);
// add the proc to the scheduler to perform the rollback
scheduler.addBack(proc);
}
break;
case ROLLEDBACK:
@ -446,7 +446,7 @@ public class ProcedureExecutor<TEnvironment> {
throw new IOException("found " + corruptedCount + " procedures on replay");
}
// 4. Push the runnables
// 4. Push the scheduler
if (!runnableList.isEmpty()) {
// TODO: See ProcedureWALFormatReader#hasFastStartSupport
// some procedure may be started way before this stuff.
@ -457,10 +457,10 @@ public class ProcedureExecutor<TEnvironment> {
sendProcedureLoadedNotification(proc.getProcId());
}
if (proc.wasExecuted()) {
runnables.addFront(proc);
scheduler.addFront(proc);
} else {
// if it was not in execution, it can wait.
runnables.addBack(proc);
scheduler.addBack(proc);
}
}
}
@ -514,6 +514,9 @@ public class ProcedureExecutor<TEnvironment> {
LOG.info(String.format("recover procedure store (%s) lease: %s",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
// start the procedure scheduler
scheduler.start();
// TODO: Split in two steps.
// TODO: Handle corrupted procedures (currently just a warn)
// The first one will make sure that we have the latest id,
@ -540,7 +543,7 @@ public class ProcedureExecutor<TEnvironment> {
}
LOG.info("Stopping the procedure executor");
runnables.signalAll();
scheduler.stop();
waitingTimeout.signalAll();
}
@ -564,7 +567,7 @@ public class ProcedureExecutor<TEnvironment> {
procedures.clear();
nonceKeysToProcIdsMap.clear();
waitingTimeout.clear();
runnables.clear();
scheduler.clear();
lastProcId.set(-1);
}
@ -698,7 +701,7 @@ public class ProcedureExecutor<TEnvironment> {
assert !procedures.containsKey(currentProcId);
procedures.put(currentProcId, proc);
sendProcedureAddedNotification(currentProcId);
runnables.addBack(proc);
scheduler.addBack(proc);
return currentProcId;
}
@ -810,18 +813,18 @@ public class ProcedureExecutor<TEnvironment> {
return procedures.get(procId);
}
protected ProcedureRunnableSet getRunnableSet() {
return runnables;
protected ProcedureScheduler getScheduler() {
return scheduler;
}
/**
* Execution loop (N threads)
* while the executor is in a running state,
* fetch a procedure from the runnables queue and start the execution.
* fetch a procedure from the scheduler queue and start the execution.
*/
private void execLoop() {
while (isRunning()) {
Procedure proc = runnables.poll();
Procedure proc = scheduler.poll();
if (proc == null) continue;
try {
@ -855,7 +858,7 @@ public class ProcedureExecutor<TEnvironment> {
// we have the 'rollback-lock' we can start rollingback
if (!executeRollback(rootProcId, procStack)) {
procStack.unsetRollback();
runnables.yield(proc);
scheduler.yield(proc);
}
} else {
// if we can't rollback means that some child is still running.
@ -863,7 +866,7 @@ public class ProcedureExecutor<TEnvironment> {
// If the procedure was never executed, remove and mark it as rolledback.
if (!proc.wasExecuted()) {
if (!executeRollback(proc)) {
runnables.yield(proc);
scheduler.yield(proc);
}
}
}
@ -876,7 +879,7 @@ public class ProcedureExecutor<TEnvironment> {
execProcedure(procStack, proc);
releaseLock(proc, false);
} else {
runnables.yield(proc);
scheduler.yield(proc);
}
procStack.release(proc);
@ -965,7 +968,7 @@ public class ProcedureExecutor<TEnvironment> {
RootProcedureState procStack = rollbackStack.get(rootProcId);
procStack.abort();
store.update(proc);
runnables.addFront(proc);
scheduler.addFront(proc);
continue;
} else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
waitingTimeout.add(proc);
@ -1124,11 +1127,11 @@ public class ProcedureExecutor<TEnvironment> {
if (LOG.isTraceEnabled()) {
LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
}
runnables.yield(procedure);
scheduler.yield(procedure);
return;
} catch (InterruptedException e) {
handleInterruptedException(procedure, e);
runnables.yield(procedure);
scheduler.yield(procedure);
return;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
@ -1205,7 +1208,7 @@ public class ProcedureExecutor<TEnvironment> {
// if the procedure is kind enough to pass the slot to someone else, yield
if (procedure.getState() == ProcedureState.RUNNABLE &&
procedure.isYieldAfterExecutionStep(getEnvironment())) {
runnables.yield(procedure);
scheduler.yield(procedure);
return;
}
@ -1218,7 +1221,7 @@ public class ProcedureExecutor<TEnvironment> {
Procedure subproc = subprocs[i];
assert !procedures.containsKey(subproc.getProcId());
procedures.put(subproc.getProcId(), subproc);
runnables.addFront(subproc);
scheduler.addFront(subproc);
}
}
@ -1236,7 +1239,7 @@ public class ProcedureExecutor<TEnvironment> {
if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
parent.setState(ProcedureState.RUNNABLE);
store.update(parent);
runnables.addFront(parent);
scheduler.addFront(parent);
if (LOG.isTraceEnabled()) {
LOG.trace(parent + " all the children finished their work, resume.");
}
@ -1374,10 +1377,10 @@ public class ProcedureExecutor<TEnvironment> {
// call the runnableSet completion cleanup handler
try {
runnables.completionCleanup(proc);
scheduler.completionCleanup(proc);
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: " + proc, e);
}
// Notify the listeners

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.procedure2;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -28,7 +30,23 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ProcedureRunnableSet {
public interface ProcedureScheduler {
/**
* Start the scheduler
*/
void start();
/**
* Stop the scheduler
*/
void stop();
/**
* In case the class is blocking on poll() waiting for items to be added,
* this method should awake poll() and poll() should return.
*/
void signalAll();
/**
* Inserts the specified element at the front of this queue.
* @param proc the Procedure to add
@ -55,6 +73,11 @@ public interface ProcedureRunnableSet {
*/
void completionCleanup(Procedure proc);
/**
* @return true if there are procedures available to process, otherwise false.
*/
boolean hasRunnables();
/**
* Fetch one Procedure from the queue
* @return the Procedure to execute, or null if nothing present.
@ -62,20 +85,53 @@ public interface ProcedureRunnableSet {
Procedure poll();
/**
* In case the class is blocking on poll() waiting for items to be added,
* this method should awake poll() and poll() should return.
* Fetch one Procedure from the queue
* @param timeout how long to wait before giving up, in units of unit
* @param unit a TimeUnit determining how to interpret the timeout parameter
* @return the Procedure to execute, or null if nothing present.
*/
void signalAll();
Procedure poll(long timeout, TimeUnit unit);
/**
* Returns the number of elements in this collection.
* @return the number of elements in this collection.
* Mark the event has not ready.
* procedures calling waitEvent() will be suspended.
* @param event the event to mark as suspended/not ready
*/
void suspendEvent(ProcedureEvent event);
/**
* Wake every procedure waiting for the specified event
* (By design each event has only one "wake" caller)
* @param event the event to wait
*/
void wakeEvent(ProcedureEvent event);
/**
* Wake every procedure waiting for the specified events.
* (By design each event has only one "wake" caller)
* @param count the number of events in the array to wake
* @param events the list of events to wake
*/
void wakeEvents(int count, ProcedureEvent... events);
/**
* Suspend the procedure if the event is not ready yet.
* @param event the event to wait on
* @param procedure the procedure waiting on the event
* @return true if the procedure has to wait for the event to be ready, false otherwise.
*/
boolean waitEvent(ProcedureEvent event, Procedure procedure);
/**
* Returns the number of elements in this queue.
* @return the number of elements in this queue.
*/
@VisibleForTesting
int size();
/**
* Removes all of the elements from this collection.
* Removes all of the elements from the queue
*/
@VisibleForTesting
void clear();
}

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Simple runqueue for the procedures
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
private final Deque<Procedure> runnables = new ArrayDeque<Procedure>();
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
@Override
public void addFront(final Procedure proc) {
lock.lock();
try {
runnables.addFirst(proc);
waitCond.signal();
} finally {
lock.unlock();
}
}
@Override
public void addBack(final Procedure proc) {
lock.lock();
try {
runnables.addLast(proc);
waitCond.signal();
} finally {
lock.unlock();
}
}
@Override
public void yield(final Procedure proc) {
addBack(proc);
}
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Procedure poll() {
lock.lock();
try {
if (runnables.isEmpty()) {
waitCond.await();
if (!runnables.isEmpty()) {
return runnables.pop();
}
} else {
return runnables.pop();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
lock.unlock();
}
return null;
}
@Override
public void signalAll() {
lock.lock();
try {
waitCond.signalAll();
} finally {
lock.unlock();
}
}
@Override
public void clear() {
lock.lock();
try {
runnables.clear();
} finally {
lock.unlock();
}
}
@Override
public int size() {
lock.lock();
try {
return runnables.size();
} finally {
lock.unlock();
}
}
@Override
public void completionCleanup(Procedure proc) {
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.ArrayDeque;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Simple scheduler for procedures
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>();
@Override
protected void enqueue(final Procedure procedure, final boolean addFront) {
if (addFront) {
runnables.addFirst(procedure);
} else {
runnables.addLast(procedure);
}
}
@Override
protected Procedure dequeue() {
return runnables.poll();
}
@Override
protected void clearQueue() {
runnables.clear();
}
@Override
public void yield(final Procedure proc) {
addBack(proc);
}
@Override
public boolean queueHasRunnables() {
return runnables.size() > 0;
}
@Override
public int queueSize() {
return runnables.size();
}
@Override
public void completionCleanup(Procedure proc) {
}
}

View File

@ -181,7 +181,7 @@ public class ProcedureTestingUtility {
public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
int stableRuns = 0;
while (stableRuns < 10) {
if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) {
if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
stableRuns = 0;
Threads.sleepWithoutInterrupt(100);
} else {
@ -236,7 +236,32 @@ public class ProcedureTestingUtility {
return cause == null ? procInfo.getException() : cause;
}
public static class TestProcedure extends Procedure<Void> {
public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
public NoopProcedure() {}
@Override
protected Procedure[] execute(TEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return null;
}
@Override
protected void rollback(TEnv env) throws IOException, InterruptedException {
}
@Override
protected boolean abort(TEnv env) { return false; }
@Override
protected void serializeStateData(final OutputStream stream) throws IOException {
}
@Override
protected void deserializeStateData(final InputStream stream) throws IOException {
}
}
public static class TestProcedure extends NoopProcedure<Void> {
private byte[] data = null;
public TestProcedure() {}
@ -269,15 +294,6 @@ public class ProcedureTestingUtility {
this.data = data;
}
@Override
protected Procedure[] execute(Void env) { return null; }
@Override
protected void rollback(Void env) { }
@Override
protected boolean abort(Void env) { return false; }
@Override
protected void serializeStateData(final OutputStream stream) throws IOException {
StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0);

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureEvents {
private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class);
private TestProcEnv procEnv;
private NoopProcedureStore procStore;
private ProcedureExecutor<TestProcEnv> procExecutor;
private HBaseCommonTestingUtility htu;
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
procEnv = new TestProcEnv();
procStore = new NoopProcedureStore();
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procStore.start(1);
procExecutor.start(1, true);
}
@After
public void tearDown() throws IOException {
procExecutor.stop();
procStore.stop(false);
procExecutor.join();
}
@Test(timeout=30000)
public void testTimeoutEventProcedure() throws Exception {
final int NTIMEOUTS = 5;
TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, NTIMEOUTS);
procExecutor.submitProcedure(proc);
ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId());
ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount());
}
public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
private final ProcedureEvent event = new ProcedureEvent("timeout-event");
private final AtomicInteger ntimeouts = new AtomicInteger(0);
private int maxTimeouts = 1;
public TestTimeoutEventProcedure() {}
public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
this.maxTimeouts = maxTimeouts;
setTimeout(timeoutMsec);
}
public int getTimeoutsCount() {
return ntimeouts.get();
}
@Override
protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
if (ntimeouts.get() > maxTimeouts) {
setAbortFailure("test", "give up after " + ntimeouts.get());
return null;
}
env.getProcedureScheduler().suspendEvent(event);
if (env.getProcedureScheduler().waitEvent(event, this)) {
setState(ProcedureState.WAITING_TIMEOUT);
throw new ProcedureSuspendedException();
}
return null;
}
@Override
protected boolean setTimeoutFailure(final TestProcEnv env) {
int n = ntimeouts.incrementAndGet();
LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
setState(ProcedureState.RUNNABLE);
env.getProcedureScheduler().wakeEvent(event);
return false;
}
}
private class TestProcEnv {
public ProcedureScheduler getProcedureScheduler() {
return procExecutor.getScheduler();
}
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, MediumTests.class})
public class TestProcedureSchedulerConcurrency {
private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class);
private SimpleProcedureScheduler procSched;
@Before
public void setUp() throws IOException {
procSched = new SimpleProcedureScheduler();
procSched.start();
}
@After
public void tearDown() throws IOException {
procSched.stop();
}
@Test(timeout=60000)
public void testConcurrentWaitWake() throws Exception {
testConcurrentWaitWake(false);
}
@Test(timeout=60000)
public void testConcurrentWaitWakeBatch() throws Exception {
testConcurrentWaitWake(true);
}
private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
final int WAIT_THRESHOLD = 2500;
final int NPROCS = 20;
final int NRUNS = 500;
final ProcedureScheduler sched = procSched;
for (long i = 0; i < NPROCS; ++i) {
sched.addBack(new TestProcedureWithEvent(i));
}
final Thread[] threads = new Thread[4];
final AtomicInteger waitCount = new AtomicInteger(0);
final AtomicInteger wakeCount = new AtomicInteger(0);
final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue =
new ConcurrentSkipListSet<TestProcedureWithEvent>();
threads[0] = new Thread() {
@Override
public void run() {
long lastUpdate = 0;
while (true) {
final int oldWakeCount = wakeCount.get();
if (useWakeBatch) {
ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
for (int i = 0; i < ev.length; ++i) {
ev[i] = waitQueue.pollFirst().getEvent();
LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
}
sched.wakeEvents(ev.length, ev);
wakeCount.addAndGet(ev.length);
} else {
int size = waitQueue.size();
while (size-- > 0) {
ProcedureEvent ev = waitQueue.pollFirst().getEvent();
sched.wakeEvent(ev);
LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
wakeCount.incrementAndGet();
}
}
if (wakeCount.get() != oldWakeCount) {
lastUpdate = System.currentTimeMillis();
} else if (wakeCount.get() >= NRUNS &&
(System.currentTimeMillis() - lastUpdate) > WAIT_THRESHOLD) {
break;
}
Threads.sleepWithoutInterrupt(25);
}
}
};
for (int i = 1; i < threads.length; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
while (true) {
TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll();
if (proc == null) continue;
sched.suspendEvent(proc.getEvent());
waitQueue.add(proc);
sched.waitEvent(proc.getEvent(), proc);
LOG.debug("WAIT " + proc.getEvent());
if (waitCount.incrementAndGet() >= NRUNS) {
break;
}
}
}
};
}
for (int i = 0; i < threads.length; ++i) {
threads[i].start();
}
for (int i = 0; i < threads.length; ++i) {
threads[i].join();
}
sched.clear();
}
public static class TestProcedureWithEvent extends NoopProcedure<Void> {
private final ProcedureEvent event;
public TestProcedureWithEvent(long procId) {
setProcId(procId);
event = new ProcedureEvent("test-event procId=" + procId);
}
public ProcedureEvent getEvent() {
return event;
}
}
}

View File

@ -92,7 +92,7 @@ public class TestProcedureSuspended {
// release p3
p3keyB.setThrowSuspend(false);
procExecutor.getRunnableSet().addFront(p3keyB);
procExecutor.getScheduler().addFront(p3keyB);
waitAndAssertTimestamp(p1keyA, 1, 1);
waitAndAssertTimestamp(p2keyA, 0, -1);
waitAndAssertTimestamp(p3keyB, 2, 3);
@ -104,7 +104,7 @@ public class TestProcedureSuspended {
// rollback p2 and wait until is fully completed
p1keyA.setTriggerRollback(true);
procExecutor.getRunnableSet().addFront(p1keyA);
procExecutor.getScheduler().addFront(p1keyA);
ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);
// p2 should start and suspend
@ -115,7 +115,7 @@ public class TestProcedureSuspended {
// wait until p2 is fully completed
p2keyA.setThrowSuspend(false);
procExecutor.getRunnableSet().addFront(p2keyA);
procExecutor.getScheduler().addFront(p2keyA);
ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
waitAndAssertTimestamp(p1keyA, 4, 60000);
waitAndAssertTimestamp(p2keyA, 2, 8);

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -50,7 +51,7 @@ public class TestYieldProcedures {
private static final Procedure NULL_PROC = null;
private ProcedureExecutor<TestProcEnv> procExecutor;
private TestRunQueue procRunnables;
private TestScheduler procRunnables;
private ProcedureStore procStore;
private HBaseCommonTestingUtility htu;
@ -67,7 +68,7 @@ public class TestYieldProcedures {
logDir = new Path(testDir, "proc-logs");
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
procRunnables = new TestRunQueue();
procRunnables = new TestScheduler();
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
procStore, procRunnables);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
@ -343,41 +344,47 @@ public class TestYieldProcedures {
}
}
private static class TestRunQueue extends ProcedureSimpleRunQueue {
private static class TestScheduler extends SimpleProcedureScheduler {
private int completionCalls;
private int addFrontCalls;
private int addBackCalls;
private int yieldCalls;
private int pollCalls;
public TestRunQueue() {}
public TestScheduler() {}
public void addFront(final Procedure proc) {
addFrontCalls++;
super.addFront(proc);
}
addFrontCalls++;
super.addFront(proc);
}
@Override
public void addBack(final Procedure proc) {
addBackCalls++;
super.addBack(proc);
}
@Override
public void addBack(final Procedure proc) {
addBackCalls++;
super.addBack(proc);
}
@Override
public void yield(final Procedure proc) {
yieldCalls++;
super.yield(proc);
}
@Override
public void yield(final Procedure proc) {
yieldCalls++;
super.yield(proc);
}
@Override
public Procedure poll() {
pollCalls++;
return super.poll();
}
@Override
public Procedure poll() {
pollCalls++;
return super.poll();
}
@Override
public void completionCleanup(Procedure proc) {
completionCalls++;
}
@Override
public Procedure poll(long timeout, TimeUnit unit) {
pollCalls++;
return super.poll(timeout, unit);
}
@Override
public void completionCleanup(Procedure proc) {
completionCalls++;
}
}
}

View File

@ -108,7 +108,6 @@ import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@ -120,6 +119,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;

View File

@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.security.User;
@ -121,10 +121,15 @@ public class MasterProcedureEnv {
return master.getMasterCoprocessorHost();
}
@Deprecated
public MasterProcedureScheduler getProcedureQueue() {
return procSched;
}
public MasterProcedureScheduler getProcedureScheduler() {
return procSched;
}
public boolean isRunning() {
return master.getMasterProcedureExecutor().isRunning();
}

View File

@ -24,8 +24,6 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,8 +38,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
import org.apache.hadoop.hbase.procedure2.ProcedureEventQueue;
import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
@ -49,8 +48,8 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
/**
* ProcedureRunnableSet for the Master Procedures.
* This RunnableSet tries to provide to the ProcedureExecutor procedures
* ProcedureScheduler for the Master Procedures.
* This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
* that can be executed without having to wait on a lock.
* Most of the master operations can be executed concurrently, if they
* are operating on different tables (e.g. two create table can be performed
@ -65,12 +64,10 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterProcedureScheduler implements ProcedureRunnableSet {
public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class);
private final TableLockManager lockManager;
private final ReentrantLock schedLock = new ReentrantLock();
private final Condition schedWaitCond = schedLock.newCondition();
private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR =
new NamespaceQueueKeyComparator();
@ -90,10 +87,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private final int userTablePriority;
private final int sysTablePriority;
// TODO: metrics
private long pollCalls = 0;
private long nullPollCalls = 0;
public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) {
this.lockManager = lockManager;
@ -103,45 +96,24 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
}
@Override
public void addFront(Procedure proc) {
doAdd(proc, true);
}
@Override
public void addBack(Procedure proc) {
doAdd(proc, false);
}
@Override
public void yield(final Procedure proc) {
doAdd(proc, isTableProcedure(proc));
push(proc, isTableProcedure(proc), true);
}
private void doAdd(final Procedure proc, final boolean addFront) {
doAdd(proc, addFront, true);
}
private void doAdd(final Procedure proc, final boolean addFront, final boolean notify) {
schedLock.lock();
try {
if (isTableProcedure(proc)) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
// a group for all the non-table/non-server procedures or try to find a key for your
// non-table/non-server procedures and implement something similar to the TableRunQueue.
throw new UnsupportedOperationException(
"RQs for non-table/non-server procedures are not implemented yet: " + proc);
}
if (notify) {
schedWaitCond.signal();
}
} finally {
schedLock.unlock();
@Override
protected void enqueue(final Procedure proc, final boolean addFront) {
if (isTableProcedure(proc)) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
// a group for all the non-table/non-server procedures or try to find a key for your
// non-table/non-server procedures and implement something similar to the TableRunQueue.
throw new UnsupportedOperationException(
"RQs for non-table/non-server procedures are not implemented yet: " + proc);
}
}
@ -165,49 +137,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
@Override
public Procedure poll() {
return poll(-1);
protected boolean queueHasRunnables() {
return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
protected Procedure poll(long waitNsec) {
Procedure pollResult = null;
schedLock.lock();
try {
if (!hasRunnables()) {
if (waitNsec < 0) {
schedWaitCond.await();
} else {
schedWaitCond.awaitNanos(waitNsec);
}
if (!hasRunnables()) {
return null;
}
}
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
pollResult = doPoll(serverRunQueue);
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
}
// update metrics
pollCalls++;
nullPollCalls += (pollResult == null) ? 1 : 0;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
schedLock.unlock();
@Override
protected Procedure dequeue() {
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
Procedure pollResult = doPoll(serverRunQueue);
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
}
return pollResult;
}
private boolean hasRunnables() {
return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
}
private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
final Queue<T> rq = fairq.poll();
if (rq == null || !rq.isAvailable()) {
@ -239,24 +184,18 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
@Override
public void clear() {
// NOTE: USED ONLY FOR TESTING
schedLock.lock();
try {
// Remove Servers
for (int i = 0; i < serverBuckets.length; ++i) {
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[i] = null;
}
// Remove Tables
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null;
assert size() == 0 : "expected queue size to be 0, got " + size();
} finally {
schedLock.unlock();
public void clearQueue() {
// Remove Servers
for (int i = 0; i < serverBuckets.length; ++i) {
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[i] = null;
}
// Remove Tables
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null;
assert size() == 0 : "expected queue size to be 0, got " + size();
}
private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
@ -269,48 +208,25 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
private void wakePollIfNeeded(final int waitingCount) {
if (waitingCount > 1) {
schedWaitCond.signalAll();
} else if (waitingCount > 0) {
schedWaitCond.signal();
}
}
@Override
public void signalAll() {
schedLock.lock();
try {
schedWaitCond.signalAll();
} finally {
schedLock.unlock();
}
}
public int queueSize() {
int count = 0;
@Override
public int size() {
schedLock.lock();
try {
int count = 0;
// Server queues
final AvlTreeIterator<ServerQueue> serverIter = new AvlTreeIterator<ServerQueue>();
for (int i = 0; i < serverBuckets.length; ++i) {
serverIter.seekFirst(serverBuckets[i]);
while (serverIter.hasNext()) {
count += serverIter.next().size();
}
// Server queues
final AvlTreeIterator<ServerQueue> serverIter = new AvlTreeIterator<ServerQueue>();
for (int i = 0; i < serverBuckets.length; ++i) {
serverIter.seekFirst(serverBuckets[i]);
while (serverIter.hasNext()) {
count += serverIter.next().size();
}
// Table queues
final AvlTreeIterator<TableQueue> tableIter = new AvlTreeIterator<TableQueue>(tableMap);
while (tableIter.hasNext()) {
count += tableIter.next().size();
}
return count;
} finally {
schedLock.unlock();
}
// Table queues
final AvlTreeIterator<TableQueue> tableIter = new AvlTreeIterator<TableQueue>(tableMap);
while (tableIter.hasNext()) {
count += tableIter.next().size();
}
return count;
}
@Override
@ -354,329 +270,15 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
// ============================================================================
// TODO: Metrics
// ============================================================================
public long getPollCalls() {
return pollCalls;
}
public long getNullPollCalls() {
return nullPollCalls;
}
// ============================================================================
// Event Helpers
// ============================================================================
/**
* Suspend the procedure if the event is not ready yet.
* @param event the event to wait on
* @param procedure the procedure waiting on the event
* @return true if the procedure has to wait for the event to be ready, false otherwise.
*/
public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) {
return waitEvent(event, procedure, false);
}
/**
* Suspend the procedure if the event is not ready yet.
* @param event the event to wait on
* @param procedure the procedure waiting on the event
* @param suspendQueue true if the entire queue of the procedure should be suspended
* @return true if the procedure has to wait for the event to be ready, false otherwise.
*/
public boolean waitEvent(final ProcedureEvent event, final Procedure procedure,
final boolean suspendQueue) {
return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue);
}
private boolean waitEvent(final ProcedureEvent event, final boolean lockEvent,
final Procedure procedure, final boolean suspendQueue) {
synchronized (event) {
if (event.isReady()) {
if (lockEvent) {
event.setReady(false);
}
return false;
}
if (!suspendQueue) {
suspendProcedure(event, procedure);
} else if (isTableProcedure(procedure)) {
waitTableEvent(event, procedure);
} else if (isServerProcedure(procedure)) {
waitServerEvent(event, procedure);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
// a group for all the non-table/non-server procedures or try to find a key for your
// non-table/non-server procedures and implement something similar to the TableRunQueue.
throw new UnsupportedOperationException(
"RQs for non-table/non-server procedures are not implemented yet: " + procedure);
}
}
return true;
}
private void waitTableEvent(final ProcedureEvent event, final Procedure procedure) {
final TableName tableName = getTableName(procedure);
final boolean isDebugEnabled = LOG.isDebugEnabled();
schedLock.lock();
try {
TableQueue queue = getTableQueue(tableName);
queue.addFront(procedure);
if (queue.isSuspended()) return;
if (isDebugEnabled) {
LOG.debug("Suspend table queue " + tableName);
}
queue.setSuspended(true);
removeFromRunQueue(tableRunQueue, queue);
event.suspendTableQueue(queue);
} finally {
schedLock.unlock();
}
}
private void waitServerEvent(final ProcedureEvent event, final Procedure procedure) {
final ServerName serverName = getServerName(procedure);
final boolean isDebugEnabled = LOG.isDebugEnabled();
schedLock.lock();
try {
// TODO: This will change once we have the new AM
ServerQueue queue = getServerQueue(serverName);
queue.addFront(procedure);
if (queue.isSuspended()) return;
if (isDebugEnabled) {
LOG.debug("Suspend server queue " + serverName);
}
queue.setSuspended(true);
removeFromRunQueue(serverRunQueue, queue);
event.suspendServerQueue(queue);
} finally {
schedLock.unlock();
}
}
/**
* Mark the event has not ready.
* procedures calling waitEvent() will be suspended.
* @param event the event to mark as suspended/not ready
*/
public void suspendEvent(final ProcedureEvent event) {
final boolean isTraceEnabled = LOG.isTraceEnabled();
synchronized (event) {
event.setReady(false);
if (isTraceEnabled) {
LOG.trace("Suspend event " + event);
}
}
}
/**
* Wake every procedure waiting for the specified event
* (By design each event has only one "wake" caller)
* @param event the event to wait
*/
public void wakeEvent(final ProcedureEvent event) {
final boolean isTraceEnabled = LOG.isTraceEnabled();
synchronized (event) {
event.setReady(true);
if (isTraceEnabled) {
LOG.trace("Wake event " + event);
}
schedLock.lock();
try {
final int waitingCount = popEventWaitingObjects(event);
wakePollIfNeeded(waitingCount);
} finally {
schedLock.unlock();
}
}
}
/**
* Wake every procedure waiting for the specified events.
* (By design each event has only one "wake" caller)
* @param events the list of events to wake
* @param count the number of events in the array to wake
*/
public void wakeEvents(final ProcedureEvent[] events, final int count) {
final boolean isTraceEnabled = LOG.isTraceEnabled();
schedLock.lock();
try {
int waitingCount = 0;
for (int i = 0; i < count; ++i) {
final ProcedureEvent event = events[i];
synchronized (event) {
event.setReady(true);
if (isTraceEnabled) {
LOG.trace("Wake event " + event);
}
waitingCount += popEventWaitingObjects(event);
}
}
wakePollIfNeeded(waitingCount);
} finally {
schedLock.unlock();
}
}
private int popEventWaitingObjects(final ProcedureEvent event) {
int count = 0;
while (event.hasWaitingTables()) {
final Queue<TableName> queue = event.popWaitingTable();
queue.setSuspended(false);
addToRunQueue(tableRunQueue, queue);
count += queue.size();
}
// TODO: This will change once we have the new AM
while (event.hasWaitingServers()) {
final Queue<ServerName> queue = event.popWaitingServer();
queue.setSuspended(false);
addToRunQueue(serverRunQueue, queue);
count += queue.size();
}
while (event.hasWaitingProcedures()) {
wakeProcedure(event.popWaitingProcedure(false));
count++;
}
return count;
}
private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) {
procedure.suspend();
event.suspendProcedure(procedure);
}
private void wakeProcedure(final Procedure procedure) {
procedure.resume();
doAdd(procedure, /* addFront= */ true, /* notify= */false);
}
private static abstract class BaseProcedureEvent {
private ArrayDeque<Procedure> waitingProcedures = null;
protected void suspendProcedure(final Procedure proc) {
if (waitingProcedures == null) {
waitingProcedures = new ArrayDeque<Procedure>();
}
waitingProcedures.addLast(proc);
}
protected boolean hasWaitingProcedures() {
return waitingProcedures != null;
}
protected Procedure popWaitingProcedure(final boolean popFront) {
// it will be nice to use IterableList on a procedure and avoid allocations...
Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast();
if (waitingProcedures.isEmpty()) {
waitingProcedures = null;
}
return proc;
}
@VisibleForTesting
protected synchronized int size() {
if (waitingProcedures != null) {
return waitingProcedures.size();
}
return 0;
}
}
public static class ProcedureEvent extends BaseProcedureEvent {
private final String description;
private Queue<ServerName> waitingServers = null;
private Queue<TableName> waitingTables = null;
private boolean ready = false;
protected ProcedureEvent() {
this(null);
}
public ProcedureEvent(final String description) {
this.description = description;
}
public synchronized boolean isReady() {
return ready;
}
private synchronized void setReady(boolean isReady) {
this.ready = isReady;
}
private void suspendTableQueue(Queue<TableName> queue) {
waitingTables = AvlIterableList.append(waitingTables, queue);
}
private void suspendServerQueue(Queue<ServerName> queue) {
waitingServers = AvlIterableList.append(waitingServers, queue);
}
private boolean hasWaitingTables() {
return waitingTables != null;
}
private Queue<TableName> popWaitingTable() {
Queue<TableName> node = waitingTables;
waitingTables = AvlIterableList.remove(waitingTables, node);
return node;
}
private boolean hasWaitingServers() {
return waitingServers != null;
}
private Queue<ServerName> popWaitingServer() {
Queue<ServerName> node = waitingServers;
waitingServers = AvlIterableList.remove(waitingServers, node);
return node;
}
protected String getDescription() {
if (description == null) {
// you should override this method if you are using the default constructor
throw new UnsupportedOperationException();
}
return description;
}
@VisibleForTesting
protected synchronized int size() {
int count = super.size();
if (waitingTables != null) {
count += waitingTables.size();
}
if (waitingServers != null) {
count += waitingServers.size();
}
return count;
}
@Override
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), getDescription());
}
}
// ============================================================================
// Table Queue Lookup Helpers
// ============================================================================
private TableQueue getTableQueueWithLock(TableName tableName) {
schedLock.lock();
schedLock();
try {
return getTableQueue(tableName);
} finally {
schedLock.unlock();
schedUnlock();
}
}
@ -727,11 +329,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// Server Queue Lookup Helpers
// ============================================================================
private ServerQueue getServerQueueWithLock(ServerName serverName) {
schedLock.lock();
schedLock();
try {
return getServerQueue(serverName);
} finally {
schedLock.unlock();
schedUnlock();
}
}
@ -790,7 +392,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
private static class RegionEvent extends BaseProcedureEvent {
private static class RegionEvent extends ProcedureEventQueue {
private final HRegionInfo regionInfo;
private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
@ -823,7 +425,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
@Override
public String toString() {
return String.format("region %s event", regionInfo.getRegionNameAsString());
return "RegionEvent(" + regionInfo.getRegionNameAsString() + ")";
}
}
@ -1046,33 +648,33 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* @return true if we were able to acquire the lock on the table, otherwise false.
*/
public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) {
schedLock.lock();
schedLock();
TableQueue queue = getTableQueue(table);
if (!queue.getNamespaceQueue().trySharedLock()) {
schedLock.unlock();
schedUnlock();
return false;
}
if (!queue.tryExclusiveLock(procedure)) {
queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock();
schedUnlock();
return false;
}
removeFromRunQueue(tableRunQueue, queue);
boolean hasParentLock = queue.hasParentLock(procedure);
schedLock.unlock();
schedUnlock();
boolean hasXLock = true;
if (!hasParentLock) {
// Zk lock is expensive...
hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
if (!hasXLock) {
schedLock.lock();
schedLock();
if (!hasParentLock) queue.releaseExclusiveLock();
queue.getNamespaceQueue().releaseSharedLock();
addToRunQueue(tableRunQueue, queue);
schedLock.unlock();
schedUnlock();
}
}
return hasXLock;
@ -1092,11 +694,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
queue.releaseZkExclusiveLock(lockManager);
}
schedLock.lock();
schedLock();
if (!hasParentLock) queue.releaseExclusiveLock();
queue.getNamespaceQueue().releaseSharedLock();
addToRunQueue(tableRunQueue, queue);
schedLock.unlock();
schedUnlock();
}
/**
@ -1112,7 +714,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure,
final TableName table) {
schedLock.lock();
schedLock();
TableQueue queue = getTableQueue(table);
if (!queue.getNamespaceQueue().trySharedLock()) {
return null;
@ -1120,7 +722,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (!queue.trySharedLock()) {
queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock();
schedUnlock();
return null;
}
@ -1129,11 +731,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (!queue.tryZkSharedLock(lockManager, procedure.toString())) {
queue.releaseSharedLock();
queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock();
schedUnlock();
return null;
}
schedLock.unlock();
schedUnlock();
return queue;
}
@ -1146,7 +748,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
public void releaseTableSharedLock(final Procedure procedure, final TableName table) {
final TableQueue queue = getTableQueueWithLock(table);
schedLock.lock();
schedLock();
// Zk lock is expensive...
queue.releaseZkSharedLock(lockManager);
@ -1154,7 +756,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (queue.releaseSharedLock()) {
addToRunQueue(tableRunQueue, queue);
}
schedLock.unlock();
schedUnlock();
}
/**
@ -1168,8 +770,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
*/
@VisibleForTesting
protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) {
final ReentrantLock l = schedLock;
l.lock();
schedLock();
try {
TableQueue queue = getTableQueue(table);
if (queue == null) return true;
@ -1193,7 +794,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return false;
}
} finally {
l.unlock();
schedUnlock();
}
return true;
}
@ -1298,7 +899,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
// awake procedures if any
schedLock.lock();
schedLock();
try {
for (int i = numProcs - 1; i >= 0; --i) {
wakeProcedure(nextProcs[i]);
@ -1312,7 +913,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
releaseTableSharedLock(procedure, table);
}
} finally {
schedLock.unlock();
schedUnlock();
}
}
@ -1327,7 +928,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* @return true if we were able to acquire the lock on the namespace, otherwise false.
*/
public boolean tryAcquireNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
schedLock.lock();
schedLock();
try {
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
if (!tableQueue.trySharedLock()) return false;
@ -1339,7 +940,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
return hasLock;
} finally {
schedLock.unlock();
schedUnlock();
}
}
@ -1350,7 +951,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* @param nsName the namespace that has the exclusive lock
*/
public void releaseNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
schedLock.lock();
schedLock();
try {
final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
final NamespaceQueue queue = getNamespaceQueue(nsName);
@ -1360,7 +961,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
addToRunQueue(tableRunQueue, tableQueue);
}
} finally {
schedLock.unlock();
schedUnlock();
}
}
@ -1376,7 +977,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
*/
public boolean tryAcquireServerExclusiveLock(final Procedure procedure,
final ServerName serverName) {
schedLock.lock();
schedLock();
try {
ServerQueue queue = getServerQueue(serverName);
if (queue.tryExclusiveLock(procedure)) {
@ -1384,7 +985,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return true;
}
} finally {
schedLock.unlock();
schedUnlock();
}
return false;
}
@ -1397,13 +998,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
*/
public void releaseServerExclusiveLock(final Procedure procedure,
final ServerName serverName) {
schedLock.lock();
schedLock();
try {
ServerQueue queue = getServerQueue(serverName);
queue.releaseExclusiveLock();
addToRunQueue(serverRunQueue, queue);
} finally {
schedLock.unlock();
schedUnlock();
}
}

View File

@ -18,11 +18,6 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -34,17 +29,15 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -200,85 +193,4 @@ public class TestMasterProcedureEvents {
}
return null;
}
@Test(timeout=30000)
public void testTimeoutEventProcedure() throws Exception {
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 5);
procExec.submitProcedure(proc);
ProcedureTestingUtility.waitProcedure(procExec, proc.getProcId());
ProcedureTestingUtility.assertIsAbortException(procExec.getResult(proc.getProcId()));
}
public static class TestTimeoutEventProcedure
extends Procedure<MasterProcedureEnv> implements TableProcedureInterface {
private final ProcedureEvent event = new ProcedureEvent("timeout-event");
private final AtomicInteger ntimeouts = new AtomicInteger(0);
private int maxTimeouts = 1;
public TestTimeoutEventProcedure() {}
public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
this.maxTimeouts = maxTimeouts;
setTimeout(timeoutMsec);
setOwner("test");
}
@Override
protected Procedure[] execute(final MasterProcedureEnv env)
throws ProcedureSuspendedException {
LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
if (ntimeouts.get() > maxTimeouts) {
setAbortFailure("test", "give up after " + ntimeouts.get());
return null;
}
env.getProcedureQueue().suspendEvent(event);
if (env.getProcedureQueue().waitEvent(event, this)) {
setState(ProcedureState.WAITING_TIMEOUT);
throw new ProcedureSuspendedException();
}
return null;
}
@Override
protected void rollback(final MasterProcedureEnv env) {
}
@Override
protected boolean setTimeoutFailure(final MasterProcedureEnv env) {
int n = ntimeouts.incrementAndGet();
LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
setState(ProcedureState.RUNNABLE);
env.getProcedureQueue().wakeEvent(event);
return false;
}
@Override
public TableName getTableName() {
return TableName.valueOf("testtb");
}
@Override
public TableOperationType getTableOperationType() {
return TableOperationType.READ;
}
@Override
protected boolean abort(MasterProcedureEnv env) { return false; }
@Override
protected void serializeStateData(final OutputStream stream) throws IOException {
}
@Override
protected void deserializeStateData(final InputStream stream) throws IOException {
}
}
}

View File

@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -540,40 +540,6 @@ public class TestMasterProcedureScheduler {
queue.releaseTableExclusiveLock(rootProc, tableName);
}
@Test
public void testSuspendedTableQueue() throws Exception {
final TableName tableName = TableName.valueOf("testSuspendedQueue");
queue.addBack(new TestTableProcedure(1, tableName,
TableProcedureInterface.TableOperationType.EDIT));
queue.addBack(new TestTableProcedure(2, tableName,
TableProcedureInterface.TableOperationType.EDIT));
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
// Suspend
// TODO: If we want to keep the zk-lock we need to retain the lock on suspend
ProcedureEvent event = new ProcedureEvent("testSuspendedTableQueueEvent");
assertEquals(true, queue.waitEvent(event, proc, true));
queue.releaseTableExclusiveLock(proc, tableName);
assertEquals(null, queue.poll(0));
// Resume
queue.wakeEvent(event);
proc = queue.poll();
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(1, proc.getProcId());
queue.releaseTableExclusiveLock(proc, tableName);
proc = queue.poll();
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(2, proc.getProcId());
queue.releaseTableExclusiveLock(proc, tableName);
}
@Test
public void testSuspendedProcedure() throws Exception {
final TableName tableName = TableName.valueOf("testSuspendedProcedure");

View File

@ -23,23 +23,18 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedureWithEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -212,92 +207,6 @@ public class TestMasterProcedureSchedulerConcurrency {
}
}
@Test(timeout=60000)
public void testConcurrentWaitWake() throws Exception {
testConcurrentWaitWake(false);
}
@Test(timeout=60000)
public void testConcurrentWaitWakeBatch() throws Exception {
testConcurrentWaitWake(true);
}
private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
final TableName tableName = TableName.valueOf("testtb");
final int NPROCS = 20;
final int NRUNS = 100;
for (long i = 0; i < NPROCS; ++i) {
queue.addBack(new TestTableProcedureWithEvent(i, tableName,
TableProcedureInterface.TableOperationType.READ));
}
final Thread[] threads = new Thread[4];
final AtomicInteger waitCount = new AtomicInteger(0);
final AtomicInteger wakeCount = new AtomicInteger(0);
final ConcurrentSkipListSet<TestTableProcedureWithEvent> waitQueue =
new ConcurrentSkipListSet<TestTableProcedureWithEvent>();
threads[0] = new Thread() {
@Override
public void run() {
while (true) {
if (useWakeBatch) {
ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
for (int i = 0; i < ev.length; ++i) {
ev[i] = waitQueue.pollFirst().getEvent();
LOG.debug("WAKE " + ev[i] + " total=" + wakeCount.get());
}
queue.wakeEvents(ev, ev.length);
wakeCount.addAndGet(ev.length);
} else {
int size = waitQueue.size();
while (size-- > 0) {
ProcedureEvent ev = waitQueue.pollFirst().getEvent();
queue.wakeEvent(ev);
LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
wakeCount.incrementAndGet();
}
}
if (wakeCount.get() >= NRUNS) {
break;
}
Threads.sleepWithoutInterrupt(25);
}
}
};
for (int i = 1; i < threads.length; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
while (true) {
TestTableProcedureWithEvent proc = (TestTableProcedureWithEvent)queue.poll();
if (proc == null) continue;
waitQueue.add(proc);
queue.suspendEvent(proc.getEvent());
queue.waitEvent(proc.getEvent(), proc);
LOG.debug("WAIT " + proc.getEvent());
if (waitCount.incrementAndGet() >= NRUNS) {
break;
}
}
}
};
}
for (int i = 0; i < threads.length; ++i) {
threads[i].start();
}
for (int i = 0; i < threads.length; ++i) {
threads[i].join();
}
queue.clear();
}
public static class TestTableProcSet {
private final MasterProcedureScheduler queue;