HBASE-19319 Fix bug in synchronizing over ProcedureEvent
Also moves event related functions (wake/wait/suspend) from ProcedureScheduler to ProcedureEvent class
This commit is contained in:
parent
44bd94721c
commit
96e63ac7b8
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -79,10 +80,31 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
||||||
*/
|
*/
|
||||||
protected abstract void enqueue(Procedure procedure, boolean addFront);
|
protected abstract void enqueue(Procedure procedure, boolean addFront);
|
||||||
|
|
||||||
|
@Override
|
||||||
public void addFront(final Procedure procedure) {
|
public void addFront(final Procedure procedure) {
|
||||||
push(procedure, true, true);
|
push(procedure, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addFront(Iterator<Procedure> procedureIterator) {
|
||||||
|
schedLock();
|
||||||
|
try {
|
||||||
|
int count = 0;
|
||||||
|
while (procedureIterator.hasNext()) {
|
||||||
|
Procedure procedure = procedureIterator.next();
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Wake " + procedure);
|
||||||
|
}
|
||||||
|
push(procedure, /* addFront= */ true, /* notify= */false);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
wakePollIfNeeded(count);
|
||||||
|
} finally {
|
||||||
|
schedUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void addBack(final Procedure procedure) {
|
public void addBack(final Procedure procedure) {
|
||||||
push(procedure, false, true);
|
push(procedure, false, true);
|
||||||
}
|
}
|
||||||
|
@ -206,61 +228,22 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// Procedure Events
|
// Procedure Events
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
@Override
|
|
||||||
public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) {
|
|
||||||
synchronized (event) {
|
|
||||||
if (event.isReady()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
waitProcedure(event.getSuspendedProcedures(), procedure);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public void suspendEvent(final ProcedureEvent event) {
|
* Wake up all of the given events.
|
||||||
final boolean traceEnabled = LOG.isTraceEnabled();
|
* Note that we first take scheduler lock and then wakeInternal() synchronizes on the event.
|
||||||
synchronized (event) {
|
* Access should remain package-private. Use ProcedureEvent class to wake/suspend events.
|
||||||
event.setReady(false);
|
* @param events the list of events to wake
|
||||||
if (traceEnabled) {
|
*/
|
||||||
LOG.trace("Suspend " + event);
|
void wakeEvents(ProcedureEvent[] events) {
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void wakeEvent(final ProcedureEvent event) {
|
|
||||||
wakeEvents(1, event);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void wakeEvents(final int count, final ProcedureEvent... events) {
|
|
||||||
final boolean traceEnabled = LOG.isTraceEnabled();
|
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
int waitingCount = 0;
|
for (ProcedureEvent event : events) {
|
||||||
for (int i = 0; i < count; ++i) {
|
if (event == null) {
|
||||||
final ProcedureEvent event = events[i];
|
continue;
|
||||||
synchronized (event) {
|
|
||||||
if (!event.isReady()) {
|
|
||||||
// Only set ready if we were not ready; i.e. suspended. Otherwise, we double-wake
|
|
||||||
// on this event and down in wakeWaitingProcedures, we double decrement this
|
|
||||||
// finish which messes up child procedure accounting.
|
|
||||||
event.setReady(true);
|
|
||||||
if (traceEnabled) {
|
|
||||||
LOG.trace("Unsuspend " + event);
|
|
||||||
}
|
}
|
||||||
waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures());
|
event.wakeInternal(this);
|
||||||
} else {
|
|
||||||
ProcedureDeque q = event.getSuspendedProcedures();
|
|
||||||
if (q != null && !q.isEmpty()) {
|
|
||||||
LOG.warn("Q is not empty! size=" + q.size() + "; PROCESSING...");
|
|
||||||
waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures());
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wakePollIfNeeded(waitingCount);
|
|
||||||
} finally {
|
} finally {
|
||||||
schedUnlock();
|
schedUnlock();
|
||||||
}
|
}
|
||||||
|
@ -275,9 +258,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
||||||
// wakeProcedure adds to the front of queue, so we start from last in the
|
// wakeProcedure adds to the front of queue, so we start from last in the
|
||||||
// waitQueue' queue, so that the procedure which was added first goes in the front for
|
// waitQueue' queue, so that the procedure which was added first goes in the front for
|
||||||
// the scheduler queue.
|
// the scheduler queue.
|
||||||
while (!waitQueue.isEmpty()) {
|
addFront(waitQueue.descendingIterator());
|
||||||
wakeProcedure(waitQueue.removeLast());
|
waitQueue.clear();
|
||||||
}
|
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -290,6 +272,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
||||||
push(procedure, /* addFront= */ true, /* notify= */false);
|
push(procedure, /* addFront= */ true, /* notify= */false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// Internal helpers
|
// Internal helpers
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -18,15 +18,20 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
|
* Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
|
||||||
* resource to wait on, and a queue for suspended procedures.
|
* resource to wait on, and a queue for suspended procedures.
|
||||||
* Access to suspended procedures queue is 'synchronized' on the event itself.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ProcedureEvent<T> {
|
public class ProcedureEvent<T> {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ProcedureEvent.class);
|
||||||
|
|
||||||
private final T object;
|
private final T object;
|
||||||
private boolean ready = false;
|
private boolean ready = false;
|
||||||
private ProcedureDeque suspendedProcedures = new ProcedureDeque();
|
private ProcedureDeque suspendedProcedures = new ProcedureDeque();
|
||||||
|
@ -39,10 +44,74 @@ public class ProcedureEvent<T> {
|
||||||
return ready;
|
return ready;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void setReady(final boolean isReady) {
|
/**
|
||||||
this.ready = isReady;
|
* @return true if event is not ready and adds procedure to suspended queue, else returns false.
|
||||||
|
*/
|
||||||
|
public synchronized boolean suspendIfNotReady(Procedure proc) {
|
||||||
|
if (!ready) {
|
||||||
|
suspendedProcedures.addLast(proc);
|
||||||
|
}
|
||||||
|
return !ready;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Mark the event as not ready. */
|
||||||
|
public synchronized void suspend() {
|
||||||
|
ready = false;
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Suspend " + toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wakes up the suspended procedures by pushing them back into scheduler queues and sets the
|
||||||
|
* event as ready.
|
||||||
|
* See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not synchronized.
|
||||||
|
*/
|
||||||
|
public void wake(AbstractProcedureScheduler procedureScheduler) {
|
||||||
|
procedureScheduler.wakeEvents(new ProcedureEvent[]{this});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wakes up all the given events and puts the procedures waiting on them back into
|
||||||
|
* ProcedureScheduler queues.
|
||||||
|
*/
|
||||||
|
public static void wakeEvents(AbstractProcedureScheduler scheduler, ProcedureEvent ... events) {
|
||||||
|
scheduler.wakeEvents(events);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only to be used by ProcedureScheduler implementations.
|
||||||
|
* Reason: To wake up multiple events, locking sequence is
|
||||||
|
* schedLock --> synchronized (event)
|
||||||
|
* To wake up an event, both schedLock() and synchronized(event) are required.
|
||||||
|
* The order is schedLock() --> synchronized(event) because when waking up multiple events
|
||||||
|
* simultaneously, we keep the scheduler locked until all procedures suspended on these events
|
||||||
|
* have been added back to the queue (Maybe it's not required? Evaluate!)
|
||||||
|
* To avoid deadlocks, we want to keep the locking order same even when waking up single event.
|
||||||
|
* That's why, {@link #wake(AbstractProcedureScheduler)} above uses the same code path as used
|
||||||
|
* when waking up multiple events.
|
||||||
|
* Access should remain package-private.
|
||||||
|
*/
|
||||||
|
synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) {
|
||||||
|
if (ready && !suspendedProcedures.isEmpty()) {
|
||||||
|
LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size());
|
||||||
|
}
|
||||||
|
ready = true;
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Unsuspend " + toString());
|
||||||
|
}
|
||||||
|
// wakeProcedure adds to the front of queue, so we start from last in the
|
||||||
|
// waitQueue' queue, so that the procedure which was added first goes in the front for
|
||||||
|
// the scheduler queue.
|
||||||
|
procedureScheduler.addFront(suspendedProcedures.descendingIterator());
|
||||||
|
suspendedProcedures.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it
|
||||||
|
* here for tests.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
public ProcedureDeque getSuspendedProcedures() {
|
public ProcedureDeque getSuspendedProcedures() {
|
||||||
return suspendedProcedures;
|
return suspendedProcedures;
|
||||||
}
|
}
|
||||||
|
@ -50,6 +119,6 @@ public class ProcedureEvent<T> {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() +
|
return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() +
|
||||||
", " + getSuspendedProcedures();
|
", " + suspendedProcedures;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -52,6 +51,11 @@ public interface ProcedureScheduler {
|
||||||
*/
|
*/
|
||||||
void addFront(Procedure proc);
|
void addFront(Procedure proc);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inserts all elements in the iterator at the front of this queue.
|
||||||
|
*/
|
||||||
|
void addFront(Iterator<Procedure> procedureIterator);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inserts the specified element at the end of this queue.
|
* Inserts the specified element at the end of this queue.
|
||||||
* @param proc the Procedure to add
|
* @param proc the Procedure to add
|
||||||
|
@ -91,36 +95,6 @@ public interface ProcedureScheduler {
|
||||||
*/
|
*/
|
||||||
Procedure poll(long timeout, TimeUnit unit);
|
Procedure poll(long timeout, TimeUnit unit);
|
||||||
|
|
||||||
/**
|
|
||||||
* Mark the event as not ready.
|
|
||||||
* Procedures calling waitEvent() will be suspended.
|
|
||||||
* @param event the event to mark as suspended/not ready
|
|
||||||
*/
|
|
||||||
void suspendEvent(ProcedureEvent event);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wake every procedure waiting for the specified event
|
|
||||||
* (By design each event has only one "wake" caller)
|
|
||||||
* @param event the event to wait
|
|
||||||
*/
|
|
||||||
void wakeEvent(ProcedureEvent event);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wake every procedure waiting for the specified events.
|
|
||||||
* (By design each event has only one "wake" caller)
|
|
||||||
* @param count the number of events in the array to wake
|
|
||||||
* @param events the list of events to wake
|
|
||||||
*/
|
|
||||||
void wakeEvents(int count, ProcedureEvent... events);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Suspend the procedure if the event is not ready yet.
|
|
||||||
* @param event the event to wait on
|
|
||||||
* @param procedure the procedure waiting on the event
|
|
||||||
* @return true if the procedure has to wait for the event to be ready, false otherwise.
|
|
||||||
*/
|
|
||||||
boolean waitEvent(ProcedureEvent event, Procedure procedure);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List lock queues.
|
* List lock queues.
|
||||||
* @return the locks
|
* @return the locks
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int32Value;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int32Value;
|
||||||
|
@ -107,6 +106,25 @@ public class TestProcedureEvents {
|
||||||
ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
|
ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This Event+Procedure exhibits following behavior:
|
||||||
|
* <ul>
|
||||||
|
* <li>On procedure execute()
|
||||||
|
* <ul>
|
||||||
|
* <li>If had enough timeouts, abort the procedure. Else....</li>
|
||||||
|
* <li>Suspend the event and add self to its suspend queue</li>
|
||||||
|
* <li>Go into waiting state</li>
|
||||||
|
* </ul>
|
||||||
|
* </li>
|
||||||
|
* <li>
|
||||||
|
* On waiting timeout
|
||||||
|
* <ul>
|
||||||
|
* <li>Wake the event (which adds this procedure back into scheduler queue), and set own's
|
||||||
|
* state to RUNNABLE (so can be executed again).</li>
|
||||||
|
* </ul>
|
||||||
|
* </li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
|
public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
|
||||||
private final ProcedureEvent event = new ProcedureEvent("timeout-event");
|
private final ProcedureEvent event = new ProcedureEvent("timeout-event");
|
||||||
|
|
||||||
|
@ -132,8 +150,8 @@ public class TestProcedureEvents {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
env.getProcedureScheduler().suspendEvent(event);
|
event.suspend();
|
||||||
if (env.getProcedureScheduler().waitEvent(event, this)) {
|
if (event.suspendIfNotReady(this)) {
|
||||||
setState(ProcedureState.WAITING_TIMEOUT);
|
setState(ProcedureState.WAITING_TIMEOUT);
|
||||||
throw new ProcedureSuspendedException();
|
throw new ProcedureSuspendedException();
|
||||||
}
|
}
|
||||||
|
@ -146,15 +164,15 @@ public class TestProcedureEvents {
|
||||||
int n = ntimeouts.incrementAndGet();
|
int n = ntimeouts.incrementAndGet();
|
||||||
LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
|
LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
|
||||||
setState(ProcedureState.RUNNABLE);
|
setState(ProcedureState.RUNNABLE);
|
||||||
env.getProcedureScheduler().wakeEvent(event);
|
event.wake((AbstractProcedureScheduler) env.getProcedureScheduler());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void afterReplay(final TestProcEnv env) {
|
protected void afterReplay(final TestProcEnv env) {
|
||||||
if (getState() == ProcedureState.WAITING_TIMEOUT) {
|
if (getState() == ProcedureState.WAITING_TIMEOUT) {
|
||||||
env.getProcedureScheduler().suspendEvent(event);
|
event.suspend();
|
||||||
env.getProcedureScheduler().waitEvent(event, this);
|
event.suspendIfNotReady(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,13 +92,13 @@ public class TestProcedureSchedulerConcurrency {
|
||||||
ev[i] = waitQueue.pollFirst().getEvent();
|
ev[i] = waitQueue.pollFirst().getEvent();
|
||||||
LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
|
LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
|
||||||
}
|
}
|
||||||
sched.wakeEvents(ev.length, ev);
|
ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev);
|
||||||
wakeCount.addAndGet(ev.length);
|
wakeCount.addAndGet(ev.length);
|
||||||
} else {
|
} else {
|
||||||
int size = waitQueue.size();
|
int size = waitQueue.size();
|
||||||
while (size-- > 0) {
|
while (size-- > 0) {
|
||||||
ProcedureEvent ev = waitQueue.pollFirst().getEvent();
|
ProcedureEvent ev = waitQueue.pollFirst().getEvent();
|
||||||
sched.wakeEvent(ev);
|
ev.wake(procSched);
|
||||||
LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
|
LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
|
||||||
wakeCount.incrementAndGet();
|
wakeCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
@ -122,9 +122,9 @@ public class TestProcedureSchedulerConcurrency {
|
||||||
TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll();
|
TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll();
|
||||||
if (proc == null) continue;
|
if (proc == null) continue;
|
||||||
|
|
||||||
sched.suspendEvent(proc.getEvent());
|
proc.getEvent().suspend();
|
||||||
waitQueue.add(proc);
|
waitQueue.add(proc);
|
||||||
sched.waitEvent(proc.getEvent(), proc);
|
proc.getEvent().suspendIfNotReady(proc);
|
||||||
LOG.debug("WAIT " + proc.getEvent());
|
LOG.debug("WAIT " + proc.getEvent());
|
||||||
if (waitCount.incrementAndGet() >= NRUNS) {
|
if (waitCount.incrementAndGet() >= NRUNS) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -252,7 +252,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
|
|
||||||
// Update meta events (for testing)
|
// Update meta events (for testing)
|
||||||
if (hasProcExecutor) {
|
if (hasProcExecutor) {
|
||||||
getProcedureScheduler().suspendEvent(metaLoadEvent);
|
metaLoadEvent.suspend();
|
||||||
setFailoverCleanupDone(false);
|
setFailoverCleanupDone(false);
|
||||||
for (RegionInfo hri: getMetaRegionSet()) {
|
for (RegionInfo hri: getMetaRegionSet()) {
|
||||||
setMetaInitialized(hri, false);
|
setMetaInitialized(hri, false);
|
||||||
|
@ -413,17 +413,16 @@ public class AssignmentManager implements ServerListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean waitMetaInitialized(final Procedure proc, final RegionInfo regionInfo) {
|
public boolean waitMetaInitialized(final Procedure proc, final RegionInfo regionInfo) {
|
||||||
return getProcedureScheduler().waitEvent(
|
return getMetaInitializedEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
|
||||||
getMetaInitializedEvent(getMetaForRegion(regionInfo)), proc);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setMetaInitialized(final RegionInfo metaRegionInfo, final boolean isInitialized) {
|
private void setMetaInitialized(final RegionInfo metaRegionInfo, final boolean isInitialized) {
|
||||||
assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
|
assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
|
||||||
final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo);
|
final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo);
|
||||||
if (isInitialized) {
|
if (isInitialized) {
|
||||||
getProcedureScheduler().wakeEvent(metaInitEvent);
|
metaInitEvent.wake(getProcedureScheduler());
|
||||||
} else {
|
} else {
|
||||||
getProcedureScheduler().suspendEvent(metaInitEvent);
|
metaInitEvent.suspend();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,11 +433,11 @@ public class AssignmentManager implements ServerListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean waitMetaLoaded(final Procedure proc) {
|
public boolean waitMetaLoaded(final Procedure proc) {
|
||||||
return getProcedureScheduler().waitEvent(metaLoadEvent, proc);
|
return metaLoadEvent.suspendIfNotReady(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void wakeMetaLoadedEvent() {
|
protected void wakeMetaLoadedEvent() {
|
||||||
getProcedureScheduler().wakeEvent(metaLoadEvent);
|
metaLoadEvent.wake(getProcedureScheduler());
|
||||||
assert isMetaLoaded() : "expected meta to be loaded";
|
assert isMetaLoaded() : "expected meta to be loaded";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1011,11 +1010,11 @@ public class AssignmentManager implements ServerListener {
|
||||||
|
|
||||||
protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) {
|
protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) {
|
||||||
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
||||||
return getProcedureScheduler().waitEvent(serverNode.getReportEvent(), proc);
|
return serverNode.getReportEvent().suspendIfNotReady(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void wakeServerReportEvent(final ServerStateNode serverNode) {
|
protected void wakeServerReportEvent(final ServerStateNode serverNode) {
|
||||||
getProcedureScheduler().wakeEvent(serverNode.getReportEvent());
|
serverNode.getReportEvent().wake(getProcedureScheduler());
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================================
|
// ============================================================================================
|
||||||
|
@ -1588,7 +1587,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
* and each region will be assigned by a server using the balancer.
|
* and each region will be assigned by a server using the balancer.
|
||||||
*/
|
*/
|
||||||
protected void queueAssign(final RegionStateNode regionNode) {
|
protected void queueAssign(final RegionStateNode regionNode) {
|
||||||
getProcedureScheduler().suspendEvent(regionNode.getProcedureEvent());
|
regionNode.getProcedureEvent().suspend();
|
||||||
|
|
||||||
// TODO: quick-start for meta and the other sys-tables?
|
// TODO: quick-start for meta and the other sys-tables?
|
||||||
assignQueueLock.lock();
|
assignQueueLock.lock();
|
||||||
|
@ -1787,7 +1786,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
events[evcount++] = regionNode.getProcedureEvent();
|
events[evcount++] = regionNode.getProcedureEvent();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
getProcedureScheduler().wakeEvents(evcount, events);
|
ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
|
||||||
|
|
||||||
final long et = System.currentTimeMillis();
|
final long et = System.currentTimeMillis();
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
|
|
|
@ -190,7 +190,7 @@ public abstract class RegionTransitionProcedure
|
||||||
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
|
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
|
||||||
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
|
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
|
||||||
// this method. Just get out of this current processing quickly.
|
// this method. Just get out of this current processing quickly.
|
||||||
env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
|
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||||
}
|
}
|
||||||
// else leave the procedure in suspended state; it is waiting on another call to this callback
|
// else leave the procedure in suspended state; it is waiting on another call to this callback
|
||||||
}
|
}
|
||||||
|
@ -214,7 +214,7 @@ public abstract class RegionTransitionProcedure
|
||||||
|
|
||||||
// Put this procedure into suspended mode to wait on report of state change
|
// Put this procedure into suspended mode to wait on report of state change
|
||||||
// from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
|
// from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
|
||||||
env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());
|
getRegionState(env).getProcedureEvent().suspend();
|
||||||
|
|
||||||
// Tricky because the below call to addOperationToNode can fail. If it fails, we need to
|
// Tricky because the below call to addOperationToNode can fail. If it fails, we need to
|
||||||
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and
|
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and
|
||||||
|
@ -253,7 +253,7 @@ public abstract class RegionTransitionProcedure
|
||||||
// processing to the next stage. At an extreme, the other worker may run in
|
// processing to the next stage. At an extreme, the other worker may run in
|
||||||
// parallel so DO NOT CHANGE any state hereafter! This should be last thing
|
// parallel so DO NOT CHANGE any state hereafter! This should be last thing
|
||||||
// done in this processing step.
|
// done in this processing step.
|
||||||
env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
|
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) {
|
protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) {
|
||||||
|
@ -298,7 +298,7 @@ public abstract class RegionTransitionProcedure
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH;
|
transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH;
|
||||||
if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
|
if (regionNode.getProcedureEvent().suspendIfNotReady(this)) {
|
||||||
// Why this suspend? Because we want to ensure Store happens before proceed?
|
// Why this suspend? Because we want to ensure Store happens before proceed?
|
||||||
throw new ProcedureSuspendedException();
|
throw new ProcedureSuspendedException();
|
||||||
}
|
}
|
||||||
|
@ -315,7 +315,7 @@ public abstract class RegionTransitionProcedure
|
||||||
retry = true;
|
retry = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
|
if (regionNode.getProcedureEvent().suspendIfNotReady(this)) {
|
||||||
throw new ProcedureSuspendedException();
|
throw new ProcedureSuspendedException();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -209,7 +209,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
|
||||||
if (!event.isReady()) { // Maybe unlock() awakened the event.
|
if (!event.isReady()) { // Maybe unlock() awakened the event.
|
||||||
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||||
if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event);
|
if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event);
|
||||||
env.getProcedureScheduler().wakeEvent(event);
|
event.wake(env.getProcedureScheduler());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false; // false: do not mark the procedure as failed.
|
return false; // false: do not mark the procedure as failed.
|
||||||
|
@ -224,7 +224,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
|
||||||
synchronized (event) {
|
synchronized (event) {
|
||||||
if (!event.isReady()) {
|
if (!event.isReady()) {
|
||||||
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||||
env.getProcedureScheduler().wakeEvent(event);
|
event.wake(env.getProcedureScheduler());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -244,8 +244,8 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
synchronized (event) {
|
synchronized (event) {
|
||||||
env.getProcedureScheduler().suspendEvent(event);
|
event.suspend();
|
||||||
env.getProcedureScheduler().waitEvent(event, this);
|
event.suspendIfNotReady(this);
|
||||||
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||||
}
|
}
|
||||||
throw new ProcedureSuspendedException();
|
throw new ProcedureSuspendedException();
|
||||||
|
|
|
@ -147,25 +147,25 @@ public class MasterProcedureEnv implements ConfigurationObserver {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean waitInitialized(Procedure proc) {
|
public boolean waitInitialized(Procedure proc) {
|
||||||
return procSched.waitEvent(master.getInitializedEvent(), proc);
|
return master.getInitializedEvent().suspendIfNotReady(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean waitServerCrashProcessingEnabled(Procedure proc) {
|
public boolean waitServerCrashProcessingEnabled(Procedure proc) {
|
||||||
if (master instanceof HMaster) {
|
if (master instanceof HMaster) {
|
||||||
return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
|
return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean waitFailoverCleanup(Procedure proc) {
|
public boolean waitFailoverCleanup(Procedure proc) {
|
||||||
return procSched.waitEvent(master.getAssignmentManager().getFailoverCleanupEvent(), proc);
|
return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setEventReady(ProcedureEvent event, boolean isReady) {
|
public void setEventReady(ProcedureEvent event, boolean isReady) {
|
||||||
if (isReady) {
|
if (isReady) {
|
||||||
procSched.wakeEvent(event);
|
event.wake(procSched);
|
||||||
} else {
|
} else {
|
||||||
procSched.suspendEvent(event);
|
event.suspend();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -168,7 +168,7 @@ public class TestMasterProcedureEvents {
|
||||||
|
|
||||||
// wake the event
|
// wake the event
|
||||||
LOG.debug("wake " + event);
|
LOG.debug("wake " + event);
|
||||||
procSched.wakeEvent(event);
|
event.wake(procSched);
|
||||||
assertEquals(true, event.isReady());
|
assertEquals(true, event.isReady());
|
||||||
|
|
||||||
// wait until proc completes
|
// wait until proc completes
|
||||||
|
|
|
@ -572,14 +572,14 @@ public class TestMasterProcedureScheduler {
|
||||||
|
|
||||||
// suspend
|
// suspend
|
||||||
ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent");
|
ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent");
|
||||||
assertEquals(true, queue.waitEvent(event, proc));
|
assertEquals(true, event.suspendIfNotReady(proc));
|
||||||
|
|
||||||
proc = queue.poll();
|
proc = queue.poll();
|
||||||
assertEquals(2, proc.getProcId());
|
assertEquals(2, proc.getProcId());
|
||||||
assertEquals(null, queue.poll(0));
|
assertEquals(null, queue.poll(0));
|
||||||
|
|
||||||
// resume
|
// resume
|
||||||
queue.wakeEvent(event);
|
event.wake(queue);
|
||||||
|
|
||||||
proc = queue.poll();
|
proc = queue.poll();
|
||||||
assertEquals(1, proc.getProcId());
|
assertEquals(1, proc.getProcId());
|
||||||
|
|
|
@ -467,21 +467,14 @@ public class TestRegionMergeTransactionOnCluster {
|
||||||
verifyRowCount(table, ROWSIZE);
|
verifyRowCount(table, ROWSIZE);
|
||||||
LOG.info("Verified " + table.getName());
|
LOG.info("Verified " + table.getName());
|
||||||
|
|
||||||
// Sleep here is an ugly hack to allow region transitions to finish
|
|
||||||
long timeout = System.currentTimeMillis() + waitTime;
|
|
||||||
List<Pair<RegionInfo, ServerName>> tableRegions;
|
List<Pair<RegionInfo, ServerName>> tableRegions;
|
||||||
while (System.currentTimeMillis() < timeout) {
|
TEST_UTIL.waitUntilAllRegionsAssigned(tablename);
|
||||||
|
LOG.info("All regions assigned for table - " + table.getName());
|
||||||
tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
|
tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
|
||||||
TEST_UTIL.getConnection(), tablename);
|
TEST_UTIL.getConnection(), tablename);
|
||||||
LOG.info("Found " + tableRegions.size() + ", expecting " + numRegions * replication);
|
assertEquals("Wrong number of regions in table " + tablename,
|
||||||
if (tableRegions.size() == numRegions * replication)
|
numRegions * replication, tableRegions.size());
|
||||||
break;
|
LOG.info(tableRegions.size() + "Regions after load: " + Joiner.on(',').join(tableRegions));
|
||||||
Thread.sleep(250);
|
|
||||||
}
|
|
||||||
LOG.info("Getting regions of " + table.getName());
|
|
||||||
tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
|
|
||||||
TEST_UTIL.getConnection(), tablename);
|
|
||||||
LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
|
|
||||||
assertEquals(numRegions * replication, tableRegions.size());
|
assertEquals(numRegions * replication, tableRegions.size());
|
||||||
return table;
|
return table;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue