fix kafka consumer concurrent access during shutdown (#3193)

This commit is contained in:
David Lim 2016-06-28 14:23:17 -06:00 committed by Charles Allen
parent 496b801bc3
commit 1d40df4bb7
1 changed file with 92 additions and 66 deletions

View File

@ -91,6 +91,7 @@ public class KafkaSupervisor implements Supervisor
private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class); private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
private static final Random RANDOM = new Random(); private static final Random RANDOM = new Random();
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us from running too often in response to events private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us from running too often in response to events
private static final int SHUTDOWN_TIMEOUT_MILLIS = 30000;
// Internal data structures // Internal data structures
// -------------------------------------------------------- // --------------------------------------------------------
@ -166,6 +167,7 @@ public class KafkaSupervisor implements Supervisor
private final ScheduledExecutorService scheduledExec; private final ScheduledExecutorService scheduledExec;
private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>(); private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
private final Object stopLock = new Object(); private final Object stopLock = new Object();
private final Object stateChangeLock = new Object();
private boolean listenerRegistered = false; private boolean listenerRegistered = false;
private long lastRunTime; private long lastRunTime;
@ -238,96 +240,110 @@ public class KafkaSupervisor implements Supervisor
@Override @Override
public void start() public void start()
{ {
Preconditions.checkState(!started, "already started"); synchronized (stateChangeLock) {
Preconditions.checkState(!exec.isShutdown(), "already stopped"); Preconditions.checkState(!started, "already started");
Preconditions.checkState(!exec.isShutdown(), "already stopped");
try { try {
consumer = getKafkaConsumer(); consumer = getKafkaConsumer();
exec.submit( exec.submit(
new Runnable() new Runnable()
{
@Override
public void run()
{ {
try { @Override
while (!Thread.currentThread().isInterrupted()) { public void run()
final Notice notice = notices.take(); {
try {
while (!Thread.currentThread().isInterrupted()) {
final Notice notice = notices.take();
try { try {
notice.handle(); notice.handle();
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource) log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource)
.addData("noticeClass", notice.getClass().getSimpleName()) .addData("noticeClass", notice.getClass().getSimpleName())
.emit(); .emit();
}
} }
} }
} catch (InterruptedException e) {
catch (InterruptedException e) { log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource);
log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource); }
} }
} }
} );
}
catch (Exception e) {
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw Throwables.propagate(e);
}
firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
); );
}
catch (Exception e) {
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw Throwables.propagate(e);
}
firstRunTime = DateTime.now().plus(ioConfig.getStartDelay()); started = true;
scheduledExec.scheduleAtFixedRate( log.info("Started KafkaSupervisor[%s], first run in [%s]", dataSource, ioConfig.getStartDelay());
buildRunTask(), }
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);
started = true;
log.info("Started KafkaSupervisor[%s], first run in [%s]", dataSource, ioConfig.getStartDelay());
} }
@Override @Override
public void stop(boolean stopGracefully) public void stop(boolean stopGracefully)
{ {
Preconditions.checkState(started, "not started"); synchronized (stateChangeLock) {
Preconditions.checkState(started, "not started");
log.info("Beginning shutdown of KafkaSupervisor[%s]", dataSource); log.info("Beginning shutdown of KafkaSupervisor[%s]", dataSource);
try { try {
scheduledExec.shutdownNow(); // stop recurring executions scheduledExec.shutdownNow(); // stop recurring executions
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner(); Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) { if (taskRunner.isPresent()) {
taskRunner.get().unregisterListener(supervisorId); taskRunner.get().unregisterListener(supervisorId);
} }
// Stopping gracefully will synchronize the end offsets of the tasks and signal them to publish, and will block // Stopping gracefully will synchronize the end offsets of the tasks and signal them to publish, and will block
// until the tasks have acknowledged or timed out. We want this behavior when we're explicitly shut down through // until the tasks have acknowledged or timed out. We want this behavior when we're explicitly shut down through
// the API, but if we shut down for other reasons (e.g. we lose leadership) we want to just stop and leave the // the API, but if we shut down for other reasons (e.g. we lose leadership) we want to just stop and leave the
// tasks as they are. // tasks as they are.
if (stopGracefully) {
log.info("Stopping gracefully, signalling managed tasks to complete and publish");
synchronized (stopLock) { synchronized (stopLock) {
notices.add(new ShutdownNotice()); if (stopGracefully) {
log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish");
notices.add(new GracefulShutdownNotice());
} else {
log.info("Posting ShutdownNotice");
notices.add(new ShutdownNotice());
}
long endTime = System.currentTimeMillis() + SHUTDOWN_TIMEOUT_MILLIS;
while (!stopped) { while (!stopped) {
stopLock.wait(); long sleepTime = endTime - System.currentTimeMillis();
if (sleepTime <= 0) {
log.info("Timed out while waiting for shutdown");
stopped = true;
break;
}
stopLock.wait(sleepTime);
} }
} }
log.info("Shutdown notice handled"); log.info("Shutdown notice handled");
exec.shutdownNow();
started = false;
log.info("KafkaSupervisor[%s] has stopped", dataSource);
}
catch (Exception e) {
log.makeAlert(e, "Exception stopping KafkaSupervisor[%s]", dataSource)
.emit();
} }
exec.shutdownNow();
consumer.close();
started = false;
log.info("KafkaSupervisor[%s] has stopped", dataSource);
}
catch (Exception e) {
log.makeAlert(e, "Exception stopping KafkaSupervisor[%s]", dataSource)
.emit();
} }
} }
@ -394,7 +410,7 @@ public class KafkaSupervisor implements Supervisor
} }
} }
private class ShutdownNotice implements Notice private class GracefulShutdownNotice extends ShutdownNotice
{ {
@Override @Override
public void handle() public void handle()
@ -414,6 +430,16 @@ public class KafkaSupervisor implements Supervisor
} }
checkTaskDuration(); checkTaskDuration();
super.handle();
}
}
private class ShutdownNotice implements Notice
{
@Override
public void handle()
{
consumer.close();
synchronized (stopLock) { synchronized (stopLock) {
stopped = true; stopped = true;