diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index 3bff8b893be..3e474513d83 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.procedure2; +import java.util.Iterator; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.TimeUnit; @@ -79,10 +80,31 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { */ protected abstract void enqueue(Procedure procedure, boolean addFront); + @Override public void addFront(final Procedure procedure) { push(procedure, true, true); } + @Override + public void addFront(Iterator procedureIterator) { + schedLock(); + try { + int count = 0; + while (procedureIterator.hasNext()) { + Procedure procedure = procedureIterator.next(); + if (LOG.isTraceEnabled()) { + LOG.trace("Wake " + procedure); + } + push(procedure, /* addFront= */ true, /* notify= */false); + count++; + } + wakePollIfNeeded(count); + } finally { + schedUnlock(); + } + } + + @Override public void addBack(final Procedure procedure) { push(procedure, false, true); } @@ -206,61 +228,22 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { // ========================================================================== // Procedure Events // ========================================================================== - @Override - public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) { - synchronized (event) { - if (event.isReady()) { - return false; - } - waitProcedure(event.getSuspendedProcedures(), procedure); - return true; - } - } - @Override - public void suspendEvent(final ProcedureEvent event) { - final boolean traceEnabled = LOG.isTraceEnabled(); - synchronized (event) { - event.setReady(false); - if (traceEnabled) { - LOG.trace("Suspend " + event); - } - } - } - - @Override - public void wakeEvent(final ProcedureEvent event) { - wakeEvents(1, event); - } - - @Override - public void wakeEvents(final int count, final ProcedureEvent... events) { - final boolean traceEnabled = LOG.isTraceEnabled(); + /** + * Wake up all of the given events. + * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event. + * Access should remain package-private. Use ProcedureEvent class to wake/suspend events. + * @param events the list of events to wake + */ + void wakeEvents(ProcedureEvent[] events) { schedLock(); try { - int waitingCount = 0; - for (int i = 0; i < count; ++i) { - final ProcedureEvent event = events[i]; - synchronized (event) { - if (!event.isReady()) { - // Only set ready if we were not ready; i.e. suspended. Otherwise, we double-wake - // on this event and down in wakeWaitingProcedures, we double decrement this - // finish which messes up child procedure accounting. - event.setReady(true); - if (traceEnabled) { - LOG.trace("Unsuspend " + event); - } - waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures()); - } else { - ProcedureDeque q = event.getSuspendedProcedures(); - if (q != null && !q.isEmpty()) { - LOG.warn("Q is not empty! size=" + q.size() + "; PROCESSING..."); - waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures()); - } - } + for (ProcedureEvent event : events) { + if (event == null) { + continue; } + event.wakeInternal(this); } - wakePollIfNeeded(waitingCount); } finally { schedUnlock(); } @@ -275,9 +258,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { // wakeProcedure adds to the front of queue, so we start from last in the // waitQueue' queue, so that the procedure which was added first goes in the front for // the scheduler queue. - while (!waitQueue.isEmpty()) { - wakeProcedure(waitQueue.removeLast()); - } + addFront(waitQueue.descendingIterator()); + waitQueue.clear(); return count; } @@ -290,6 +272,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { push(procedure, /* addFront= */ true, /* notify= */false); } + // ========================================================================== // Internal helpers // ========================================================================== diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java index d3e076c8e44..20803f453fe 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,15 +18,20 @@ package org.apache.hadoop.hbase.procedure2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + /** * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the * resource to wait on, and a queue for suspended procedures. - * Access to suspended procedures queue is 'synchronized' on the event itself. */ @InterfaceAudience.Private public class ProcedureEvent { + private static final Log LOG = LogFactory.getLog(ProcedureEvent.class); + private final T object; private boolean ready = false; private ProcedureDeque suspendedProcedures = new ProcedureDeque(); @@ -39,10 +44,74 @@ public class ProcedureEvent { return ready; } - synchronized void setReady(final boolean isReady) { - this.ready = isReady; + /** + * @return true if event is not ready and adds procedure to suspended queue, else returns false. + */ + public synchronized boolean suspendIfNotReady(Procedure proc) { + if (!ready) { + suspendedProcedures.addLast(proc); + } + return !ready; } + /** Mark the event as not ready. */ + public synchronized void suspend() { + ready = false; + if (LOG.isTraceEnabled()) { + LOG.trace("Suspend " + toString()); + } + } + + /** + * Wakes up the suspended procedures by pushing them back into scheduler queues and sets the + * event as ready. + * See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not synchronized. + */ + public void wake(AbstractProcedureScheduler procedureScheduler) { + procedureScheduler.wakeEvents(new ProcedureEvent[]{this}); + } + + /** + * Wakes up all the given events and puts the procedures waiting on them back into + * ProcedureScheduler queues. + */ + public static void wakeEvents(AbstractProcedureScheduler scheduler, ProcedureEvent ... events) { + scheduler.wakeEvents(events); + } + + /** + * Only to be used by ProcedureScheduler implementations. + * Reason: To wake up multiple events, locking sequence is + * schedLock --> synchronized (event) + * To wake up an event, both schedLock() and synchronized(event) are required. + * The order is schedLock() --> synchronized(event) because when waking up multiple events + * simultaneously, we keep the scheduler locked until all procedures suspended on these events + * have been added back to the queue (Maybe it's not required? Evaluate!) + * To avoid deadlocks, we want to keep the locking order same even when waking up single event. + * That's why, {@link #wake(AbstractProcedureScheduler)} above uses the same code path as used + * when waking up multiple events. + * Access should remain package-private. + */ + synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) { + if (ready && !suspendedProcedures.isEmpty()) { + LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size()); + } + ready = true; + if (LOG.isTraceEnabled()) { + LOG.trace("Unsuspend " + toString()); + } + // wakeProcedure adds to the front of queue, so we start from last in the + // waitQueue' queue, so that the procedure which was added first goes in the front for + // the scheduler queue. + procedureScheduler.addFront(suspendedProcedures.descendingIterator()); + suspendedProcedures.clear(); + } + + /** + * Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it + * here for tests. + */ + @VisibleForTesting public ProcedureDeque getSuspendedProcedures() { return suspendedProcedures; } @@ -50,6 +119,6 @@ public class ProcedureEvent { @Override public String toString() { return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() + - ", " + getSuspendedProcedures(); + ", " + suspendedProcedures; } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index d4314d5bf4f..79367f3eeb8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -18,11 +18,10 @@ package org.apache.hadoop.hbase.procedure2; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - +import com.google.common.annotations.VisibleForTesting; +import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; - import org.apache.yetus.audience.InterfaceAudience; /** @@ -52,6 +51,11 @@ public interface ProcedureScheduler { */ void addFront(Procedure proc); + /** + * Inserts all elements in the iterator at the front of this queue. + */ + void addFront(Iterator procedureIterator); + /** * Inserts the specified element at the end of this queue. * @param proc the Procedure to add @@ -91,36 +95,6 @@ public interface ProcedureScheduler { */ Procedure poll(long timeout, TimeUnit unit); - /** - * Mark the event as not ready. - * Procedures calling waitEvent() will be suspended. - * @param event the event to mark as suspended/not ready - */ - void suspendEvent(ProcedureEvent event); - - /** - * Wake every procedure waiting for the specified event - * (By design each event has only one "wake" caller) - * @param event the event to wait - */ - void wakeEvent(ProcedureEvent event); - - /** - * Wake every procedure waiting for the specified events. - * (By design each event has only one "wake" caller) - * @param count the number of events in the array to wake - * @param events the list of events to wake - */ - void wakeEvents(int count, ProcedureEvent... events); - - /** - * Suspend the procedure if the event is not ready yet. - * @param event the event to wait on - * @param procedure the procedure waiting on the event - * @return true if the procedure has to wait for the event to be ready, false otherwise. - */ - boolean waitEvent(ProcedureEvent event, Procedure procedure); - /** * List lock queues. * @return the locks diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java index bd310fd9292..d2b2b7d54d5 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int32Value; @@ -107,6 +106,25 @@ public class TestProcedureEvents { ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId())); } + /** + * This Event+Procedure exhibits following behavior: + *
    + *
  • On procedure execute() + *
      + *
    • If had enough timeouts, abort the procedure. Else....
    • + *
    • Suspend the event and add self to its suspend queue
    • + *
    • Go into waiting state
    • + *
    + *
  • + *
  • + * On waiting timeout + *
      + *
    • Wake the event (which adds this procedure back into scheduler queue), and set own's + * state to RUNNABLE (so can be executed again).
    • + *
    + *
  • + *
+ */ public static class TestTimeoutEventProcedure extends NoopProcedure { private final ProcedureEvent event = new ProcedureEvent("timeout-event"); @@ -132,8 +150,8 @@ public class TestProcedureEvents { return null; } - env.getProcedureScheduler().suspendEvent(event); - if (env.getProcedureScheduler().waitEvent(event, this)) { + event.suspend(); + if (event.suspendIfNotReady(this)) { setState(ProcedureState.WAITING_TIMEOUT); throw new ProcedureSuspendedException(); } @@ -146,15 +164,15 @@ public class TestProcedureEvents { int n = ntimeouts.incrementAndGet(); LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n); setState(ProcedureState.RUNNABLE); - env.getProcedureScheduler().wakeEvent(event); + event.wake((AbstractProcedureScheduler) env.getProcedureScheduler()); return false; } @Override protected void afterReplay(final TestProcEnv env) { if (getState() == ProcedureState.WAITING_TIMEOUT) { - env.getProcedureScheduler().suspendEvent(event); - env.getProcedureScheduler().waitEvent(event, this); + event.suspend(); + event.suspendIfNotReady(this); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java index 42176937e13..1c8f1ebb66c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java @@ -92,13 +92,13 @@ public class TestProcedureSchedulerConcurrency { ev[i] = waitQueue.pollFirst().getEvent(); LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get()); } - sched.wakeEvents(ev.length, ev); + ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev); wakeCount.addAndGet(ev.length); } else { int size = waitQueue.size(); while (size-- > 0) { ProcedureEvent ev = waitQueue.pollFirst().getEvent(); - sched.wakeEvent(ev); + ev.wake(procSched); LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); wakeCount.incrementAndGet(); } @@ -122,9 +122,9 @@ public class TestProcedureSchedulerConcurrency { TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll(); if (proc == null) continue; - sched.suspendEvent(proc.getEvent()); + proc.getEvent().suspend(); waitQueue.add(proc); - sched.waitEvent(proc.getEvent(), proc); + proc.getEvent().suspendIfNotReady(proc); LOG.debug("WAIT " + proc.getEvent()); if (waitCount.incrementAndGet() >= NRUNS) { break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 1c193f9853c..6f481e50226 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -252,7 +252,7 @@ public class AssignmentManager implements ServerListener { // Update meta events (for testing) if (hasProcExecutor) { - getProcedureScheduler().suspendEvent(metaLoadEvent); + metaLoadEvent.suspend(); setFailoverCleanupDone(false); for (RegionInfo hri: getMetaRegionSet()) { setMetaInitialized(hri, false); @@ -413,17 +413,16 @@ public class AssignmentManager implements ServerListener { } public boolean waitMetaInitialized(final Procedure proc, final RegionInfo regionInfo) { - return getProcedureScheduler().waitEvent( - getMetaInitializedEvent(getMetaForRegion(regionInfo)), proc); + return getMetaInitializedEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); } private void setMetaInitialized(final RegionInfo metaRegionInfo, final boolean isInitialized) { assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo); if (isInitialized) { - getProcedureScheduler().wakeEvent(metaInitEvent); + metaInitEvent.wake(getProcedureScheduler()); } else { - getProcedureScheduler().suspendEvent(metaInitEvent); + metaInitEvent.suspend(); } } @@ -434,11 +433,11 @@ public class AssignmentManager implements ServerListener { } public boolean waitMetaLoaded(final Procedure proc) { - return getProcedureScheduler().waitEvent(metaLoadEvent, proc); + return metaLoadEvent.suspendIfNotReady(proc); } protected void wakeMetaLoadedEvent() { - getProcedureScheduler().wakeEvent(metaLoadEvent); + metaLoadEvent.wake(getProcedureScheduler()); assert isMetaLoaded() : "expected meta to be loaded"; } @@ -1011,11 +1010,11 @@ public class AssignmentManager implements ServerListener { protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) { final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); - return getProcedureScheduler().waitEvent(serverNode.getReportEvent(), proc); + return serverNode.getReportEvent().suspendIfNotReady(proc); } protected void wakeServerReportEvent(final ServerStateNode serverNode) { - getProcedureScheduler().wakeEvent(serverNode.getReportEvent()); + serverNode.getReportEvent().wake(getProcedureScheduler()); } // ============================================================================================ @@ -1588,7 +1587,7 @@ public class AssignmentManager implements ServerListener { * and each region will be assigned by a server using the balancer. */ protected void queueAssign(final RegionStateNode regionNode) { - getProcedureScheduler().suspendEvent(regionNode.getProcedureEvent()); + regionNode.getProcedureEvent().suspend(); // TODO: quick-start for meta and the other sys-tables? assignQueueLock.lock(); @@ -1787,7 +1786,7 @@ public class AssignmentManager implements ServerListener { events[evcount++] = regionNode.getProcedureEvent(); } } - getProcedureScheduler().wakeEvents(evcount, events); + ProcedureEvent.wakeEvents(getProcedureScheduler(), events); final long et = System.currentTimeMillis(); if (LOG.isTraceEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index 6f54dcf29a6..05b8104b1d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -190,7 +190,7 @@ public abstract class RegionTransitionProcedure // NOTE: This call to wakeEvent puts this Procedure back on the scheduler. // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond // this method. Just get out of this current processing quickly. - env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent()); + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); } // else leave the procedure in suspended state; it is waiting on another call to this callback } @@ -214,7 +214,7 @@ public abstract class RegionTransitionProcedure // Put this procedure into suspended mode to wait on report of state change // from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'. - env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent()); + getRegionState(env).getProcedureEvent().suspend(); // Tricky because the below call to addOperationToNode can fail. If it fails, we need to // backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and @@ -253,7 +253,7 @@ public abstract class RegionTransitionProcedure // processing to the next stage. At an extreme, the other worker may run in // parallel so DO NOT CHANGE any state hereafter! This should be last thing // done in this processing step. - env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent()); + regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); } protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) { @@ -298,7 +298,7 @@ public abstract class RegionTransitionProcedure return null; } transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH; - if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) { + if (regionNode.getProcedureEvent().suspendIfNotReady(this)) { // Why this suspend? Because we want to ensure Store happens before proceed? throw new ProcedureSuspendedException(); } @@ -315,7 +315,7 @@ public abstract class RegionTransitionProcedure retry = true; break; } - if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) { + if (regionNode.getProcedureEvent().suspendIfNotReady(this)) { throw new ProcedureSuspendedException(); } break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java index c9b8ef990db..61843d81fa3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java @@ -209,7 +209,7 @@ public final class LockProcedure extends Procedure if (!event.isReady()) { // Maybe unlock() awakened the event. setState(ProcedureProtos.ProcedureState.RUNNABLE); if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event); - env.getProcedureScheduler().wakeEvent(event); + event.wake(env.getProcedureScheduler()); } } return false; // false: do not mark the procedure as failed. @@ -224,7 +224,7 @@ public final class LockProcedure extends Procedure synchronized (event) { if (!event.isReady()) { setState(ProcedureProtos.ProcedureState.RUNNABLE); - env.getProcedureScheduler().wakeEvent(event); + event.wake(env.getProcedureScheduler()); } } } @@ -244,8 +244,8 @@ public final class LockProcedure extends Procedure return null; } synchronized (event) { - env.getProcedureScheduler().suspendEvent(event); - env.getProcedureScheduler().waitEvent(event, this); + event.suspend(); + event.suspendIfNotReady(this); setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); } throw new ProcedureSuspendedException(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index f294f57b621..c9c3ac98204 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -147,25 +147,25 @@ public class MasterProcedureEnv implements ConfigurationObserver { } public boolean waitInitialized(Procedure proc) { - return procSched.waitEvent(master.getInitializedEvent(), proc); + return master.getInitializedEvent().suspendIfNotReady(proc); } public boolean waitServerCrashProcessingEnabled(Procedure proc) { if (master instanceof HMaster) { - return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); + return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc); } return false; } public boolean waitFailoverCleanup(Procedure proc) { - return procSched.waitEvent(master.getAssignmentManager().getFailoverCleanupEvent(), proc); + return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc); } public void setEventReady(ProcedureEvent event, boolean isReady) { if (isReady) { - procSched.wakeEvent(event); + event.wake(procSched); } else { - procSched.suspendEvent(event); + event.suspend(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java index b7bc28ff0b9..b0a598ef0d1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -168,7 +168,7 @@ public class TestMasterProcedureEvents { // wake the event LOG.debug("wake " + event); - procSched.wakeEvent(event); + event.wake(procSched); assertEquals(true, event.isReady()); // wait until proc completes diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index e2d6b0c369d..d971b5fb26a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -572,14 +572,14 @@ public class TestMasterProcedureScheduler { // suspend ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent"); - assertEquals(true, queue.waitEvent(event, proc)); + assertEquals(true, event.suspendIfNotReady(proc)); proc = queue.poll(); assertEquals(2, proc.getProcId()); assertEquals(null, queue.poll(0)); // resume - queue.wakeEvent(event); + event.wake(queue); proc = queue.poll(); assertEquals(1, proc.getProcId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java index d046a132d07..035fb9e7f03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java @@ -467,21 +467,14 @@ public class TestRegionMergeTransactionOnCluster { verifyRowCount(table, ROWSIZE); LOG.info("Verified " + table.getName()); - // Sleep here is an ugly hack to allow region transitions to finish - long timeout = System.currentTimeMillis() + waitTime; List> tableRegions; - while (System.currentTimeMillis() < timeout) { - tableRegions = MetaTableAccessor.getTableRegionsAndLocations( - TEST_UTIL.getConnection(), tablename); - LOG.info("Found " + tableRegions.size() + ", expecting " + numRegions * replication); - if (tableRegions.size() == numRegions * replication) - break; - Thread.sleep(250); - } - LOG.info("Getting regions of " + table.getName()); + TEST_UTIL.waitUntilAllRegionsAssigned(tablename); + LOG.info("All regions assigned for table - " + table.getName()); tableRegions = MetaTableAccessor.getTableRegionsAndLocations( TEST_UTIL.getConnection(), tablename); - LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions)); + assertEquals("Wrong number of regions in table " + tablename, + numRegions * replication, tableRegions.size()); + LOG.info(tableRegions.size() + "Regions after load: " + Joiner.on(',').join(tableRegions)); assertEquals(numRegions * replication, tableRegions.size()); return table; }