Prevent failed KafkaConsumer creation from blocking overlord startup (#6383)

* Prevent failed KafkaConsumer creation from blocking overlord startup

* PR comments

* Fix random task ID length

* Adjust test timer

* Use Integer.SIZE
This commit is contained in:
Jonathan Wei 2018-10-03 19:08:20 -07:00 committed by Fangjin Yang
parent 0b8085aff7
commit c7ac8785a1
3 changed files with 290 additions and 99 deletions

View File

@ -70,6 +70,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -128,6 +129,7 @@ public class KafkaSupervisor implements Supervisor
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
private static final int MAX_INITIALIZATION_RETRIES = 20;
private static final CopyOnWriteArrayList EMPTY_LIST = Lists.newCopyOnWriteArrayList();
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
@ -258,8 +260,12 @@ public class KafkaSupervisor implements Supervisor
private boolean listenerRegistered = false;
private long lastRunTime;
private int initRetryCounter = 0;
private volatile DateTime firstRunTime;
private volatile KafkaConsumer consumer;
private volatile boolean lifecycleStarted = false;
private volatile boolean started = false;
private volatile boolean stopped = false;
private volatile Map<Integer, Long> latestOffsetsFromKafka;
@ -358,77 +364,48 @@ public class KafkaSupervisor implements Supervisor
public void start()
{
synchronized (stateChangeLock) {
Preconditions.checkState(!started, "already started");
Preconditions.checkState(!lifecycleStarted, "already started");
Preconditions.checkState(!exec.isShutdown(), "already stopped");
// Try normal initialization first, if that fails then schedule periodic initialization retries
try {
consumer = getKafkaConsumer();
exec.submit(
() -> {
try {
long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);
while (!Thread.currentThread().isInterrupted() && !stopped) {
final Notice notice = notices.poll(pollTimeout, TimeUnit.MILLISECONDS);
if (notice == null) {
continue;
}
try {
notice.handle();
}
catch (Throwable e) {
log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource)
.addData("noticeClass", notice.getClass().getSimpleName())
.emit();
}
}
}
catch (InterruptedException e) {
log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource);
}
}
);
firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
updateCurrentAndLatestOffsets(),
ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
Math.max(
tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS
),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
emitLag(),
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
monitorSchedulerConfig.getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
tryInit();
}
catch (Exception e) {
if (consumer != null) {
consumer.close();
if (!started) {
log.warn("First initialization attempt failed for KafkaSupervisor[%s], starting retries...", dataSource);
exec.submit(
() -> {
try {
RetryUtils.retry(
() -> {
tryInit();
return 0;
},
(throwable) -> {
return !started;
},
0,
MAX_INITIALIZATION_RETRIES,
null,
null
);
}
catch (Exception e2) {
log.makeAlert(
"Failed to initialize after %s retries, aborting. Please resubmit the supervisor spec to restart this supervisor [%s]",
MAX_INITIALIZATION_RETRIES,
supervisorId
).emit();
throw new RuntimeException(e2);
}
}
);
}
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw Throwables.propagate(e);
}
lifecycleStarted = true;
}
}
@ -436,7 +413,7 @@ public class KafkaSupervisor implements Supervisor
public void stop(boolean stopGracefully)
{
synchronized (stateChangeLock) {
Preconditions.checkState(started, "not started");
Preconditions.checkState(lifecycleStarted, "lifecycle not started");
log.info("Beginning shutdown of KafkaSupervisor[%s]", dataSource);
@ -444,37 +421,39 @@ public class KafkaSupervisor implements Supervisor
scheduledExec.shutdownNow(); // stop recurring executions
reportingExec.shutdownNow();
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) {
taskRunner.get().unregisterListener(supervisorId);
}
// Stopping gracefully will synchronize the end offsets of the tasks and signal them to publish, and will block
// until the tasks have acknowledged or timed out. We want this behavior when we're explicitly shut down through
// the API, but if we shut down for other reasons (e.g. we lose leadership) we want to just stop and leave the
// tasks as they are.
synchronized (stopLock) {
if (stopGracefully) {
log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish");
notices.add(new GracefulShutdownNotice());
} else {
log.info("Posting ShutdownNotice");
notices.add(new ShutdownNotice());
if (started) {
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) {
taskRunner.get().unregisterListener(supervisorId);
}
long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis();
long endTime = System.currentTimeMillis() + shutdownTimeoutMillis;
while (!stopped) {
long sleepTime = endTime - System.currentTimeMillis();
if (sleepTime <= 0) {
log.info("Timed out while waiting for shutdown (timeout [%,dms])", shutdownTimeoutMillis);
stopped = true;
break;
// Stopping gracefully will synchronize the end offsets of the tasks and signal them to publish, and will block
// until the tasks have acknowledged or timed out. We want this behavior when we're explicitly shut down through
// the API, but if we shut down for other reasons (e.g. we lose leadership) we want to just stop and leave the
// tasks as they are.
synchronized (stopLock) {
if (stopGracefully) {
log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish");
notices.add(new GracefulShutdownNotice());
} else {
log.info("Posting ShutdownNotice");
notices.add(new ShutdownNotice());
}
long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis();
long endTime = System.currentTimeMillis() + shutdownTimeoutMillis;
while (!stopped) {
long sleepTime = endTime - System.currentTimeMillis();
if (sleepTime <= 0) {
log.info("Timed out while waiting for shutdown (timeout [%,dms])", shutdownTimeoutMillis);
stopped = true;
break;
}
stopLock.wait(sleepTime);
}
stopLock.wait(sleepTime);
}
log.info("Shutdown notice handled");
}
log.info("Shutdown notice handled");
taskClient.close();
workerExec.shutdownNow();
@ -945,6 +924,93 @@ public class KafkaSupervisor implements Supervisor
return Joiner.on("_").join("index_kafka", dataSource, hashCode);
}
@VisibleForTesting
protected void tryInit()
{
synchronized (stateChangeLock) {
if (started) {
log.warn("SUpervisor was already started, skipping init");
return;
}
if (stopped) {
log.warn("Supervisor was already stopped, skipping init.");
return;
}
try {
consumer = getKafkaConsumer();
exec.submit(
() -> {
try {
long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);
while (!Thread.currentThread().isInterrupted() && !stopped) {
final Notice notice = notices.poll(pollTimeout, TimeUnit.MILLISECONDS);
if (notice == null) {
continue;
}
try {
notice.handle();
}
catch (Throwable e) {
log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource)
.addData("noticeClass", notice.getClass().getSimpleName())
.emit();
}
}
}
catch (InterruptedException e) {
log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource);
}
}
);
firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
updateCurrentAndLatestOffsets(),
ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
Math.max(
tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS
),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
emitLag(),
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
monitorSchedulerConfig.getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
}
catch (Exception e) {
if (consumer != null) {
consumer.close();
}
initRetryCounter++;
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw new RuntimeException(e);
}
}
}
private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
{
final Properties props = new Properties();
@ -1035,7 +1101,7 @@ public class KafkaSupervisor implements Supervisor
taskCount++;
final KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
final String taskId = task.getId();
// Determine which task group this task belongs to based on one of the partitions handled by this task. If we
// later determine that this task is actively reading, we will make sure that it matches our current partition
// allocation (getTaskGroupIdForPartition(partition) should return the same value for every partition being read
@ -2440,4 +2506,31 @@ public class KafkaSupervisor implements Supervisor
}
}
// exposed for testing for visibility into initialization state
@VisibleForTesting
public boolean isStarted()
{
return started;
}
// exposed for testing for visibility into initialization state
@VisibleForTesting
public boolean isLifecycleStarted()
{
return lifecycleStarted;
}
// exposed for testing for visibility into initialization state
@VisibleForTesting
public int getInitRetryCounter()
{
return initRetryCounter;
}
// exposed for testing to allow "bootstrap.servers" to be changed after supervisor is created
@VisibleForTesting
public KafkaSupervisorIOConfig getIoConfig()
{
return ioConfig;
}
}

View File

@ -98,6 +98,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -2297,7 +2298,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testSuspendedNoRunningTasks() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true);
supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
addSomeEvents(1);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -2330,7 +2331,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true);
supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true, kafkaHost);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -2424,7 +2425,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true);
supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
supervisor.start();
supervisor.runInternal();
verifyAll();
@ -2437,6 +2438,98 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
@Test
public void testFailedInitializationAndRecovery() throws Exception
{
// Block the supervisor initialization with a bad hostname config, make sure this doesn't block the lifecycle
supervisor = getSupervisor(
1,
1,
true,
"PT1H",
null,
null,
false,
false,
StringUtils.format("badhostname:%d", kafkaServer.getPort())
);
addSomeEvents(1);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
replayAll();
supervisor.start();
Assert.assertTrue(supervisor.isLifecycleStarted());
Assert.assertFalse(supervisor.isStarted());
verifyAll();
while (supervisor.getInitRetryCounter() < 3) {
Thread.sleep(1000);
}
// Portion below is the same test as testNoInitialState(), testing the supervisor after the initialiation is fixed
resetAll();
Capture<KafkaIndexTask> captured = Capture.newInstance();
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
// Fix the bad hostname during the initialization retries and finish the supervisor start.
// This is equivalent to supervisor.start() in testNoInitialState().
// The test supervisor has a P1D period, so we need to manually trigger the initialization retry.
supervisor.getIoConfig().getConsumerProperties().put("bootstrap.servers", kafkaHost);
supervisor.tryInit();
Assert.assertTrue(supervisor.isLifecycleStarted());
Assert.assertTrue(supervisor.isStarted());
supervisor.runInternal();
verifyAll();
KafkaIndexTask task = captured.getValue();
Assert.assertEquals(dataSchema, task.getDataSchema());
Assert.assertEquals(KafkaTuningConfig.copyOf(tuningConfig), task.getTuningConfig());
KafkaIOConfig taskConfig = task.getIOConfig();
Assert.assertEquals(kafkaHost, taskConfig.getConsumerProperties().get("bootstrap.servers"));
Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic());
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(1));
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(2));
Assert.assertEquals(topic, taskConfig.getEndPartitions().getTopic());
Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(0));
Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(1));
Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(2));
}
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@ -2473,7 +2566,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
skipOffsetGaps,
false
false,
kafkaHost
);
}
@ -2485,15 +2579,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean skipOffsetGaps,
boolean suspended
boolean suspended,
String kafkaHost
)
{
Map<String, String> consumerProperties = new HashMap<>();
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
replicas,
taskCount,
new Period(duration),
ImmutableMap.of("myCustomKey", "myCustomValue", "bootstrap.servers", kafkaHost),
consumerProperties,
new Period("P1D"),
new Period("PT30S"),
useEarliestOffset,

View File

@ -89,7 +89,7 @@ public class RealtimeIndexTask extends AbstractTask
private static final int TASK_ID_BITS_PER_SYMBOL = 4;
private static final int TASK_ID_SYMBOL_MASK = (1 << TASK_ID_BITS_PER_SYMBOL) - 1;
private static final int TASK_ID_LENGTH = Integer.BYTES / TASK_ID_BITS_PER_SYMBOL;
private static final int TASK_ID_LENGTH = Integer.SIZE / TASK_ID_BITS_PER_SYMBOL;
public static String makeRandomId()
{