KafkaIndexTask: Allow run thread to stop gracefully instead of interrupting (#3534)

* allow run thread to gracefully complete instead of interrupting when stopGracefully() is called

* add comments
This commit is contained in:
David Lim 2016-10-17 08:52:19 -06:00 committed by Fangjin Yang
parent c1d3b8a30c
commit c2ae734848
2 changed files with 74 additions and 13 deletions

View File

@ -120,7 +120,7 @@ public class Execs
executor.getQueue().put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
}
}
}

View File

@ -96,6 +96,7 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@ -119,6 +120,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private static final Random RANDOM = new Random();
private static final long POLL_TIMEOUT = 100;
private static final long POLL_RETRY_MS = 30000;
private static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
private final DataSchema dataSchema;
@ -159,8 +161,22 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private final Lock pauseLock = new ReentrantLock();
private final Condition hasPaused = pauseLock.newCondition();
private final Condition shouldResume = pauseLock.newCondition();
// [pollRetryLock] and [isAwaitingRetry] are used when the Kafka consumer returns an OffsetOutOfRangeException and we
// pause polling from Kafka for POLL_RETRY_MS before trying again. This allows us to signal the sleeping thread and
// resume the main run loop in the case of a pause or stop request from a Jetty thread.
private final Lock pollRetryLock = new ReentrantLock();
private final Condition isAwaitingRetry = pollRetryLock.newCondition();
// [statusLock] is used to synchronize the Jetty thread calling stopGracefully() with the main run thread. It prevents
// the main run thread from switching into a publishing state while the stopGracefully() thread thinks it's still in
// a pre-publishing state. This is important because stopGracefully() will try to use the [stopRequested] flag to stop
// the main thread where possible, but this flag is not honored once publishing has begun so in this case we must
// interrupt the thread. The lock ensures that if the run thread is about to transition into publishing state, it
// blocks until after stopGracefully() has set [stopRequested] and then does a final check on [stopRequested] before
// transitioning to publishing state.
private final Object statusLock = new Object();
private volatile boolean pauseRequested = false;
private volatile long pauseMillis = 0;
@ -373,7 +389,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
pollRetryLock.lockInterruptibly();
try {
long nanos = TimeUnit.MILLISECONDS.toNanos(POLL_RETRY_MS);
while (nanos > 0L && !pauseRequested) {
while (nanos > 0L && !pauseRequested && !stopRequested) {
nanos = isAwaitingRetry.awaitNanos(nanos);
}
}
@ -462,11 +478,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
driver.persist(committerSupplier.get()); // persist pending data
}
synchronized (statusLock) {
if (stopRequested && !publishOnStop) {
throw new InterruptedException("Stopping without publishing");
}
status = Status.PUBLISHING;
}
final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher()
{
@Override
@ -523,7 +542,13 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
);
}
}
catch (InterruptedException e) {
catch (InterruptedException | RejectedExecutionException e) {
// handle the InterruptedException that gets wrapped in a RejectedExecutionException
if (e instanceof RejectedExecutionException
&& (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
throw e;
}
// if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow
if (!stopRequested) {
Thread.currentThread().interrupt();
@ -552,11 +577,47 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@Override
public void stopGracefully()
{
log.info("Stopping gracefully.");
log.info("Stopping gracefully (status: [%s])", status);
stopRequested = true;
if (runThread.isAlive()) {
log.info("Interrupting run thread (status: [%s])", status);
synchronized (statusLock) {
if (status == Status.PUBLISHING) {
runThread.interrupt();
return;
}
}
try {
if (pauseLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
if (pauseRequested) {
pauseRequested = false;
shouldResume.signalAll();
}
}
finally {
pauseLock.unlock();
}
} else {
log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread");
runThread.interrupt();
return;
}
if (pollRetryLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
isAwaitingRetry.signalAll();
}
finally {
pollRetryLock.unlock();
}
} else {
log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread");
runThread.interrupt();
}
}
catch (Exception e) {
Throwables.propagate(e);
}
}
@ -883,14 +944,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
* Checks if the pauseRequested flag was set and if so blocks:
* a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared
* b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared
* <p>
* <p/>
* If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the
* pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume
* and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal
* shouldResume after adjusting pauseMillis for the new value to take effect.
* <p>
* <p/>
* Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
* <p>
* <p/>
* Additionally, pauses if all partition assignments have been read and the pauseAfterRead flag is set.
*
* @return true if a pause request was handled, false otherwise