SeekableStreamIndexTaskRunner: Lazy init of runner. (#7729)

The main motivation is that this fixes #7724, by making it so the overlord
doesn't try to create a task runner and parser when all it really wants to
do is create a task object and serialize it.
Gian Merlino 2019-05-22 21:13:57 -07:00 committed by Fangjin Yang
parent ffc2397bcd
commit 53b6467fc8
4 changed files with 76 additions and 32 deletions
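In other words: the runner, and with it the parser, used to be built eagerly in the task constructor, which Jackson invokes whenever a task spec is deserialized, so the overlord did that work (and hit #7724) merely by round-tripping the task JSON. Below is a minimal sketch of the before/after shape, using placeholder class names rather than the real Druid types and stripped of the locking that the actual change adds; the real change is in the SeekableStreamIndexTask diff at the end of this commit.

// Placeholder types for illustration only; the real classes are
// SeekableStreamIndexTask and SeekableStreamIndexTaskRunner, shown in the last diff below.
class Runner
{
  Runner()
  {
    // In the real task, constructing the runner also builds the row parser from the
    // data schema, which is exactly the work the overlord should not be doing.
  }
}

class Task
{
  private Runner runner;

  // Before this commit (conceptually): the constructor did `this.runner = createRunner();`,
  // so every deserialization of a task spec also created a runner and a parser.
  // After: nothing runner-related happens until a caller actually needs to run the task.
  Runner getRunner()
  {
    if (runner == null) {
      runner = createRunner(); // the real change guards this with volatile + a lock
    }
    return runner;
  }

  private Runner createRunner()
  {
    return new Runner();
  }
}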

KafkaIndexTask.java

@ -131,18 +131,20 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
{
if (context != null && context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
&& ((boolean) context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
+ //noinspection unchecked
return new IncrementalPublishingKafkaIndexTaskRunner(
this,
- parser,
+ dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory
);
} else {
+ //noinspection unchecked
return new LegacyKafkaIndexTaskRunner(
this,
- parser,
+ dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,

KafkaIndexTaskTest.java

@ -195,7 +195,7 @@ import static org.apache.druid.query.QueryPlus.wrap;
public class KafkaIndexTaskTest
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
- private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+ private static final ObjectMapper OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
private static final long POLL_RETRY_MS = 100;
private static TestingCluster zkServer;
@ -204,6 +204,10 @@ public class KafkaIndexTaskTest
private static ListeningExecutorService taskExec;
private static int topicPostfix;
+ static {
+ new KafkaIndexTaskModule().getJacksonModules().forEach(OBJECT_MAPPER::registerModule);
+ }
private final List<Task> runningTasks = new ArrayList<>();
private long handoffConditionTimeout = 0;
@ -244,7 +248,7 @@ public class KafkaIndexTaskTest
private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
- objectMapper.convertValue(
+ OBJECT_MAPPER.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
@ -272,7 +276,7 @@ public class KafkaIndexTaskTest
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null,
- objectMapper
+ OBJECT_MAPPER
);
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
@ -730,10 +734,11 @@ public class KafkaIndexTaskTest
SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
- new KafkaDataSourceMetadata(
- new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
- ),
- metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
+ new KafkaDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
+ ),
+ metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+ );
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
@ -2011,7 +2016,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
- Map<Integer, Long> currentOffsets = objectMapper.readValue(
+ Map<Integer, Long> currentOffsets = OBJECT_MAPPER.readValue(
task.getRunner().pause().getEntity().toString(),
new TypeReference<Map<Integer, Long>>()
{
@ -2147,7 +2152,7 @@ public class KafkaIndexTaskTest
final Map<String, Object> context = new HashMap<>();
context.put(
SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
- objectMapper.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+ OBJECT_MAPPER.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
);
final KafkaIndexTask task = createTask(
@ -2267,7 +2272,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
//verify the 2 indexed records
- final QuerySegmentSpec firstInterval = objectMapper.readValue(
+ final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue(
"\"2008/2010\"", QuerySegmentSpec.class
);
Iterable<ScanResultValue> scanResultValues = scanData(task, firstInterval);
@ -2287,7 +2292,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
- final QuerySegmentSpec rollbackedInterval = objectMapper.readValue(
+ final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue(
"\"2010/2012\"", QuerySegmentSpec.class
);
scanResultValues = scanData(task, rollbackedInterval);
@ -2304,7 +2309,7 @@ public class KafkaIndexTaskTest
kafkaProducer.commitTransaction();
}
- final QuerySegmentSpec endInterval = objectMapper.readValue(
+ final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue(
"\"2008/2049\"", QuerySegmentSpec.class
);
Iterable<ScanResultValue> scanResultValues1 = scanData(task, endInterval);
@ -2388,6 +2393,36 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
}
+ @Test
+ public void testSerde() throws Exception
+ {
+ // This is both a serde test and a regression test for https://github.com/apache/incubator-druid/issues/7724.
+ final KafkaIndexTask task = createTask(
+ "taskid",
+ DATA_SCHEMA.withTransformSpec(
+ new TransformSpec(
+ null,
+ ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil()))
+ )
+ ),
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence",
+ new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(), ImmutableSet.of()),
+ new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of()),
+ ImmutableMap.of(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null
+ )
+ );
+ final Task task1 = OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsBytes(task), Task.class);
+ Assert.assertEquals(task, task1);
+ }
private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
{
ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
@ -2513,7 +2548,7 @@ public class KafkaIndexTaskTest
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
- final String checkpointsJson = objectMapper
+ final String checkpointsJson = OBJECT_MAPPER
.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
.writeValueAsString(checkpoints);
context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
@ -2530,7 +2565,7 @@ public class KafkaIndexTaskTest
null,
null,
rowIngestionMetersFactory,
- objectMapper
+ OBJECT_MAPPER
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
@ -2544,7 +2579,7 @@ public class KafkaIndexTaskTest
dataSchema.getAggregators(),
dataSchema.getGranularitySpec(),
dataSchema.getTransformSpec(),
- objectMapper
+ OBJECT_MAPPER
);
}
@ -2861,7 +2896,7 @@ public class KafkaIndexTaskTest
private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
{
- Map<String, TaskReport> taskReports = objectMapper.readValue(
+ Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
reportsFile,
new TypeReference<Map<String, TaskReport>>()
{

KinesisIndexTask.java

@ -72,9 +72,10 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
@Override
protected SeekableStreamIndexTaskRunner<String, String> createTaskRunner()
{
+ //noinspection unchecked
return new KinesisIndexTaskRunner(
this,
- parser,
+ dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,

SeekableStreamIndexTask.java

@ -25,7 +25,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputRow;
- import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
@ -57,7 +56,6 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
import javax.annotation.Nullable;
- import java.nio.ByteBuffer;
import java.util.Map;
@ -67,9 +65,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
- private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
protected final DataSchema dataSchema;
- protected final InputRowParser<ByteBuffer> parser;
protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
protected final Optional<ChatHandlerProvider> chatHandlerProvider;
@ -78,6 +74,12 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
protected final RowIngestionMetersFactory rowIngestionMetersFactory;
protected final CircularBuffer<Throwable> savedParseExceptions;
+ // Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
+ // See https://github.com/apache/incubator-druid/issues/7724 for issues that this can cause.
+ // By the way, lazy init is synchronized because the runner may be needed in multiple threads.
+ private final Object runnerInitLock = new Object();
+ private volatile SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
public SeekableStreamIndexTask(
final String id,
@Nullable final TaskResource taskResource,
@ -99,7 +101,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
context
);
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
- this.parser = Preconditions.checkNotNull((InputRowParser<ByteBuffer>) dataSchema.getParser(), "parser");
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
@ -111,7 +112,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
this.context = context;
this.authorizerMapper = authorizerMapper;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
- this.runner = createTaskRunner();
}
private static String makeTaskId(String dataSource, String type)
@ -130,7 +130,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
return StringUtils.format("%s_%s", type, dataSource);
}
@Override
public int getPriority()
{
@ -164,7 +163,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
@Override
public TaskStatus run(final TaskToolbox toolbox)
{
- return runner.run(toolbox);
+ return getRunner().run(toolbox);
}
@Override
@ -177,19 +176,19 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
public void stopGracefully(TaskConfig taskConfig)
{
if (taskConfig.isRestoreTasksOnRestart()) {
- runner.stopGracefully();
+ getRunner().stopGracefully();
}
}
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
- if (runner.getAppenderator() == null) {
+ if (getRunner().getAppenderator() == null) {
// Not yet initialized, no data yet, just return a noop runner.
return new NoopQueryRunner<>();
}
- return (queryPlus, responseContext) -> queryPlus.run(runner.getAppenderator(), responseContext);
+ return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
}
public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
@ -283,13 +282,20 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
@VisibleForTesting
public Appenderator getAppenderator()
{
- return runner.getAppenderator();
+ return getRunner().getAppenderator();
}
@VisibleForTesting
public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner()
{
+ if (runner == null) {
+ synchronized (runnerInitLock) {
+ if (runner == null) {
+ runner = createTaskRunner();
+ }
+ }
+ }
return runner;
}
}
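A closing note on the design choice in getRunner() above: the field is volatile and the null check is repeated under runnerInitLock because, as the new comment says, the runner may be requested from multiple threads (the run() call plus the query and chat-handler paths). The following is a standalone illustration, with made-up class names rather than the Druid classes, of why that double-checked-locking idiom constructs the runner exactly once even under concurrent access:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone illustration of the double-checked locking used in getRunner() above;
// DemoRunner and DemoTask are placeholders, not Druid classes.
public class LazyRunnerDemo
{
  static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

  static class DemoRunner
  {
    DemoRunner()
    {
      CONSTRUCTIONS.incrementAndGet(); // count constructions so we can check it happened once
    }
  }

  static class DemoTask
  {
    private final Object initLock = new Object();
    private volatile DemoRunner runner; // volatile: safely publishes the instance to other threads

    DemoRunner getRunner()
    {
      if (runner == null) {
        synchronized (initLock) {
          if (runner == null) {
            runner = new DemoRunner();
          }
        }
      }
      return runner;
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    DemoTask task = new DemoTask();
    CountDownLatch start = new CountDownLatch(1);
    Runnable caller = () -> {
      try {
        start.await(); // line both threads up so they race into getRunner()
        task.getRunner();
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    Thread t1 = new Thread(caller);
    Thread t2 = new Thread(caller);
    t1.start();
    t2.start();
    start.countDown();
    t1.join();
    t2.join();
    System.out.println("runner constructed " + CONSTRUCTIONS.get() + " time(s)"); // prints 1
  }
}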