diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java
index ef2310a51a1..eb984c879ac 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java
@@ -154,64 +154,6 @@ public class MultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervi
     assertHashedPartition(publishedSegments);
   }
 
-  @Test
-  public void testMissingIntervals()
-  {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage(
-        "forceGuaranteedRollup is set but intervals is missing in granularitySpec"
-    );
-    newTask(
-        null,
-        new ParallelIndexIOConfig(
-            new LocalFirehoseFactory(inputDir, "test_*", null),
-            false
-        ),
-        new HashedPartitionsSpec(null, 2, null)
-    );
-  }
-
-  @Test
-  public void testMissingNumShards()
-  {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage(
-        "forceGuaranteedRollup is set but numShards is missing in partitionsSpec"
-    );
-    newTask(
-        Intervals.of("2017/2018"),
-        Granularities.DAY,
-        new ParallelIndexIOConfig(new LocalFirehoseFactory(inputDir, "test_*", null), false),
-        new ParallelIndexTuningConfig(
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            new HashedPartitionsSpec(null, null, null),
-            null,
-            null,
-            null,
-            true,
-            null,
-            null,
-            null,
-            null,
-            2,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null
-        )
-    );
-  }
-
   private Set<DataSegment> runTestTask(Interval interval, HashedPartitionsSpec partitionsSpec) throws Exception
   {
     final ParallelIndexSupervisorTask task = newTask(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index 98f6ea80f50..a5c1e97ba41 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -19,16 +19,21 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
@@ -38,6 +43,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -45,127 +52,224 @@ import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.rules.ExpectedException;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class ParallelIndexSupervisorTaskSerdeTest
 {
-  @Rule
-  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
 
-  private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
-      new TimestampSpec(
-          "ts",
-          "auto",
-          null
-      ),
-      new DimensionsSpec(
-          DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
-          new ArrayList<>(),
-          new ArrayList<>()
-      ),
-      null,
-      Arrays.asList("ts", "dim", "val"),
-      false,
-      0
-  );
-
-  private final TestUtils testUtils = new TestUtils();
-
-  @Test
-  public void testSerde() throws IOException
+  private static ObjectMapper createObjectMapper()
   {
-    final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
+    TestUtils testUtils = new TestUtils();
+    ObjectMapper objectMapper = testUtils.getTestObjectMapper();
     objectMapper.registerSubtypes(
         new NamedType(LocalFirehoseFactory.class, "local")
     );
-
-    final ParallelIndexSupervisorTask task = newTask(
-        objectMapper,
-        Intervals.of("2018/2019")
-    );
-    final String json = objectMapper.writeValueAsString(task);
-    Assert.assertEquals(task, objectMapper.readValue(json, Task.class));
+    return objectMapper;
   }
 
-  private ParallelIndexSupervisorTask newTask(
-      ObjectMapper objectMapper,
-      Interval interval
-  )
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void serde() throws IOException
   {
-    // set up ingestion spec
-    final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(
-        new DataSchema(
-            "dataSource",
-            objectMapper.convertValue(
-                new StringInputRowParser(
-                    DEFAULT_PARSE_SPEC,
-                    null
-                ),
-                Map.class
-            ),
-            new AggregatorFactory[]{
-                new LongSumAggregatorFactory("val", "val")
-            },
-            new UniformGranularitySpec(
-                Granularities.DAY,
-                Granularities.MINUTE,
-                interval == null ? null : Collections.singletonList(interval)
-            ),
-            null,
-            objectMapper
-        ),
-        new ParallelIndexIOConfig(
-            new LocalFirehoseFactory(new File("tmp"), "test_*", null),
-            false
-        ),
-        new ParallelIndexTuningConfig(
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            2,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null
+    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTaskBuilder()
+        .ingestionSpec(
+            new ParallelIndexIngestionSpecBuilder()
+                .inputIntervals(Collections.singletonList(Intervals.of("2018/2019")))
+                .build()
         )
+        .build();
+
+    String json = OBJECT_MAPPER.writeValueAsString(task);
+    Assert.assertEquals(task, OBJECT_MAPPER.readValue(json, Task.class));
+  }
+
+  @Test
+  public void forceGuaranteedRollupWithMissingIntervals()
+  {
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "forceGuaranteedRollup is set but intervals is missing in granularitySpec"
     );
 
-    // set up test tools
-    return new ParallelIndexSupervisorTask(
-        "taskId",
-        new TaskResource("group", 1),
-        ingestionSpec,
-        new HashMap<>(),
-        new NoopIndexingServiceClient(),
-        new NoopChatHandlerProvider(),
-        new AuthorizerMapper(Collections.emptyMap()),
-        new DropwizardRowIngestionMetersFactory(),
-        new TestAppenderatorsManager()
+    Integer numShards = 2;
+    new ParallelIndexSupervisorTaskBuilder()
+        .ingestionSpec(
+            new ParallelIndexIngestionSpecBuilder()
+                .forceGuaranteedRollup(true)
+                .partitionsSpec(new HashedPartitionsSpec(null, numShards, null))
+                .build()
+        )
+        .build();
+  }
+
+  @Test
+  public void forceGuaranteedRollupWithMissingNumShards()
+  {
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "forceGuaranteedRollup is set but numShards is missing in partitionsSpec"
     );
-  }
+
+    Integer numShards = null;
+    new ParallelIndexSupervisorTaskBuilder()
+        .ingestionSpec(
+            new ParallelIndexIngestionSpecBuilder()
+                .forceGuaranteedRollup(true)
+                .partitionsSpec(new HashedPartitionsSpec(null, numShards, null))
+                .build()
+        )
+        .build();
+  }
+
+  private static class ParallelIndexSupervisorTaskBuilder
+  {
+    private static final String ID = "taskId";
+    private final TaskResource taskResource = new TaskResource("group", 1);
+    private final Map<String, Object> context = Collections.emptyMap();
+    private final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
+    private final ChatHandlerProvider chatHandlerProvider = new NoopChatHandlerProvider();
+    private final AuthorizerMapper authorizerMapper = new AuthorizerMapper(Collections.emptyMap());
+    private final RowIngestionMetersFactory rowIngestionMetersFactory = new DropwizardRowIngestionMetersFactory();
+    private final AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager();
+
+    private ParallelIndexIngestionSpec ingestionSpec;
+
+    ParallelIndexSupervisorTaskBuilder ingestionSpec(ParallelIndexIngestionSpec ingestionSpec)
+    {
+      this.ingestionSpec = ingestionSpec;
+      return this;
+    }
+
+    ParallelIndexSupervisorTask build()
+    {
+      return new ParallelIndexSupervisorTask(
+          ID,
+          taskResource,
+          ingestionSpec,
+          context,
+          indexingServiceClient,
+          chatHandlerProvider,
+          authorizerMapper,
+          rowIngestionMetersFactory,
+          appenderatorsManager
+      );
+    }
+  }
+
+  private static class ParallelIndexIngestionSpecBuilder
+  {
+    private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
+        new TimestampSpec(
+            "ts",
+            "auto",
+            null
+        ),
+        new DimensionsSpec(
+            DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
+            new ArrayList<>(),
+            new ArrayList<>()
+        ),
+        null,
+        Arrays.asList("ts", "dim", "val"),
+        false,
+        0
+    );
+
+    private static final TypeReference<Map<String, Object>> PARSER_TYPE = new TypeReference<Map<String, Object>>()
+    {
+    };
+
+    private final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+        new LocalFirehoseFactory(new File("tmp"), "test_*", null),
+        false
+    );
+
+    // For dataSchema.granularitySpec
+    @Nullable
+    private List<Interval> inputIntervals = null;
+
+    // For tuningConfig
+    @Nullable
+    private Boolean forceGuaranteedRollup = null;
+    @Nullable
+    PartitionsSpec partitionsSpec = null;
+
+    ParallelIndexIngestionSpecBuilder inputIntervals(List<Interval> inputIntervals)
+    {
+      this.inputIntervals = inputIntervals;
+      return this;
+    }
+
+    @SuppressWarnings("SameParameterValue")
+    ParallelIndexIngestionSpecBuilder forceGuaranteedRollup(boolean forceGuaranteedRollup)
+    {
+      this.forceGuaranteedRollup = forceGuaranteedRollup;
+      return this;
+    }
+
+    ParallelIndexIngestionSpecBuilder partitionsSpec(PartitionsSpec partitionsSpec)
+    {
+      this.partitionsSpec = partitionsSpec;
+      return this;
+    }
+
+    ParallelIndexIngestionSpec build()
+    {
+      DataSchema dataSchema = new DataSchema(
+          "dataSource",
+          OBJECT_MAPPER.convertValue(
+              new StringInputRowParser(PARSE_SPEC, null),
+              PARSER_TYPE
+          ),
+          new AggregatorFactory[]{
+              new LongSumAggregatorFactory("val", "val")
+          },
+          new UniformGranularitySpec(Granularities.DAY, Granularities.MINUTE, inputIntervals),
+          null,
+          OBJECT_MAPPER
+      );
+
+      ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          partitionsSpec,
+          null,
+          null,
+          null,
+          forceGuaranteedRollup,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null
+      );
+
+      return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/LocalFirehoseFactory.java
index e750349c2f6..beb713adf2d 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/LocalFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/LocalFirehoseFactory.java
@@ -32,12 +32,14 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.utils.CompressionUtils;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
 
 /**
+ * Firehose that reads data from files on local disk
  */
 public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory
 {
@@ -45,6 +47,7 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory
   private final File baseDir;
   private final String filter;
+  @Nullable
   private final StringInputRowParser parser;
 
   @JsonCreator
@@ -52,7 +55,7 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory
       @JsonProperty("baseDir") File baseDir,
       @JsonProperty("filter") String filter,
       // Backwards compatible
-      @JsonProperty("parser") StringInputRowParser parser
+      @Nullable @JsonProperty("parser") StringInputRowParser parser
   )
   {
     this.baseDir = baseDir;
@@ -73,6 +76,7 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory
   }
 
   @JsonProperty
+  @Nullable
   public StringInputRowParser getParser()
   {
     return parser;
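
For illustration only, not part of the patch: a minimal sketch of what the @Nullable "parser" change permits, namely constructing a LocalFirehoseFactory without a parser, assuming the row parser is supplied elsewhere (as the serde test above does via the dataSchema). The class name and baseDir path are placeholders invented for this sketch; only the three-argument constructor shown in the patch is assumed.

import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;

import java.io.File;

class LocalFirehoseFactoryUsageSketch
{
  static LocalFirehoseFactory withoutParser()
  {
    return new LocalFirehoseFactory(
        new File("/tmp/druid-input"), // baseDir: directory scanned for input files (placeholder path)
        "test_*",                     // filter: wildcard matching file names, as in the tests above
        null                          // parser: may now be null; the "parser" field stays only for backwards compatibility
    );
  }
}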