From 92ef234486537b4325641ce47f6fde26d9432710 Mon Sep 17 00:00:00 2001 From: Matteo Bertozzi Date: Wed, 12 Oct 2016 16:33:25 -0700 Subject: [PATCH] HBASE-16813 Procedure v2 - Move ProcedureEvent to hbase-procedure module --- .../AbstractProcedureScheduler.java | 311 ++++++++++ .../hbase/procedure2/ProcedureEvent.java | 57 ++ .../hbase/procedure2/ProcedureEventQueue.java | 85 +++ .../hbase/procedure2/ProcedureExecutor.java | 63 +- ...nnableSet.java => ProcedureScheduler.java} | 70 ++- .../procedure2/ProcedureSimpleRunQueue.java | 121 ---- .../procedure2/SimpleProcedureScheduler.java | 71 +++ .../procedure2/ProcedureTestingUtility.java | 38 +- .../hbase/procedure2/TestProcedureEvents.java | 132 ++++ .../TestProcedureSchedulerConcurrency.java | 160 +++++ .../procedure2/TestProcedureSuspended.java | 6 +- .../hbase/procedure2/TestYieldProcedures.java | 59 +- .../apache/hadoop/hbase/master/HMaster.java | 2 +- .../master/procedure/MasterProcedureEnv.java | 7 +- .../procedure/MasterProcedureScheduler.java | 573 +++--------------- .../procedure/TestMasterProcedureEvents.java | 92 +-- .../TestMasterProcedureScheduler.java | 36 +- ...stMasterProcedureSchedulerConcurrency.java | 93 +-- 18 files changed, 1073 insertions(+), 903 deletions(-) create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java rename hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/{ProcedureRunnableSet.java => ProcedureScheduler.java} (55%) delete mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java create mode 100644 hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java create mode 100644 hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java new file mode 100644 index 00000000000..c4ae877d77b --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class AbstractProcedureScheduler implements ProcedureScheduler { + private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class); + + private final ReentrantLock schedLock = new ReentrantLock(); + private final Condition schedWaitCond = schedLock.newCondition(); + private boolean running = false; + + // TODO: metrics + private long pollCalls = 0; + private long nullPollCalls = 0; + + @Override + public void start() { + schedLock(); + try { + running = true; + } finally { + schedUnlock(); + } + } + + @Override + public void stop() { + schedLock(); + try { + running = false; + schedWaitCond.signalAll(); + } finally { + schedUnlock(); + } + } + + @Override + public void signalAll() { + schedLock(); + try { + schedWaitCond.signalAll(); + } finally { + schedUnlock(); + } + } + + // ========================================================================== + // Add related + // ========================================================================== + /** + * Add the procedure to the queue. + * NOTE: this method is called with the sched lock held. + * @param procedure the Procedure to add + * @param addFront true if the item should be added to the front of the queue + */ + protected abstract void enqueue(Procedure procedure, boolean addFront); + + public void addFront(final Procedure procedure) { + push(procedure, true, true); + } + + public void addBack(final Procedure procedure) { + push(procedure, false, true); + } + + protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { + schedLock.lock(); + try { + enqueue(procedure, addFront); + if (notify) { + schedWaitCond.signal(); + } + } finally { + schedLock.unlock(); + } + } + + // ========================================================================== + // Poll related + // ========================================================================== + /** + * Fetch one Procedure from the queue + * NOTE: this method is called with the sched lock held. + * @return the Procedure to execute, or null if nothing is available. + */ + protected abstract Procedure dequeue(); + + @Override + public Procedure poll() { + return poll(-1); + } + + @Override + public Procedure poll(long timeout, TimeUnit unit) { + return poll(unit.toNanos(timeout)); + } + + public Procedure poll(long nanos) { + final boolean waitForever = (nanos < 0); + schedLock(); + try { + while (!queueHasRunnables()) { + if (!running) return null; + if (waitForever) { + schedWaitCond.await(); + } else { + if (nanos <= 0) return null; + nanos = schedWaitCond.awaitNanos(nanos); + } + } + + final Procedure pollResult = dequeue(); + pollCalls++; + nullPollCalls += (pollResult == null) ? 1 : 0; + return pollResult; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + nullPollCalls++; + return null; + } finally { + schedUnlock(); + } + } + + // ========================================================================== + // Utils + // ========================================================================== + /** + * Removes all of the elements from the queue + * NOTE: this method is called with the sched lock held. + */ + protected abstract void clearQueue(); + + /** + * Returns the number of elements in this queue. + * NOTE: this method is called with the sched lock held. + * @return the number of elements in this queue. + */ + protected abstract int queueSize(); + + /** + * Returns true if there are procedures available to process. + * NOTE: this method is called with the sched lock held. + * @return true if there are procedures available to process, otherwise false. + */ + protected abstract boolean queueHasRunnables(); + + @Override + public void clear() { + // NOTE: USED ONLY FOR TESTING + schedLock(); + try { + clearQueue(); + } finally { + schedUnlock(); + } + } + + @Override + public int size() { + schedLock(); + try { + return queueSize(); + } finally { + schedUnlock(); + } + } + + @Override + public boolean hasRunnables() { + schedLock(); + try { + return queueHasRunnables(); + } finally { + schedUnlock(); + } + } + + // ============================================================================ + // TODO: Metrics + // ============================================================================ + public long getPollCalls() { + return pollCalls; + } + + public long getNullPollCalls() { + return nullPollCalls; + } + + // ========================================================================== + // Procedure Events + // ========================================================================== + @Override + public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) { + synchronized (event) { + if (event.isReady()) { + return false; + } + suspendProcedure(event, procedure); + return true; + } + } + + @Override + public void suspendEvent(final ProcedureEvent event) { + final boolean isTraceEnabled = LOG.isTraceEnabled(); + synchronized (event) { + event.setReady(false); + if (isTraceEnabled) { + LOG.trace("Suspend event " + event); + } + } + } + + @Override + public void wakeEvent(final ProcedureEvent event) { + wakeEvents(1, event); + } + + @Override + public void wakeEvents(final int count, final ProcedureEvent... events) { + final boolean isTraceEnabled = LOG.isTraceEnabled(); + schedLock(); + try { + int waitingCount = 0; + for (int i = 0; i < count; ++i) { + final ProcedureEvent event = events[i]; + synchronized (event) { + event.setReady(true); + if (isTraceEnabled) { + LOG.trace("Wake event " + event); + } + waitingCount += popEventWaitingObjects(event); + } + } + wakePollIfNeeded(waitingCount); + } finally { + schedUnlock(); + } + } + + protected int popEventWaitingObjects(final ProcedureEvent event) { + return popEventWaitingProcedures(event); + } + + protected int popEventWaitingProcedures(final ProcedureEventQueue event) { + int count = 0; + while (event.hasWaitingProcedures()) { + wakeProcedure(event.popWaitingProcedure(false)); + count++; + } + return count; + } + + protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) { + procedure.suspend(); + event.suspendProcedure(procedure); + } + + protected void wakeProcedure(final Procedure procedure) { + procedure.resume(); + push(procedure, /* addFront= */ true, /* notify= */false); + } + + // ========================================================================== + // Internal helpers + // ========================================================================== + protected void schedLock() { + schedLock.lock(); + } + + protected void schedUnlock() { + schedLock.unlock(); + } + + protected void wakePollIfNeeded(final int waitingCount) { + if (waitingCount > 1) { + schedWaitCond.signalAll(); + } else if (waitingCount > 0) { + schedWaitCond.signal(); + } + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java new file mode 100644 index 00000000000..6335832508f --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Basic ProcedureEvent that contains an "object", which can be a + * description or a reference to the resource to wait on, and a + * queue for suspended procedures. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ProcedureEvent extends ProcedureEventQueue { + private final T object; + + private boolean ready = false; + + public ProcedureEvent(final T object) { + this.object = object; + } + + public T getObject() { + return object; + } + + public synchronized boolean isReady() { + return ready; + } + + @InterfaceAudience.Private + protected synchronized void setReady(final boolean isReady) { + this.ready = isReady; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + object + ")"; + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java new file mode 100644 index 00000000000..a109e9e639d --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayDeque; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Basic queue to store suspended procedures. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ProcedureEventQueue { + private static final Log LOG = LogFactory.getLog(ProcedureEventQueue.class); + + private ArrayDeque waitingProcedures = null; + + public ProcedureEventQueue() { + } + + @InterfaceAudience.Private + public synchronized void suspendProcedure(final Procedure proc) { + if (waitingProcedures == null) { + waitingProcedures = new ArrayDeque(); + } + waitingProcedures.addLast(proc); + } + + @InterfaceAudience.Private + public synchronized void removeProcedure(final Procedure proc) { + if (waitingProcedures != null) { + waitingProcedures.remove(proc); + } + } + + @InterfaceAudience.Private + public synchronized boolean hasWaitingProcedures() { + return waitingProcedures != null; + } + + @InterfaceAudience.Private + public synchronized Procedure popWaitingProcedure(final boolean popFront) { + // it will be nice to use IterableList on a procedure and avoid allocations... + Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); + if (waitingProcedures.isEmpty()) { + waitingProcedures = null; + } + return proc; + } + + @VisibleForTesting + public synchronized void clear() { + waitingProcedures = null; + } + + @VisibleForTesting + public synchronized int size() { + if (waitingProcedures != null) { + return waitingProcedures.size(); + } + return 0; + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 2eeef9e8195..2e9e3a338a0 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -243,9 +243,9 @@ public class ProcedureExecutor { new TimeoutBlockingQueue(new ProcedureTimeoutRetriever()); /** - * Queue that contains runnable procedures. + * Scheduler/Queue that contains runnable procedures. */ - private final ProcedureRunnableSet runnables; + private final ProcedureScheduler scheduler; // TODO private final ReentrantLock submitLock = new ReentrantLock(); @@ -267,13 +267,13 @@ public class ProcedureExecutor { public ProcedureExecutor(final Configuration conf, final TEnvironment environment, final ProcedureStore store) { - this(conf, environment, store, new ProcedureSimpleRunQueue()); + this(conf, environment, store, new SimpleProcedureScheduler()); } public ProcedureExecutor(final Configuration conf, final TEnvironment environment, - final ProcedureStore store, final ProcedureRunnableSet runqueue) { + final ProcedureStore store, final ProcedureScheduler scheduler) { this.environment = environment; - this.runnables = runqueue; + this.scheduler = scheduler; this.store = store; this.conf = conf; this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, true); @@ -284,7 +284,7 @@ public class ProcedureExecutor { Preconditions.checkArgument(rollbackStack.isEmpty()); Preconditions.checkArgument(procedures.isEmpty()); Preconditions.checkArgument(waitingTimeout.isEmpty()); - Preconditions.checkArgument(runnables.size() == 0); + Preconditions.checkArgument(scheduler.size() == 0); store.load(new ProcedureStore.ProcedureLoader() { @Override @@ -378,7 +378,7 @@ public class ProcedureExecutor { Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback? - runnables.addBack(proc); + scheduler.addBack(proc); continue; } @@ -410,8 +410,8 @@ public class ProcedureExecutor { break; case FINISHED: if (proc.hasException()) { - // add the proc to the runnables to perform the rollback - runnables.addBack(proc); + // add the proc to the scheduler to perform the rollback + scheduler.addBack(proc); } break; case ROLLEDBACK: @@ -446,7 +446,7 @@ public class ProcedureExecutor { throw new IOException("found " + corruptedCount + " procedures on replay"); } - // 4. Push the runnables + // 4. Push the scheduler if (!runnableList.isEmpty()) { // TODO: See ProcedureWALFormatReader#hasFastStartSupport // some procedure may be started way before this stuff. @@ -457,10 +457,10 @@ public class ProcedureExecutor { sendProcedureLoadedNotification(proc.getProcId()); } if (proc.wasExecuted()) { - runnables.addFront(proc); + scheduler.addFront(proc); } else { // if it was not in execution, it can wait. - runnables.addBack(proc); + scheduler.addBack(proc); } } } @@ -514,6 +514,9 @@ public class ProcedureExecutor { LOG.info(String.format("recover procedure store (%s) lease: %s", store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); + // start the procedure scheduler + scheduler.start(); + // TODO: Split in two steps. // TODO: Handle corrupted procedures (currently just a warn) // The first one will make sure that we have the latest id, @@ -540,7 +543,7 @@ public class ProcedureExecutor { } LOG.info("Stopping the procedure executor"); - runnables.signalAll(); + scheduler.stop(); waitingTimeout.signalAll(); } @@ -564,7 +567,7 @@ public class ProcedureExecutor { procedures.clear(); nonceKeysToProcIdsMap.clear(); waitingTimeout.clear(); - runnables.clear(); + scheduler.clear(); lastProcId.set(-1); } @@ -698,7 +701,7 @@ public class ProcedureExecutor { assert !procedures.containsKey(currentProcId); procedures.put(currentProcId, proc); sendProcedureAddedNotification(currentProcId); - runnables.addBack(proc); + scheduler.addBack(proc); return currentProcId; } @@ -810,18 +813,18 @@ public class ProcedureExecutor { return procedures.get(procId); } - protected ProcedureRunnableSet getRunnableSet() { - return runnables; + protected ProcedureScheduler getScheduler() { + return scheduler; } /** * Execution loop (N threads) * while the executor is in a running state, - * fetch a procedure from the runnables queue and start the execution. + * fetch a procedure from the scheduler queue and start the execution. */ private void execLoop() { while (isRunning()) { - Procedure proc = runnables.poll(); + Procedure proc = scheduler.poll(); if (proc == null) continue; try { @@ -855,7 +858,7 @@ public class ProcedureExecutor { // we have the 'rollback-lock' we can start rollingback if (!executeRollback(rootProcId, procStack)) { procStack.unsetRollback(); - runnables.yield(proc); + scheduler.yield(proc); } } else { // if we can't rollback means that some child is still running. @@ -863,7 +866,7 @@ public class ProcedureExecutor { // If the procedure was never executed, remove and mark it as rolledback. if (!proc.wasExecuted()) { if (!executeRollback(proc)) { - runnables.yield(proc); + scheduler.yield(proc); } } } @@ -876,7 +879,7 @@ public class ProcedureExecutor { execProcedure(procStack, proc); releaseLock(proc, false); } else { - runnables.yield(proc); + scheduler.yield(proc); } procStack.release(proc); @@ -965,7 +968,7 @@ public class ProcedureExecutor { RootProcedureState procStack = rollbackStack.get(rootProcId); procStack.abort(); store.update(proc); - runnables.addFront(proc); + scheduler.addFront(proc); continue; } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) { waitingTimeout.add(proc); @@ -1124,11 +1127,11 @@ public class ProcedureExecutor { if (LOG.isTraceEnabled()) { LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage()); } - runnables.yield(procedure); + scheduler.yield(procedure); return; } catch (InterruptedException e) { handleInterruptedException(procedure, e); - runnables.yield(procedure); + scheduler.yield(procedure); return; } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... @@ -1205,7 +1208,7 @@ public class ProcedureExecutor { // if the procedure is kind enough to pass the slot to someone else, yield if (procedure.getState() == ProcedureState.RUNNABLE && procedure.isYieldAfterExecutionStep(getEnvironment())) { - runnables.yield(procedure); + scheduler.yield(procedure); return; } @@ -1218,7 +1221,7 @@ public class ProcedureExecutor { Procedure subproc = subprocs[i]; assert !procedures.containsKey(subproc.getProcId()); procedures.put(subproc.getProcId(), subproc); - runnables.addFront(subproc); + scheduler.addFront(subproc); } } @@ -1236,7 +1239,7 @@ public class ProcedureExecutor { if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) { parent.setState(ProcedureState.RUNNABLE); store.update(parent); - runnables.addFront(parent); + scheduler.addFront(parent); if (LOG.isTraceEnabled()) { LOG.trace(parent + " all the children finished their work, resume."); } @@ -1374,10 +1377,10 @@ public class ProcedureExecutor { // call the runnableSet completion cleanup handler try { - runnables.completionCleanup(proc); + scheduler.completionCleanup(proc); } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... - LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e); + LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: " + proc, e); } // Notify the listeners diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java similarity index 55% rename from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java rename to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index 64c41ee580e..1793158c308 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.procedure2; import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -28,7 +30,23 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface ProcedureRunnableSet { +public interface ProcedureScheduler { + /** + * Start the scheduler + */ + void start(); + + /** + * Stop the scheduler + */ + void stop(); + + /** + * In case the class is blocking on poll() waiting for items to be added, + * this method should awake poll() and poll() should return. + */ + void signalAll(); + /** * Inserts the specified element at the front of this queue. * @param proc the Procedure to add @@ -55,6 +73,11 @@ public interface ProcedureRunnableSet { */ void completionCleanup(Procedure proc); + /** + * @return true if there are procedures available to process, otherwise false. + */ + boolean hasRunnables(); + /** * Fetch one Procedure from the queue * @return the Procedure to execute, or null if nothing present. @@ -62,20 +85,53 @@ public interface ProcedureRunnableSet { Procedure poll(); /** - * In case the class is blocking on poll() waiting for items to be added, - * this method should awake poll() and poll() should return. + * Fetch one Procedure from the queue + * @param timeout how long to wait before giving up, in units of unit + * @param unit a TimeUnit determining how to interpret the timeout parameter + * @return the Procedure to execute, or null if nothing present. */ - void signalAll(); + Procedure poll(long timeout, TimeUnit unit); /** - * Returns the number of elements in this collection. - * @return the number of elements in this collection. + * Mark the event has not ready. + * procedures calling waitEvent() will be suspended. + * @param event the event to mark as suspended/not ready + */ + void suspendEvent(ProcedureEvent event); + + /** + * Wake every procedure waiting for the specified event + * (By design each event has only one "wake" caller) + * @param event the event to wait + */ + void wakeEvent(ProcedureEvent event); + + /** + * Wake every procedure waiting for the specified events. + * (By design each event has only one "wake" caller) + * @param count the number of events in the array to wake + * @param events the list of events to wake + */ + void wakeEvents(int count, ProcedureEvent... events); + + /** + * Suspend the procedure if the event is not ready yet. + * @param event the event to wait on + * @param procedure the procedure waiting on the event + * @return true if the procedure has to wait for the event to be ready, false otherwise. + */ + boolean waitEvent(ProcedureEvent event, Procedure procedure); + + /** + * Returns the number of elements in this queue. + * @return the number of elements in this queue. */ @VisibleForTesting int size(); /** - * Removes all of the elements from this collection. + * Removes all of the elements from the queue */ + @VisibleForTesting void clear(); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java deleted file mode 100644 index d23680dd367..00000000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * Simple runqueue for the procedures - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { - private final Deque runnables = new ArrayDeque(); - private final ReentrantLock lock = new ReentrantLock(); - private final Condition waitCond = lock.newCondition(); - - @Override - public void addFront(final Procedure proc) { - lock.lock(); - try { - runnables.addFirst(proc); - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void addBack(final Procedure proc) { - lock.lock(); - try { - runnables.addLast(proc); - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void yield(final Procedure proc) { - addBack(proc); - } - - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Procedure poll() { - lock.lock(); - try { - if (runnables.isEmpty()) { - waitCond.await(); - if (!runnables.isEmpty()) { - return runnables.pop(); - } - } else { - return runnables.pop(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } finally { - lock.unlock(); - } - return null; - } - - @Override - public void signalAll() { - lock.lock(); - try { - waitCond.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void clear() { - lock.lock(); - try { - runnables.clear(); - } finally { - lock.unlock(); - } - } - - @Override - public int size() { - lock.lock(); - try { - return runnables.size(); - } finally { - lock.unlock(); - } - } - - @Override - public void completionCleanup(Procedure proc) { - } -} \ No newline at end of file diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java new file mode 100644 index 00000000000..ffc8273a99b --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.util.ArrayDeque; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Simple scheduler for procedures + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SimpleProcedureScheduler extends AbstractProcedureScheduler { + private final ArrayDeque runnables = new ArrayDeque(); + + @Override + protected void enqueue(final Procedure procedure, final boolean addFront) { + if (addFront) { + runnables.addFirst(procedure); + } else { + runnables.addLast(procedure); + } + } + + @Override + protected Procedure dequeue() { + return runnables.poll(); + } + + @Override + protected void clearQueue() { + runnables.clear(); + } + + @Override + public void yield(final Procedure proc) { + addBack(proc); + } + + @Override + public boolean queueHasRunnables() { + return runnables.size() > 0; + } + + @Override + public int queueSize() { + return runnables.size(); + } + + @Override + public void completionCleanup(Procedure proc) { + } +} \ No newline at end of file diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 0b85ff88b07..f2c7e6b0f00 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -181,7 +181,7 @@ public class ProcedureTestingUtility { public static void waitNoProcedureRunning(ProcedureExecutor procExecutor) { int stableRuns = 0; while (stableRuns < 10) { - if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) { + if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) { stableRuns = 0; Threads.sleepWithoutInterrupt(100); } else { @@ -236,7 +236,32 @@ public class ProcedureTestingUtility { return cause == null ? procInfo.getException() : cause; } - public static class TestProcedure extends Procedure { + public static class NoopProcedure extends Procedure { + public NoopProcedure() {} + + @Override + protected Procedure[] execute(TEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return null; + } + + @Override + protected void rollback(TEnv env) throws IOException, InterruptedException { + } + + @Override + protected boolean abort(TEnv env) { return false; } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + } + } + + public static class TestProcedure extends NoopProcedure { private byte[] data = null; public TestProcedure() {} @@ -269,15 +294,6 @@ public class ProcedureTestingUtility { this.data = data; } - @Override - protected Procedure[] execute(Void env) { return null; } - - @Override - protected void rollback(Void env) { } - - @Override - protected boolean abort(Void env) { return false; } - @Override protected void serializeStateData(final OutputStream stream) throws IOException { StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java new file mode 100644 index 00000000000..c4316466649 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureEvents { + private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class); + + private TestProcEnv procEnv; + private NoopProcedureStore procStore; + private ProcedureExecutor procExecutor; + + private HBaseCommonTestingUtility htu; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + + procEnv = new TestProcEnv(); + procStore = new NoopProcedureStore(); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procStore.start(1); + procExecutor.start(1, true); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + procExecutor.join(); + } + + @Test(timeout=30000) + public void testTimeoutEventProcedure() throws Exception { + final int NTIMEOUTS = 5; + + TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, NTIMEOUTS); + procExecutor.submitProcedure(proc); + + ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId()); + ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId())); + assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount()); + } + + public static class TestTimeoutEventProcedure extends NoopProcedure { + private final ProcedureEvent event = new ProcedureEvent("timeout-event"); + + private final AtomicInteger ntimeouts = new AtomicInteger(0); + private int maxTimeouts = 1; + + public TestTimeoutEventProcedure() {} + + public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) { + this.maxTimeouts = maxTimeouts; + setTimeout(timeoutMsec); + } + + public int getTimeoutsCount() { + return ntimeouts.get(); + } + + @Override + protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { + LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts); + if (ntimeouts.get() > maxTimeouts) { + setAbortFailure("test", "give up after " + ntimeouts.get()); + return null; + } + + env.getProcedureScheduler().suspendEvent(event); + if (env.getProcedureScheduler().waitEvent(event, this)) { + setState(ProcedureState.WAITING_TIMEOUT); + throw new ProcedureSuspendedException(); + } + + return null; + } + + @Override + protected boolean setTimeoutFailure(final TestProcEnv env) { + int n = ntimeouts.incrementAndGet(); + LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n); + setState(ProcedureState.RUNNABLE); + env.getProcedureScheduler().wakeEvent(event); + return false; + } + } + + private class TestProcEnv { + public ProcedureScheduler getProcedureScheduler() { + return procExecutor.getScheduler(); + } + } +} diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java new file mode 100644 index 00000000000..b8cd8ffc8f3 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Threads; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, MediumTests.class}) +public class TestProcedureSchedulerConcurrency { + private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class); + + private SimpleProcedureScheduler procSched; + + @Before + public void setUp() throws IOException { + procSched = new SimpleProcedureScheduler(); + procSched.start(); + } + + @After + public void tearDown() throws IOException { + procSched.stop(); + } + + @Test(timeout=60000) + public void testConcurrentWaitWake() throws Exception { + testConcurrentWaitWake(false); + } + + @Test(timeout=60000) + public void testConcurrentWaitWakeBatch() throws Exception { + testConcurrentWaitWake(true); + } + + private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { + final int WAIT_THRESHOLD = 2500; + final int NPROCS = 20; + final int NRUNS = 500; + + final ProcedureScheduler sched = procSched; + for (long i = 0; i < NPROCS; ++i) { + sched.addBack(new TestProcedureWithEvent(i)); + } + + final Thread[] threads = new Thread[4]; + final AtomicInteger waitCount = new AtomicInteger(0); + final AtomicInteger wakeCount = new AtomicInteger(0); + + final ConcurrentSkipListSet waitQueue = + new ConcurrentSkipListSet(); + threads[0] = new Thread() { + @Override + public void run() { + long lastUpdate = 0; + while (true) { + final int oldWakeCount = wakeCount.get(); + if (useWakeBatch) { + ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; + for (int i = 0; i < ev.length; ++i) { + ev[i] = waitQueue.pollFirst().getEvent(); + LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get()); + } + sched.wakeEvents(ev.length, ev); + wakeCount.addAndGet(ev.length); + } else { + int size = waitQueue.size(); + while (size-- > 0) { + ProcedureEvent ev = waitQueue.pollFirst().getEvent(); + sched.wakeEvent(ev); + LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); + wakeCount.incrementAndGet(); + } + } + if (wakeCount.get() != oldWakeCount) { + lastUpdate = System.currentTimeMillis(); + } else if (wakeCount.get() >= NRUNS && + (System.currentTimeMillis() - lastUpdate) > WAIT_THRESHOLD) { + break; + } + Threads.sleepWithoutInterrupt(25); + } + } + }; + + for (int i = 1; i < threads.length; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + while (true) { + TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll(); + if (proc == null) continue; + + sched.suspendEvent(proc.getEvent()); + waitQueue.add(proc); + sched.waitEvent(proc.getEvent(), proc); + LOG.debug("WAIT " + proc.getEvent()); + if (waitCount.incrementAndGet() >= NRUNS) { + break; + } + } + } + }; + } + + for (int i = 0; i < threads.length; ++i) { + threads[i].start(); + } + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + + sched.clear(); + } + + public static class TestProcedureWithEvent extends NoopProcedure { + private final ProcedureEvent event; + + public TestProcedureWithEvent(long procId) { + setProcId(procId); + event = new ProcedureEvent("test-event procId=" + procId); + } + + public ProcedureEvent getEvent() { + return event; + } + } +} diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index eb729397c3d..9a108a8a283 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -92,7 +92,7 @@ public class TestProcedureSuspended { // release p3 p3keyB.setThrowSuspend(false); - procExecutor.getRunnableSet().addFront(p3keyB); + procExecutor.getScheduler().addFront(p3keyB); waitAndAssertTimestamp(p1keyA, 1, 1); waitAndAssertTimestamp(p2keyA, 0, -1); waitAndAssertTimestamp(p3keyB, 2, 3); @@ -104,7 +104,7 @@ public class TestProcedureSuspended { // rollback p2 and wait until is fully completed p1keyA.setTriggerRollback(true); - procExecutor.getRunnableSet().addFront(p1keyA); + procExecutor.getScheduler().addFront(p1keyA); ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA); // p2 should start and suspend @@ -115,7 +115,7 @@ public class TestProcedureSuspended { // wait until p2 is fully completed p2keyA.setThrowSuspend(false); - procExecutor.getRunnableSet().addFront(p2keyA); + procExecutor.getScheduler().addFront(p2keyA); ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA); waitAndAssertTimestamp(p1keyA, 4, 60000); waitAndAssertTimestamp(p2keyA, 2, 8); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index 6e66f761763..165179db9ee 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,7 +51,7 @@ public class TestYieldProcedures { private static final Procedure NULL_PROC = null; private ProcedureExecutor procExecutor; - private TestRunQueue procRunnables; + private TestScheduler procRunnables; private ProcedureStore procStore; private HBaseCommonTestingUtility htu; @@ -67,7 +68,7 @@ public class TestYieldProcedures { logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); - procRunnables = new TestRunQueue(); + procRunnables = new TestScheduler(); procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); procStore.start(PROCEDURE_EXECUTOR_SLOTS); @@ -343,41 +344,47 @@ public class TestYieldProcedures { } } - private static class TestRunQueue extends ProcedureSimpleRunQueue { + private static class TestScheduler extends SimpleProcedureScheduler { private int completionCalls; private int addFrontCalls; private int addBackCalls; private int yieldCalls; private int pollCalls; - public TestRunQueue() {} + public TestScheduler() {} public void addFront(final Procedure proc) { - addFrontCalls++; - super.addFront(proc); - } + addFrontCalls++; + super.addFront(proc); + } - @Override - public void addBack(final Procedure proc) { - addBackCalls++; - super.addBack(proc); - } + @Override + public void addBack(final Procedure proc) { + addBackCalls++; + super.addBack(proc); + } - @Override - public void yield(final Procedure proc) { - yieldCalls++; - super.yield(proc); - } + @Override + public void yield(final Procedure proc) { + yieldCalls++; + super.yield(proc); + } - @Override - public Procedure poll() { - pollCalls++; - return super.poll(); - } + @Override + public Procedure poll() { + pollCalls++; + return super.poll(); + } - @Override - public void completionCleanup(Procedure proc) { - completionCalls++; - } + @Override + public Procedure poll(long timeout, TimeUnit unit) { + pollCalls++; + return super.poll(timeout, unit); + } + + @Override + public void completionCleanup(Procedure proc) { + completionCalls++; + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index b23ce43dbc9..af5d03dc081 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -108,7 +108,6 @@ import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -120,6 +119,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index e90813cff8a..183b41dba7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.security.User; @@ -121,10 +121,15 @@ public class MasterProcedureEnv { return master.getMasterCoprocessorHost(); } + @Deprecated public MasterProcedureScheduler getProcedureQueue() { return procSched; } + public MasterProcedureScheduler getProcedureScheduler() { + return procSched; + } + public boolean isRunning() { return master.getMasterProcedureExecutor().isRunning(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index aba82c177f7..cc9efd2caff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -24,8 +24,6 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; import java.util.HashMap; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,8 +38,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; +import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; +import org.apache.hadoop.hbase.procedure2.ProcedureEventQueue; import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; @@ -49,8 +48,8 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; /** - * ProcedureRunnableSet for the Master Procedures. - * This RunnableSet tries to provide to the ProcedureExecutor procedures + * ProcedureScheduler for the Master Procedures. + * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures * that can be executed without having to wait on a lock. * Most of the master operations can be executed concurrently, if they * are operating on different tables (e.g. two create table can be performed @@ -65,12 +64,10 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class MasterProcedureScheduler implements ProcedureRunnableSet { +public class MasterProcedureScheduler extends AbstractProcedureScheduler { private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); private final TableLockManager lockManager; - private final ReentrantLock schedLock = new ReentrantLock(); - private final Condition schedWaitCond = schedLock.newCondition(); private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR = new NamespaceQueueKeyComparator(); @@ -90,10 +87,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private final int userTablePriority; private final int sysTablePriority; - // TODO: metrics - private long pollCalls = 0; - private long nullPollCalls = 0; - public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) { this.lockManager = lockManager; @@ -103,45 +96,24 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); } - @Override - public void addFront(Procedure proc) { - doAdd(proc, true); - } - - @Override - public void addBack(Procedure proc) { - doAdd(proc, false); - } - @Override public void yield(final Procedure proc) { - doAdd(proc, isTableProcedure(proc)); + push(proc, isTableProcedure(proc), true); } - private void doAdd(final Procedure proc, final boolean addFront) { - doAdd(proc, addFront, true); - } - - private void doAdd(final Procedure proc, final boolean addFront, final boolean notify) { - schedLock.lock(); - try { - if (isTableProcedure(proc)) { - doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); - } else if (isServerProcedure(proc)) { - doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); - } else { - // TODO: at the moment we only have Table and Server procedures - // if you are implementing a non-table/non-server procedure, you have two options: create - // a group for all the non-table/non-server procedures or try to find a key for your - // non-table/non-server procedures and implement something similar to the TableRunQueue. - throw new UnsupportedOperationException( - "RQs for non-table/non-server procedures are not implemented yet: " + proc); - } - if (notify) { - schedWaitCond.signal(); - } - } finally { - schedLock.unlock(); + @Override + protected void enqueue(final Procedure proc, final boolean addFront) { + if (isTableProcedure(proc)) { + doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); + } else if (isServerProcedure(proc)) { + doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); + } else { + // TODO: at the moment we only have Table and Server procedures + // if you are implementing a non-table/non-server procedure, you have two options: create + // a group for all the non-table/non-server procedures or try to find a key for your + // non-table/non-server procedures and implement something similar to the TableRunQueue. + throw new UnsupportedOperationException( + "RQs for non-table/non-server procedures are not implemented yet: " + proc); } } @@ -165,49 +137,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } @Override - public Procedure poll() { - return poll(-1); + protected boolean queueHasRunnables() { + return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables(); } - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - protected Procedure poll(long waitNsec) { - Procedure pollResult = null; - schedLock.lock(); - try { - if (!hasRunnables()) { - if (waitNsec < 0) { - schedWaitCond.await(); - } else { - schedWaitCond.awaitNanos(waitNsec); - } - if (!hasRunnables()) { - return null; - } - } - - // For now, let server handling have precedence over table handling; presumption is that it - // is more important handling crashed servers than it is running the - // enabling/disabling tables, etc. - pollResult = doPoll(serverRunQueue); - if (pollResult == null) { - pollResult = doPoll(tableRunQueue); - } - - // update metrics - pollCalls++; - nullPollCalls += (pollResult == null) ? 1 : 0; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - schedLock.unlock(); + @Override + protected Procedure dequeue() { + // For now, let server handling have precedence over table handling; presumption is that it + // is more important handling crashed servers than it is running the + // enabling/disabling tables, etc. + Procedure pollResult = doPoll(serverRunQueue); + if (pollResult == null) { + pollResult = doPoll(tableRunQueue); } return pollResult; } - private boolean hasRunnables() { - return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables(); - } - private > Procedure doPoll(final FairQueue fairq) { final Queue rq = fairq.poll(); if (rq == null || !rq.isAvailable()) { @@ -239,24 +184,18 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } @Override - public void clear() { - // NOTE: USED ONLY FOR TESTING - schedLock.lock(); - try { - // Remove Servers - for (int i = 0; i < serverBuckets.length; ++i) { - clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); - serverBuckets[i] = null; - } - - // Remove Tables - clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); - tableMap = null; - - assert size() == 0 : "expected queue size to be 0, got " + size(); - } finally { - schedLock.unlock(); + public void clearQueue() { + // Remove Servers + for (int i = 0; i < serverBuckets.length; ++i) { + clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); + serverBuckets[i] = null; } + + // Remove Tables + clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); + tableMap = null; + + assert size() == 0 : "expected queue size to be 0, got " + size(); } private , TNode extends Queue> void clear(TNode treeMap, @@ -269,48 +208,25 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - private void wakePollIfNeeded(final int waitingCount) { - if (waitingCount > 1) { - schedWaitCond.signalAll(); - } else if (waitingCount > 0) { - schedWaitCond.signal(); - } - } - @Override - public void signalAll() { - schedLock.lock(); - try { - schedWaitCond.signalAll(); - } finally { - schedLock.unlock(); - } - } + public int queueSize() { + int count = 0; - @Override - public int size() { - schedLock.lock(); - try { - int count = 0; - - // Server queues - final AvlTreeIterator serverIter = new AvlTreeIterator(); - for (int i = 0; i < serverBuckets.length; ++i) { - serverIter.seekFirst(serverBuckets[i]); - while (serverIter.hasNext()) { - count += serverIter.next().size(); - } + // Server queues + final AvlTreeIterator serverIter = new AvlTreeIterator(); + for (int i = 0; i < serverBuckets.length; ++i) { + serverIter.seekFirst(serverBuckets[i]); + while (serverIter.hasNext()) { + count += serverIter.next().size(); } - - // Table queues - final AvlTreeIterator tableIter = new AvlTreeIterator(tableMap); - while (tableIter.hasNext()) { - count += tableIter.next().size(); - } - return count; - } finally { - schedLock.unlock(); } + + // Table queues + final AvlTreeIterator tableIter = new AvlTreeIterator(tableMap); + while (tableIter.hasNext()) { + count += tableIter.next().size(); + } + return count; } @Override @@ -354,329 +270,15 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - // ============================================================================ - // TODO: Metrics - // ============================================================================ - public long getPollCalls() { - return pollCalls; - } - - public long getNullPollCalls() { - return nullPollCalls; - } - - // ============================================================================ - // Event Helpers - // ============================================================================ - /** - * Suspend the procedure if the event is not ready yet. - * @param event the event to wait on - * @param procedure the procedure waiting on the event - * @return true if the procedure has to wait for the event to be ready, false otherwise. - */ - public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) { - return waitEvent(event, procedure, false); - } - - /** - * Suspend the procedure if the event is not ready yet. - * @param event the event to wait on - * @param procedure the procedure waiting on the event - * @param suspendQueue true if the entire queue of the procedure should be suspended - * @return true if the procedure has to wait for the event to be ready, false otherwise. - */ - public boolean waitEvent(final ProcedureEvent event, final Procedure procedure, - final boolean suspendQueue) { - return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue); - } - - private boolean waitEvent(final ProcedureEvent event, final boolean lockEvent, - final Procedure procedure, final boolean suspendQueue) { - synchronized (event) { - if (event.isReady()) { - if (lockEvent) { - event.setReady(false); - } - return false; - } - - if (!suspendQueue) { - suspendProcedure(event, procedure); - } else if (isTableProcedure(procedure)) { - waitTableEvent(event, procedure); - } else if (isServerProcedure(procedure)) { - waitServerEvent(event, procedure); - } else { - // TODO: at the moment we only have Table and Server procedures - // if you are implementing a non-table/non-server procedure, you have two options: create - // a group for all the non-table/non-server procedures or try to find a key for your - // non-table/non-server procedures and implement something similar to the TableRunQueue. - throw new UnsupportedOperationException( - "RQs for non-table/non-server procedures are not implemented yet: " + procedure); - } - } - return true; - } - - private void waitTableEvent(final ProcedureEvent event, final Procedure procedure) { - final TableName tableName = getTableName(procedure); - final boolean isDebugEnabled = LOG.isDebugEnabled(); - - schedLock.lock(); - try { - TableQueue queue = getTableQueue(tableName); - queue.addFront(procedure); - if (queue.isSuspended()) return; - - if (isDebugEnabled) { - LOG.debug("Suspend table queue " + tableName); - } - queue.setSuspended(true); - removeFromRunQueue(tableRunQueue, queue); - event.suspendTableQueue(queue); - } finally { - schedLock.unlock(); - } - } - - private void waitServerEvent(final ProcedureEvent event, final Procedure procedure) { - final ServerName serverName = getServerName(procedure); - final boolean isDebugEnabled = LOG.isDebugEnabled(); - - schedLock.lock(); - try { - // TODO: This will change once we have the new AM - ServerQueue queue = getServerQueue(serverName); - queue.addFront(procedure); - if (queue.isSuspended()) return; - - if (isDebugEnabled) { - LOG.debug("Suspend server queue " + serverName); - } - queue.setSuspended(true); - removeFromRunQueue(serverRunQueue, queue); - event.suspendServerQueue(queue); - } finally { - schedLock.unlock(); - } - } - - /** - * Mark the event has not ready. - * procedures calling waitEvent() will be suspended. - * @param event the event to mark as suspended/not ready - */ - public void suspendEvent(final ProcedureEvent event) { - final boolean isTraceEnabled = LOG.isTraceEnabled(); - synchronized (event) { - event.setReady(false); - if (isTraceEnabled) { - LOG.trace("Suspend event " + event); - } - } - } - - /** - * Wake every procedure waiting for the specified event - * (By design each event has only one "wake" caller) - * @param event the event to wait - */ - public void wakeEvent(final ProcedureEvent event) { - final boolean isTraceEnabled = LOG.isTraceEnabled(); - synchronized (event) { - event.setReady(true); - if (isTraceEnabled) { - LOG.trace("Wake event " + event); - } - - schedLock.lock(); - try { - final int waitingCount = popEventWaitingObjects(event); - wakePollIfNeeded(waitingCount); - } finally { - schedLock.unlock(); - } - } - } - - /** - * Wake every procedure waiting for the specified events. - * (By design each event has only one "wake" caller) - * @param events the list of events to wake - * @param count the number of events in the array to wake - */ - public void wakeEvents(final ProcedureEvent[] events, final int count) { - final boolean isTraceEnabled = LOG.isTraceEnabled(); - schedLock.lock(); - try { - int waitingCount = 0; - for (int i = 0; i < count; ++i) { - final ProcedureEvent event = events[i]; - synchronized (event) { - event.setReady(true); - if (isTraceEnabled) { - LOG.trace("Wake event " + event); - } - waitingCount += popEventWaitingObjects(event); - } - } - wakePollIfNeeded(waitingCount); - } finally { - schedLock.unlock(); - } - } - - private int popEventWaitingObjects(final ProcedureEvent event) { - int count = 0; - while (event.hasWaitingTables()) { - final Queue queue = event.popWaitingTable(); - queue.setSuspended(false); - addToRunQueue(tableRunQueue, queue); - count += queue.size(); - } - // TODO: This will change once we have the new AM - while (event.hasWaitingServers()) { - final Queue queue = event.popWaitingServer(); - queue.setSuspended(false); - addToRunQueue(serverRunQueue, queue); - count += queue.size(); - } - - while (event.hasWaitingProcedures()) { - wakeProcedure(event.popWaitingProcedure(false)); - count++; - } - return count; - } - - private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) { - procedure.suspend(); - event.suspendProcedure(procedure); - } - - private void wakeProcedure(final Procedure procedure) { - procedure.resume(); - doAdd(procedure, /* addFront= */ true, /* notify= */false); - } - - private static abstract class BaseProcedureEvent { - private ArrayDeque waitingProcedures = null; - - protected void suspendProcedure(final Procedure proc) { - if (waitingProcedures == null) { - waitingProcedures = new ArrayDeque(); - } - waitingProcedures.addLast(proc); - } - - protected boolean hasWaitingProcedures() { - return waitingProcedures != null; - } - - protected Procedure popWaitingProcedure(final boolean popFront) { - // it will be nice to use IterableList on a procedure and avoid allocations... - Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); - if (waitingProcedures.isEmpty()) { - waitingProcedures = null; - } - return proc; - } - - @VisibleForTesting - protected synchronized int size() { - if (waitingProcedures != null) { - return waitingProcedures.size(); - } - return 0; - } - } - - public static class ProcedureEvent extends BaseProcedureEvent { - private final String description; - - private Queue waitingServers = null; - private Queue waitingTables = null; - private boolean ready = false; - - protected ProcedureEvent() { - this(null); - } - - public ProcedureEvent(final String description) { - this.description = description; - } - - public synchronized boolean isReady() { - return ready; - } - - private synchronized void setReady(boolean isReady) { - this.ready = isReady; - } - - private void suspendTableQueue(Queue queue) { - waitingTables = AvlIterableList.append(waitingTables, queue); - } - - private void suspendServerQueue(Queue queue) { - waitingServers = AvlIterableList.append(waitingServers, queue); - } - - private boolean hasWaitingTables() { - return waitingTables != null; - } - - private Queue popWaitingTable() { - Queue node = waitingTables; - waitingTables = AvlIterableList.remove(waitingTables, node); - return node; - } - - private boolean hasWaitingServers() { - return waitingServers != null; - } - - private Queue popWaitingServer() { - Queue node = waitingServers; - waitingServers = AvlIterableList.remove(waitingServers, node); - return node; - } - - protected String getDescription() { - if (description == null) { - // you should override this method if you are using the default constructor - throw new UnsupportedOperationException(); - } - return description; - } - - @VisibleForTesting - protected synchronized int size() { - int count = super.size(); - if (waitingTables != null) { - count += waitingTables.size(); - } - if (waitingServers != null) { - count += waitingServers.size(); - } - return count; - } - - @Override - public String toString() { - return String.format("%s(%s)", getClass().getSimpleName(), getDescription()); - } - } - // ============================================================================ // Table Queue Lookup Helpers // ============================================================================ private TableQueue getTableQueueWithLock(TableName tableName) { - schedLock.lock(); + schedLock(); try { return getTableQueue(tableName); } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -727,11 +329,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // Server Queue Lookup Helpers // ============================================================================ private ServerQueue getServerQueueWithLock(ServerName serverName) { - schedLock.lock(); + schedLock(); try { return getServerQueue(serverName); } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -790,7 +392,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - private static class RegionEvent extends BaseProcedureEvent { + private static class RegionEvent extends ProcedureEventQueue { private final HRegionInfo regionInfo; private long exclusiveLockProcIdOwner = Long.MIN_VALUE; @@ -823,7 +425,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { @Override public String toString() { - return String.format("region %s event", regionInfo.getRegionNameAsString()); + return "RegionEvent(" + regionInfo.getRegionNameAsString() + ")"; } } @@ -1046,33 +648,33 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { * @return true if we were able to acquire the lock on the table, otherwise false. */ public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) { - schedLock.lock(); + schedLock(); TableQueue queue = getTableQueue(table); if (!queue.getNamespaceQueue().trySharedLock()) { - schedLock.unlock(); + schedUnlock(); return false; } if (!queue.tryExclusiveLock(procedure)) { queue.getNamespaceQueue().releaseSharedLock(); - schedLock.unlock(); + schedUnlock(); return false; } removeFromRunQueue(tableRunQueue, queue); boolean hasParentLock = queue.hasParentLock(procedure); - schedLock.unlock(); + schedUnlock(); boolean hasXLock = true; if (!hasParentLock) { // Zk lock is expensive... hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString()); if (!hasXLock) { - schedLock.lock(); + schedLock(); if (!hasParentLock) queue.releaseExclusiveLock(); queue.getNamespaceQueue().releaseSharedLock(); addToRunQueue(tableRunQueue, queue); - schedLock.unlock(); + schedUnlock(); } } return hasXLock; @@ -1092,11 +694,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { queue.releaseZkExclusiveLock(lockManager); } - schedLock.lock(); + schedLock(); if (!hasParentLock) queue.releaseExclusiveLock(); queue.getNamespaceQueue().releaseSharedLock(); addToRunQueue(tableRunQueue, queue); - schedLock.unlock(); + schedUnlock(); } /** @@ -1112,7 +714,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure, final TableName table) { - schedLock.lock(); + schedLock(); TableQueue queue = getTableQueue(table); if (!queue.getNamespaceQueue().trySharedLock()) { return null; @@ -1120,7 +722,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (!queue.trySharedLock()) { queue.getNamespaceQueue().releaseSharedLock(); - schedLock.unlock(); + schedUnlock(); return null; } @@ -1129,11 +731,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (!queue.tryZkSharedLock(lockManager, procedure.toString())) { queue.releaseSharedLock(); queue.getNamespaceQueue().releaseSharedLock(); - schedLock.unlock(); + schedUnlock(); return null; } - schedLock.unlock(); + schedUnlock(); return queue; } @@ -1146,7 +748,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { public void releaseTableSharedLock(final Procedure procedure, final TableName table) { final TableQueue queue = getTableQueueWithLock(table); - schedLock.lock(); + schedLock(); // Zk lock is expensive... queue.releaseZkSharedLock(lockManager); @@ -1154,7 +756,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (queue.releaseSharedLock()) { addToRunQueue(tableRunQueue, queue); } - schedLock.unlock(); + schedUnlock(); } /** @@ -1168,8 +770,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { */ @VisibleForTesting protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) { - final ReentrantLock l = schedLock; - l.lock(); + schedLock(); try { TableQueue queue = getTableQueue(table); if (queue == null) return true; @@ -1193,7 +794,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return false; } } finally { - l.unlock(); + schedUnlock(); } return true; } @@ -1298,7 +899,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } // awake procedures if any - schedLock.lock(); + schedLock(); try { for (int i = numProcs - 1; i >= 0; --i) { wakeProcedure(nextProcs[i]); @@ -1312,7 +913,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { releaseTableSharedLock(procedure, table); } } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -1327,7 +928,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { * @return true if we were able to acquire the lock on the namespace, otherwise false. */ public boolean tryAcquireNamespaceExclusiveLock(final Procedure procedure, final String nsName) { - schedLock.lock(); + schedLock(); try { TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); if (!tableQueue.trySharedLock()) return false; @@ -1339,7 +940,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } return hasLock; } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -1350,7 +951,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { * @param nsName the namespace that has the exclusive lock */ public void releaseNamespaceExclusiveLock(final Procedure procedure, final String nsName) { - schedLock.lock(); + schedLock(); try { final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); final NamespaceQueue queue = getNamespaceQueue(nsName); @@ -1360,7 +961,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { addToRunQueue(tableRunQueue, tableQueue); } } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -1376,7 +977,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { */ public boolean tryAcquireServerExclusiveLock(final Procedure procedure, final ServerName serverName) { - schedLock.lock(); + schedLock(); try { ServerQueue queue = getServerQueue(serverName); if (queue.tryExclusiveLock(procedure)) { @@ -1384,7 +985,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return true; } } finally { - schedLock.unlock(); + schedUnlock(); } return false; } @@ -1397,13 +998,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { */ public void releaseServerExclusiveLock(final Procedure procedure, final ServerName serverName) { - schedLock.lock(); + schedLock(); try { ServerQueue queue = getServerQueue(serverName); queue.releaseExclusiveLock(); addToRunQueue(serverRunQueue, queue); } finally { - schedLock.unlock(); + schedUnlock(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java index 4c538454b51..1b434feea52 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -18,11 +18,6 @@ package org.apache.hadoop.hbase.master.procedure; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,17 +29,15 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; -import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Threads; + import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -200,85 +193,4 @@ public class TestMasterProcedureEvents { } return null; } - - @Test(timeout=30000) - public void testTimeoutEventProcedure() throws Exception { - HMaster master = UTIL.getMiniHBaseCluster().getMaster(); - ProcedureExecutor procExec = master.getMasterProcedureExecutor(); - MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); - - TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 5); - procExec.submitProcedure(proc); - - ProcedureTestingUtility.waitProcedure(procExec, proc.getProcId()); - ProcedureTestingUtility.assertIsAbortException(procExec.getResult(proc.getProcId())); - } - - public static class TestTimeoutEventProcedure - extends Procedure implements TableProcedureInterface { - private final ProcedureEvent event = new ProcedureEvent("timeout-event"); - - private final AtomicInteger ntimeouts = new AtomicInteger(0); - private int maxTimeouts = 1; - - public TestTimeoutEventProcedure() {} - - public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) { - this.maxTimeouts = maxTimeouts; - setTimeout(timeoutMsec); - setOwner("test"); - } - - @Override - protected Procedure[] execute(final MasterProcedureEnv env) - throws ProcedureSuspendedException { - LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts); - if (ntimeouts.get() > maxTimeouts) { - setAbortFailure("test", "give up after " + ntimeouts.get()); - return null; - } - - env.getProcedureQueue().suspendEvent(event); - if (env.getProcedureQueue().waitEvent(event, this)) { - setState(ProcedureState.WAITING_TIMEOUT); - throw new ProcedureSuspendedException(); - } - - return null; - } - - @Override - protected void rollback(final MasterProcedureEnv env) { - } - - @Override - protected boolean setTimeoutFailure(final MasterProcedureEnv env) { - int n = ntimeouts.incrementAndGet(); - LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n); - setState(ProcedureState.RUNNABLE); - env.getProcedureQueue().wakeEvent(event); - return false; - } - - @Override - public TableName getTableName() { - return TableName.valueOf("testtb"); - } - - @Override - public TableOperationType getTableOperationType() { - return TableOperationType.READ; - } - - @Override - protected boolean abort(MasterProcedureEnv env) { return false; } - - @Override - protected void serializeStateData(final OutputStream stream) throws IOException { - } - - @Override - protected void deserializeStateData(final InputStream stream) throws IOException { - } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 12f8263f6ec..bcacb4833d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -540,40 +540,6 @@ public class TestMasterProcedureScheduler { queue.releaseTableExclusiveLock(rootProc, tableName); } - @Test - public void testSuspendedTableQueue() throws Exception { - final TableName tableName = TableName.valueOf("testSuspendedQueue"); - - queue.addBack(new TestTableProcedure(1, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - queue.addBack(new TestTableProcedure(2, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - - Procedure proc = queue.poll(); - assertEquals(1, proc.getProcId()); - assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); - - // Suspend - // TODO: If we want to keep the zk-lock we need to retain the lock on suspend - ProcedureEvent event = new ProcedureEvent("testSuspendedTableQueueEvent"); - assertEquals(true, queue.waitEvent(event, proc, true)); - queue.releaseTableExclusiveLock(proc, tableName); - assertEquals(null, queue.poll(0)); - - // Resume - queue.wakeEvent(event); - - proc = queue.poll(); - assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); - assertEquals(1, proc.getProcId()); - queue.releaseTableExclusiveLock(proc, tableName); - - proc = queue.poll(); - assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); - assertEquals(2, proc.getProcId()); - queue.releaseTableExclusiveLock(proc, tableName); - } - @Test public void testSuspendedProcedure() throws Exception { final TableName tableName = TableName.valueOf("testSuspendedProcedure"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java index 380067d5e58..d6ddd131f82 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java @@ -23,23 +23,18 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.ConcurrentSkipListSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure; -import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedureWithEvent; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.util.Threads; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -212,92 +207,6 @@ public class TestMasterProcedureSchedulerConcurrency { } } - @Test(timeout=60000) - public void testConcurrentWaitWake() throws Exception { - testConcurrentWaitWake(false); - } - - @Test(timeout=60000) - public void testConcurrentWaitWakeBatch() throws Exception { - testConcurrentWaitWake(true); - } - - private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { - final TableName tableName = TableName.valueOf("testtb"); - - final int NPROCS = 20; - final int NRUNS = 100; - - for (long i = 0; i < NPROCS; ++i) { - queue.addBack(new TestTableProcedureWithEvent(i, tableName, - TableProcedureInterface.TableOperationType.READ)); - } - - final Thread[] threads = new Thread[4]; - final AtomicInteger waitCount = new AtomicInteger(0); - final AtomicInteger wakeCount = new AtomicInteger(0); - - final ConcurrentSkipListSet waitQueue = - new ConcurrentSkipListSet(); - threads[0] = new Thread() { - @Override - public void run() { - while (true) { - if (useWakeBatch) { - ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; - for (int i = 0; i < ev.length; ++i) { - ev[i] = waitQueue.pollFirst().getEvent(); - LOG.debug("WAKE " + ev[i] + " total=" + wakeCount.get()); - } - queue.wakeEvents(ev, ev.length); - wakeCount.addAndGet(ev.length); - } else { - int size = waitQueue.size(); - while (size-- > 0) { - ProcedureEvent ev = waitQueue.pollFirst().getEvent(); - queue.wakeEvent(ev); - LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); - wakeCount.incrementAndGet(); - } - } - if (wakeCount.get() >= NRUNS) { - break; - } - Threads.sleepWithoutInterrupt(25); - } - } - }; - - for (int i = 1; i < threads.length; ++i) { - threads[i] = new Thread() { - @Override - public void run() { - while (true) { - TestTableProcedureWithEvent proc = (TestTableProcedureWithEvent)queue.poll(); - if (proc == null) continue; - - waitQueue.add(proc); - queue.suspendEvent(proc.getEvent()); - queue.waitEvent(proc.getEvent(), proc); - LOG.debug("WAIT " + proc.getEvent()); - if (waitCount.incrementAndGet() >= NRUNS) { - break; - } - } - } - }; - } - - for (int i = 0; i < threads.length; ++i) { - threads[i].start(); - } - for (int i = 0; i < threads.length; ++i) { - threads[i].join(); - } - - queue.clear(); - } - public static class TestTableProcSet { private final MasterProcedureScheduler queue;