From 53b6467fc83cd4a78d87b5fd1557c84b2a5b2513 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Wed, 22 May 2019 21:13:57 -0700
Subject: [PATCH] SeekableStreamIndexTaskRunner: Lazy init of runner. (#7729)

The main motivation is that this fixes #7724, by making it so the overlord
doesn't try to create a task runner and parser when all it really wants to
do is create a task object and serialize it.
---
 .../druid/indexing/kafka/KafkaIndexTask.java  |  6 +-
 .../indexing/kafka/KafkaIndexTaskTest.java    | 67 ++++++++++++++-----
 .../indexing/kinesis/KinesisIndexTask.java    |  3 +-
 .../SeekableStreamIndexTask.java              | 32 +++++----
 4 files changed, 76 insertions(+), 32 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index df56cced6ec..314f99cb385 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -131,18 +131,20 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
   {
     if (context != null && context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
         && ((boolean) context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
+      //noinspection unchecked
       return new IncrementalPublishingKafkaIndexTaskRunner(
           this,
-          parser,
+          dataSchema.getParser(),
           authorizerMapper,
           chatHandlerProvider,
           savedParseExceptions,
           rowIngestionMetersFactory
       );
     } else {
+      //noinspection unchecked
       return new LegacyKafkaIndexTaskRunner(
           this,
-          parser,
+          dataSchema.getParser(),
           authorizerMapper,
           chatHandlerProvider,
           savedParseExceptions,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 6b9a20365b5..4c9b39b9d6e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -195,7 +195,7 @@ import static org.apache.druid.query.QueryPlus.wrap;
 public class KafkaIndexTaskTest
 {
   private static final Logger log = new Logger(KafkaIndexTaskTest.class);
-  private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+  private static final ObjectMapper OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
   private static final long POLL_RETRY_MS = 100;
 
   private static TestingCluster zkServer;
@@ -204,6 +204,10 @@ public class KafkaIndexTaskTest
   private static ListeningExecutorService taskExec;
   private static int topicPostfix;
 
+  static {
+    new KafkaIndexTaskModule().getJacksonModules().forEach(OBJECT_MAPPER::registerModule);
+  }
+
   private final List<Task> runningTasks = new ArrayList<>();
 
   private long handoffConditionTimeout = 0;
@@ -244,7 +248,7 @@ public class KafkaIndexTaskTest
 
   private static final DataSchema DATA_SCHEMA = new DataSchema(
       "test_ds",
-      objectMapper.convertValue(
+      OBJECT_MAPPER.convertValue(
          new StringInputRowParser(
              new JSONParseSpec(
                  new TimestampSpec("timestamp", "iso", null),
@@ -272,7 +276,7 @@
       },
       new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
       null,
-      objectMapper
+      OBJECT_MAPPER
   );
 
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
@@ -730,10 +734,11 @@
     SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(
-            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
-        ),
-        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
+        new KafkaDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
+        ),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
 
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
     Assert.assertEquals(
@@ -2011,7 +2016,7 @@
     Assert.assertEquals(2, countEvents(task));
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
 
-    Map<Integer, Long> currentOffsets = objectMapper.readValue(
+    Map<Integer, Long> currentOffsets = OBJECT_MAPPER.readValue(
         task.getRunner().pause().getEntity().toString(),
         new TypeReference<Map<Integer, Long>>()
         {
@@ -2147,7 +2152,7 @@
     final Map<String, Object> context = new HashMap<>();
     context.put(
         SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
-        objectMapper.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+        OBJECT_MAPPER.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
     );
 
     final KafkaIndexTask task = createTask(
@@ -2267,7 +2272,7 @@
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
 
     //verify the 2 indexed records
-    final QuerySegmentSpec firstInterval = objectMapper.readValue(
+    final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue(
         "\"2008/2010\"", QuerySegmentSpec.class
     );
     Iterable<ScanResultValue> scanResultValues = scanData(task, firstInterval);
@@ -2287,7 +2292,7 @@
     Assert.assertEquals(2, countEvents(task));
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
 
-    final QuerySegmentSpec rollbackedInterval = objectMapper.readValue(
+    final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue(
         "\"2010/2012\"", QuerySegmentSpec.class
     );
     scanResultValues = scanData(task, rollbackedInterval);
@@ -2304,7 +2309,7 @@
       kafkaProducer.commitTransaction();
     }
 
-    final QuerySegmentSpec endInterval = objectMapper.readValue(
+    final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue(
         "\"2008/2049\"", QuerySegmentSpec.class
     );
     Iterable<ScanResultValue> scanResultValues1 = scanData(task, endInterval);
@@ -2388,6 +2393,36 @@
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
   }
 
+  @Test
+  public void testSerde() throws Exception
+  {
+    // This is both a serde test and a regression test for https://github.com/apache/incubator-druid/issues/7724.
+
+    final KafkaIndexTask task = createTask(
+        "taskid",
+        DATA_SCHEMA.withTransformSpec(
+            new TransformSpec(
+                null,
+                ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil()))
+            )
+        ),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence",
+            new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(), ImmutableSet.of()),
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of()),
+            ImmutableMap.of(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final Task task1 = OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsBytes(task), Task.class);
+    Assert.assertEquals(task, task1);
+  }
+
   private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
   {
     ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
@@ -2513,7 +2548,7 @@
     if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
       final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
       checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
-      final String checkpointsJson = objectMapper
+      final String checkpointsJson = OBJECT_MAPPER
           .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
           .writeValueAsString(checkpoints);
       context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
@@ -2530,7 +2565,7 @@
         null,
         null,
         rowIngestionMetersFactory,
-        objectMapper
+        OBJECT_MAPPER
     );
     task.setPollRetryMs(POLL_RETRY_MS);
     return task;
@@ -2544,7 +2579,7 @@
         dataSchema.getAggregators(),
         dataSchema.getGranularitySpec(),
         dataSchema.getTransformSpec(),
-        objectMapper
+        OBJECT_MAPPER
     );
   }
 
@@ -2861,7 +2896,7 @@
 
   private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
   {
-    Map<String, TaskReport> taskReports = objectMapper.readValue(
+    Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
         reportsFile,
         new TypeReference<Map<String, TaskReport>>()
         {
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index ff1847a6de1..f3dfe3bfcef 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -72,9 +72,10 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
   @Override
   protected SeekableStreamIndexTaskRunner<String, String> createTaskRunner()
   {
+    //noinspection unchecked
     return new KinesisIndexTaskRunner(
         this,
-        parser,
+        dataSchema.getParser(),
         authorizerMapper,
         chatHandlerProvider,
         savedParseExceptions,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 833cc172787..37d472bb59f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -25,7 +25,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
@@ -57,7 +56,6 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.utils.CircularBuffer;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Map;
 
 
@@ -67,9 +65,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
   public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
   private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
 
-  protected final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
   protected final DataSchema dataSchema;
-  protected final InputRowParser<ByteBuffer> parser;
   protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
   protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
   protected final Optional<ChatHandlerProvider> chatHandlerProvider;
@@ -78,6 +74,12 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
   protected final Optional<CircularBuffer<Throwable>> savedParseExceptions;
 
+  // Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
+  // See https://github.com/apache/incubator-druid/issues/7724 for the problems this can cause.
+  // Lazy init is synchronized because the runner may be needed in multiple threads.
+  private final Object runnerInitLock = new Object();
+  private volatile SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
+
   public SeekableStreamIndexTask(
       final String id,
       @Nullable final TaskResource taskResource,
       final DataSchema dataSchema,
@@ -99,7 +101,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
-    this.parser = Preconditions.checkNotNull((InputRowParser<ByteBuffer>) dataSchema.getParser(), "parser");
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
     this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
@@ -111,7 +112,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
     this.authorizerMapper = authorizerMapper;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
-    this.runner = createTaskRunner();
   }
 
@@ -170,11 +170,11 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
   public <T> QueryRunner<T> getQueryRunner(Query<T> query)
   {
-    if (runner.getAppenderator() == null) {
+    if (getRunner().getAppenderator() == null) {
       // Not yet initialized, no data yet, just return a noop runner.
       return new NoopQueryRunner<>();
     }
 
-    return (queryPlus, responseContext) -> queryPlus.run(runner.getAppenderator(), responseContext);
+    return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
   }
 
   public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
@@ -283,13 +282,20 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
   public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner()
   {
+    if (runner == null) {
+      synchronized (runnerInitLock) {
+        if (runner == null) {
+          runner = createTaskRunner();
+        }
+      }
+    }
+
     return runner;
   }
-
 }
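
Note on the getRunner() change above: it is the standard double-checked locking
idiom for lazy initialization, which is what lets task construction (and thus
serde on the overlord) avoid building the runner and parser. Below is a minimal,
self-contained sketch of the idiom under illustrative names; LazyInitSketch and
ExpensiveRunner are hypothetical stand-ins, not Druid classes.

    public class LazyInitSketch
    {
      private final Object initLock = new Object();

      // volatile is essential: it guarantees other threads observe a fully
      // constructed ExpensiveRunner once the field has been assigned.
      private volatile ExpensiveRunner runner;

      public ExpensiveRunner getRunner()
      {
        // First, unsynchronized check skips the lock on the common fast path.
        if (runner == null) {
          synchronized (initLock) {
            // Second check: another thread may have initialized the runner
            // while this thread was waiting for the lock.
            if (runner == null) {
              runner = new ExpensiveRunner();
            }
          }
        }
        return runner;
      }

      static final class ExpensiveRunner
      {
        ExpensiveRunner()
        {
          // Stand-in for runner/parser construction that must not happen
          // merely because the task object was deserialized.
          System.out.println("constructed once");
        }
      }

      public static void main(String[] args)
      {
        LazyInitSketch task = new LazyInitSketch();
        // Code paths that only serialize the task never call getRunner(),
        // so nothing expensive is constructed. Query paths trigger exactly
        // one initialization, even when racing across threads.
        System.out.println(task.getRunner() == task.getRunner()); // true
      }
    }

The pre-Java-5 caveats about double-checked locking do not apply here because
the field is volatile; without volatile, a thread could observe a non-null but
partially constructed runner.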