From 964a1fc9df69520e54e10edd33e0404ed6e8330d Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 5 May 2020 11:17:57 -0700 Subject: [PATCH] Remove ParseSpec.toInputFormat() (#9815) * Remove toInputFormat() from ParseSpec * fix test --- .../druid/data/input/impl/CSVParseSpec.java | 7 - .../data/input/impl/DelimitedParseSpec.java | 7 - .../druid/data/input/impl/JSONParseSpec.java | 7 - .../druid/data/input/impl/ParseSpec.java | 13 - ...rehoseFactoryToInputSourceAdaptorTest.java | 24 +- .../kafka/supervisor/KafkaSupervisor.java | 4 +- .../kinesis/supervisor/KinesisSupervisor.java | 4 +- .../druid/indexing/common/task/IndexTask.java | 14 +- .../parallel/ParallelIndexPhaseRunner.java | 16 +- .../parallel/ParallelIndexSupervisorTask.java | 6 +- .../SeekableStreamIndexTaskIOConfig.java | 5 +- .../SeekableStreamIndexTaskRunner.java | 2 +- .../SeekableStreamSamplerSpec.java | 2 +- .../SeekableStreamSupervisorIOConfig.java | 15 +- .../task/CompactionTaskParallelRunTest.java | 1 + .../common/task/CompactionTaskRunTest.java | 2 + .../indexing/common/task/IndexTaskTest.java | 700 +++++++++++------- ...bstractMultiPhaseParallelIndexingTest.java | 31 +- ...stractParallelIndexSupervisorTaskTest.java | 9 +- ...rtitionMultiPhaseParallelIndexingTest.java | 58 +- ...rtitionMultiPhaseParallelIndexingTest.java | 73 +- .../SeekableStreamIndexTaskTestBase.java | 28 +- .../seekablestream/StreamChunkParserTest.java | 34 +- .../SeekableStreamSupervisorStateTest.java | 4 +- 24 files changed, 594 insertions(+), 472 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java index 51ffd3831a9..81c8a26ecd3 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java @@ -22,7 +22,6 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.java.util.common.parsers.CSVParser; import org.apache.druid.java.util.common.parsers.Parser; @@ -97,12 +96,6 @@ public class CSVParseSpec extends ParseSpec return new CSVParser(listDelimiter, columns, hasHeaderRow, skipHeaderRows); } - @Override - public InputFormat toInputFormat() - { - return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows); - } - @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java index 3ee0f71f62b..5940e70e11f 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java @@ -22,7 +22,6 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.java.util.common.parsers.DelimitedParser; import org.apache.druid.java.util.common.parsers.Parser; @@ -124,12 +123,6 @@ public class DelimitedParseSpec extends ParseSpec ); } - @Override - public InputFormat toInputFormat() - { - return new DelimitedInputFormat(columns, listDelimiter, delimiter, hasHeaderRow, null, skipHeaderRows); - } - @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java index 870076d4a7d..3a7136b2f8c 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonParser.Feature; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.java.util.common.parsers.JSONPathParser; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.Parser; @@ -68,12 +67,6 @@ public class JSONParseSpec extends NestedDataParseSpec return new JSONPathParser(getFlattenSpec(), objectMapper); } - @Override - public InputFormat toInputFormat() - { - return new JsonInputFormat(getFlattenSpec(), getFeatureSpec()); - } - @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { diff --git a/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java index adc52990338..33e9f44644d 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java @@ -23,13 +23,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Preconditions; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.parsers.Parser; -import javax.annotation.Nullable; - @Deprecated @ExtensionPoint @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format") @@ -71,16 +68,6 @@ public abstract class ParseSpec return null; } - /** - * Returns null if it's not implemented yet. - * This method (and maybe this class) will be removed in favor of {@link InputFormat} in the future. - */ - @Nullable - public InputFormat toInputFormat() - { - return null; - } - @PublicApi public ParseSpec withTimestampSpec(TimestampSpec spec) { diff --git a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java index c163de1ecb3..088bed58cfc 100644 --- a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java +++ b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java @@ -52,7 +52,7 @@ public class FirehoseFactoryToInputSourceAdaptorTest } final TestFirehoseFactory firehoseFactory = new TestFirehoseFactory(lines); final StringInputRowParser inputRowParser = new StringInputRowParser( - new UnimplementedInputFormatCsvParseSpec( + new CSVParseSpec( new TimestampSpec(null, "yyyyMMdd", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "name", "score"))), ",", @@ -95,28 +95,6 @@ public class FirehoseFactoryToInputSourceAdaptorTest } } - private static class UnimplementedInputFormatCsvParseSpec extends CSVParseSpec - { - private UnimplementedInputFormatCsvParseSpec( - TimestampSpec timestampSpec, - DimensionsSpec dimensionsSpec, - String listDelimiter, - List columns, - boolean hasHeaderRow, - int skipHeaderRows - ) - { - super(timestampSpec, dimensionsSpec, listDelimiter, columns, hasHeaderRow, skipHeaderRows); - } - - @Nullable - @Override - public InputFormat toInputFormat() - { - return null; - } - } - private static class TestFirehoseFactory implements FiniteFirehoseFactory { private final List lines; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index ba12128ab1e..6c88c5a59c4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -194,9 +194,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat( - spec.getDataSchema().getParser() == null ? null : spec.getDataSchema().getParser().getParseSpec() - ) + ioConfig.getInputFormat() ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index c789fc7f3ac..0000ee693e3 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -140,9 +140,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat( - spec.getDataSchema().getParser() == null ? null : spec.getDataSchema().getParser().getParseSpec() - ), + ioConfig.getInputFormat(), ioConfig.getEndpoint(), ioConfig.getRecordsPerFetch(), ioConfig.getFetchDelayMillis(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index be24995fb49..712340f4fd7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -44,7 +44,6 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.Rows; import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.hll.HyperLogLogCollector; import org.apache.druid.indexer.Checks; import org.apache.druid.indexer.IngestionState; @@ -1046,10 +1045,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema) { - final InputRowParser parser = ingestionSchema.getDataSchema().getParser(); - return ingestionSchema.getIOConfig().getNonNullInputFormat( - parser == null ? null : parser.getParseSpec() - ); + return ingestionSchema.getIOConfig().getNonNullInputFormat(); } public static class IndexIngestionSpec extends IngestionSpec @@ -1184,13 +1180,9 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler } } - public InputFormat getNonNullInputFormat(@Nullable ParseSpec parseSpec) + public InputFormat getNonNullInputFormat() { - if (inputFormat == null) { - return Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat(); - } else { - return inputFormat; - } + return Preconditions.checkNotNull(inputFormat, "inputFormat"); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index cfbc0ca771e..57c117da578 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -30,7 +30,6 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; -import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; @@ -196,19 +195,8 @@ public abstract class ParallelIndexPhaseRunner constructorFeeder() @@ -205,13 +214,11 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, null, null, - null, createTuningConfigWithMaxRowsPerSegment(2, true), false ), @@ -252,53 +259,65 @@ public class IndexTaskTest extends IngestionTestBase writer.write("2014-01-01T02:00:30Z,c,and|another,0|1,1\n"); } + final DimensionsSpec dimensionsSpec = new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + Arrays.asList( + "ts", + "dim", + "dim_array", + "dim_num_array", + "dimt", + "dimtarray1", + "dimtarray2", + "dimtnum_array" + ) + ) + ); + final List columns = Arrays.asList("ts", "dim", "dim_array", "dim_num_array", "val"); + final String listDelimiter = "|"; + final TransformSpec transformSpec = new TransformSpec( + new SelectorDimFilter("dim", "b", null), + ImmutableList.of( + new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()), + new ExpressionTransform("dimtarray1", "array(dim, dim)", ExprMacroTable.nil()), + new ExpressionTransform( + "dimtarray2", + "map(d -> concat(d, 'foo'), dim_array)", + ExprMacroTable.nil() + ), + new ExpressionTransform("dimtnum_array", "map(d -> d + 3, dim_num_array)", ExprMacroTable.nil()) + ) + ); + final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, false); + final IndexIngestionSpec indexIngestionSpec; + if (useInputFormatApi) { + indexIngestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + DEFAULT_TIMESTAMP_SPEC, + dimensionsSpec, + new CsvInputFormat(columns, listDelimiter, null, false, 0), + transformSpec, + null, + tuningConfig, + false + ); + } else { + indexIngestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, dimensionsSpec, listDelimiter, columns, false, 0), + transformSpec, + null, + tuningConfig, + false + ); + } + IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas( - Arrays.asList( - "ts", - "dim", - "dim_array", - "dim_num_array", - "dimt", - "dimtarray1", - "dimtarray2", - "dimtnum_array" - ) - ), - new ArrayList<>(), - new ArrayList<>() - ), - "|", - Arrays.asList("ts", "dim", "dim_array", "dim_num_array", "val"), - false, - 0 - ), - new TransformSpec( - new SelectorDimFilter("dim", "b", null), - ImmutableList.of( - new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()), - new ExpressionTransform("dimtarray1", "array(dim, dim)", ExprMacroTable.nil()), - new ExpressionTransform("dimtarray2", "map(d -> concat(d, 'foo'), dim_array)", ExprMacroTable.nil()), - new ExpressionTransform("dimtnum_array", "map(d -> d + 3, dim_num_array)", ExprMacroTable.nil()) - ) - ), - null, - createTuningConfigWithMaxRowsPerSegment(2, false), - false - ), + indexIngestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -329,13 +348,22 @@ public class IndexTaskTest extends IngestionTestBase final List> transforms = cursorSequence .map(cursor -> { final DimensionSelector selector1 = cursor.getColumnSelectorFactory() - .makeDimensionSelector(new DefaultDimensionSpec("dimt", "dimt")); + .makeDimensionSelector(new DefaultDimensionSpec("dimt", "dimt")); final DimensionSelector selector2 = cursor.getColumnSelectorFactory() - .makeDimensionSelector(new DefaultDimensionSpec("dimtarray1", "dimtarray1")); + .makeDimensionSelector(new DefaultDimensionSpec( + "dimtarray1", + "dimtarray1" + )); final DimensionSelector selector3 = cursor.getColumnSelectorFactory() - .makeDimensionSelector(new DefaultDimensionSpec("dimtarray2", "dimtarray2")); + .makeDimensionSelector(new DefaultDimensionSpec( + "dimtarray2", + "dimtarray2" + )); final DimensionSelector selector4 = cursor.getColumnSelectorFactory() - .makeDimensionSelector(new DefaultDimensionSpec("dimtnum_array", "dimtnum_array")); + .makeDimensionSelector(new DefaultDimensionSpec( + "dimtnum_array", + "dimtnum_array" + )); Map row = new HashMap<>(); @@ -375,15 +403,14 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, - null, new ArbitraryGranularitySpec( Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) ), + null, createTuningConfigWithMaxRowsPerSegment(10, true), false ), @@ -414,16 +441,15 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, - null, new UniformGranularitySpec( Granularities.HOUR, Granularities.HOUR, Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z")) ), + null, createTuningConfigWithMaxRowsPerSegment(50, true), false ), @@ -454,13 +480,11 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, null, null, - null, createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null), true), false ), @@ -496,13 +520,11 @@ public class IndexTaskTest extends IngestionTestBase final IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, null, null, - null, createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")), true), false ), @@ -574,13 +596,11 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, null, null, - null, createTuningConfigWithMaxRowsPerSegment(2, false), true ), @@ -624,16 +644,15 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, - null, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), + null, createTuningConfigWithMaxRowsPerSegment(2, true), false ), @@ -676,25 +695,37 @@ public class IndexTaskTest extends IngestionTestBase writer.write("2014-01-01T00:00:10Z,a,1\n"); } + final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); + final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, true); + final IndexIngestionSpec ingestionSpec; + if (useInputFormatApi) { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } else { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + timestampSpec, + DimensionsSpec.EMPTY, + new CsvInputFormat(null, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } + IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.EMPTY, - null, - null, - true, - 0 - ), - null, - createTuningConfigWithMaxRowsPerSegment(2, true), - false - ), + ingestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -723,25 +754,38 @@ public class IndexTaskTest extends IngestionTestBase writer.write("2014-01-01T00:00:10Z,a,1\n"); } + final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); + final List columns = Arrays.asList("time", "dim", "val"); + final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, true); + final IndexIngestionSpec ingestionSpec; + if (useInputFormatApi) { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0), + null, + null, + tuningConfig, + false + ); + } else { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + timestampSpec, + DimensionsSpec.EMPTY, + new CsvInputFormat(columns, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } + IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.EMPTY, - null, - Arrays.asList("time", "dim", "val"), - true, - 0 - ), - null, - createTuningConfigWithMaxRowsPerSegment(2, true), - false - ), + ingestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -779,16 +823,15 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, - null, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), + null, createTuningConfig(2, 2, null, 2L, null, false, true), false ), @@ -826,17 +869,16 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, - null, new UniformGranularitySpec( Granularities.DAY, Granularities.DAY, true, null ), + null, createTuningConfig(3, 2, null, 2L, null, true, true), false ), @@ -873,17 +915,16 @@ public class IndexTaskTest extends IngestionTestBase IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, - null, new UniformGranularitySpec( Granularities.DAY, Granularities.DAY, true, null ), + null, createTuningConfig(3, 2, null, 2L, null, false, true), false ), @@ -937,24 +978,37 @@ public class IndexTaskTest extends IngestionTestBase writer.write("2014-01-01T00:00:10Z,a,1\n"); } + final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); + final List columns = Arrays.asList("time", "dim", "val"); + // ignore parse exception + final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, false); + // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments() - final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.EMPTY, - null, - Arrays.asList("time", "dim", "val"), - true, - 0 - ), - null, - createTuningConfig(2, null, null, null, null, false, false), // ignore parse exception, - false - ); + final IndexIngestionSpec parseExceptionIgnoreSpec; + if (useInputFormatApi) { + parseExceptionIgnoreSpec = createIngestionSpec( + jsonMapper, + tmpDir, + timestampSpec, + DimensionsSpec.EMPTY, + new CsvInputFormat(columns, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } else { + parseExceptionIgnoreSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0), + null, + null, + tuningConfig, + false + ); + } IndexTask indexTask = new IndexTask( null, @@ -987,27 +1041,39 @@ public class IndexTaskTest extends IngestionTestBase writer.write("2014-01-01T00:00:10Z,a,1\n"); } - final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.EMPTY, - null, - Arrays.asList("time", "dim", "val"), - true, - 0 - ), - null, - createTuningConfig(2, null, null, null, null, false, true), // report parse exception - false - ); + final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); + final List columns = Arrays.asList("time", "dim", "val"); + // report parse exception + final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true); + final IndexIngestionSpec indexIngestionSpec; + if (useInputFormatApi) { + indexIngestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + timestampSpec, + DimensionsSpec.EMPTY, + new CsvInputFormat(columns, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } else { + indexIngestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0), + null, + null, + tuningConfig, + false + ); + } IndexTask indexTask = new IndexTask( null, null, - parseExceptionIgnoreSpec, + indexIngestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1075,31 +1141,43 @@ public class IndexTaskTest extends IngestionTestBase 7 ); - final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new JSONParseSpec( - new TimestampSpec("time", "auto", null), - new DimensionsSpec( - Arrays.asList( - new StringDimensionSchema("dim"), - new LongDimensionSchema("dimLong"), - new FloatDimensionSchema("dimFloat") - ) - ), - null, - null - ), - null, - tuningConfig, - false + final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); + final DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("dim"), + new LongDimensionSchema("dimLong"), + new FloatDimensionSchema("dimFloat") + ) ); + final IndexIngestionSpec ingestionSpec; + if (useInputFormatApi) { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + timestampSpec, + dimensionsSpec, + new JsonInputFormat(null, null), + null, + null, + tuningConfig, + false + ); + } else { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new JSONParseSpec(timestampSpec, dimensionsSpec, null, null), + null, + null, + tuningConfig, + false + ); + } IndexTask indexTask = new IndexTask( null, null, - parseExceptionIgnoreSpec, + ingestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1195,33 +1273,44 @@ public class IndexTaskTest extends IngestionTestBase 5 ); - final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec("time", "auto", null), - new DimensionsSpec( - Arrays.asList( - new StringDimensionSchema("dim"), - new LongDimensionSchema("dimLong"), - new FloatDimensionSchema("dimFloat") - ) - ), - null, - Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"), - true, - 0 - ), - null, - tuningConfig, - false + final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); + final DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("dim"), + new LongDimensionSchema("dimLong"), + new FloatDimensionSchema("dimFloat") + ) ); + final List columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"); + final IndexIngestionSpec ingestionSpec; + if (useInputFormatApi) { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + timestampSpec, + dimensionsSpec, + new CsvInputFormat(columns, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } else { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns, true, 0), + null, + null, + tuningConfig, + false + ); + } IndexTask indexTask = new IndexTask( null, null, - parseExceptionIgnoreSpec, + ingestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1308,33 +1397,44 @@ public class IndexTaskTest extends IngestionTestBase 5 ); - final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec("time", "auto", null), - new DimensionsSpec( - Arrays.asList( - new StringDimensionSchema("dim"), - new LongDimensionSchema("dimLong"), - new FloatDimensionSchema("dimFloat") - ) - ), - null, - Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"), - true, - 0 - ), - null, - tuningConfig, - false + final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); + final DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("dim"), + new LongDimensionSchema("dimLong"), + new FloatDimensionSchema("dimFloat") + ) ); + final List columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"); + final IndexIngestionSpec ingestionSpec; + if (useInputFormatApi) { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + timestampSpec, + dimensionsSpec, + new CsvInputFormat(columns, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } else { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns, true, 0), + null, + null, + tuningConfig, + false + ); + } IndexTask indexTask = new IndexTask( null, null, - parseExceptionIgnoreSpec, + ingestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1389,45 +1489,55 @@ public class IndexTaskTest extends IngestionTestBase File tmpFile = File.createTempFile("druid", "index", tmpDir); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { - writer.write("time,,\n"); + writer.write("ts,,\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); } tmpFile = File.createTempFile("druid", "index", tmpDir); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { - writer.write("time,dim,\n"); + writer.write("ts,dim,\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); } tmpFile = File.createTempFile("druid", "index", tmpDir); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { - writer.write("time,,val\n"); + writer.write("ts,,val\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); } - final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.EMPTY, - null, - null, - true, - 0 - ), - null, - createTuningConfig(2, 1, null, null, null, true, true), // report parse exception - false - ); + // report parse exception + final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, null, null, true, true); + final IndexIngestionSpec ingestionSpec; + if (useInputFormatApi) { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + DEFAULT_TIMESTAMP_SPEC, + DimensionsSpec.EMPTY, + new CsvInputFormat(null, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } else { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } IndexTask indexTask = new IndexTask( null, null, - parseExceptionIgnoreSpec, + ingestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1472,27 +1582,38 @@ public class IndexTaskTest extends IngestionTestBase writer.write("2014-01-01T00:00:10Z,a,1\n"); } - final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( - useInputFormatApi, - jsonMapper, - tmpDir, - new CSVParseSpec( - new TimestampSpec("time", "auto", null), - DimensionsSpec.EMPTY, - null, - Arrays.asList("time", "", ""), - true, - 0 - ), - null, - createTuningConfig(2, null, null, null, null, false, true), // report parse exception - false - ); + final List columns = Arrays.asList("ts", "", ""); + // report parse exception + final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true); + final IndexIngestionSpec ingestionSpec; + if (useInputFormatApi) { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + DEFAULT_TIMESTAMP_SPEC, + DimensionsSpec.EMPTY, + new CsvInputFormat(columns, null, null, true, 0), + null, + null, + tuningConfig, + false + ); + } else { + ingestionSpec = createIngestionSpec( + jsonMapper, + tmpDir, + new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, null, columns, true, 0), + null, + null, + tuningConfig, + false + ); + } IndexTask indexTask = new IndexTask( null, null, - parseExceptionIgnoreSpec, + ingestionSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1529,17 +1650,16 @@ public class IndexTaskTest extends IngestionTestBase final IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, - null, new UniformGranularitySpec( Granularities.DAY, Granularities.DAY, true, null ), + null, createTuningConfig(3, 2, null, 2L, null, false, true), false ), @@ -1598,17 +1718,16 @@ public class IndexTaskTest extends IngestionTestBase final IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, - null, new UniformGranularitySpec( segmentGranularity, Granularities.DAY, true, null ), + null, createTuningConfig(3, 2, null, 2L, null, false, true), false ), @@ -1643,13 +1762,11 @@ public class IndexTaskTest extends IngestionTestBase final IndexTask task = new IndexTask( null, null, - createIngestionSpec( - useInputFormatApi, + createDefaultIngestionSpec( jsonMapper, tmpDir, null, null, - null, createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false), true), false ), @@ -1762,20 +1879,58 @@ public class IndexTaskTest extends IngestionTestBase ); } + private IndexIngestionSpec createDefaultIngestionSpec( + ObjectMapper objectMapper, + File baseDir, + @Nullable GranularitySpec granularitySpec, + @Nullable TransformSpec transformSpec, + IndexTuningConfig tuningConfig, + boolean appendToExisting + ) + { + if (useInputFormatApi) { + return createIngestionSpec( + objectMapper, + baseDir, + DEFAULT_TIMESTAMP_SPEC, + DEFAULT_DIMENSIONS_SPEC, + DEFAULT_INPUT_FORMAT, + transformSpec, + granularitySpec, + tuningConfig, + appendToExisting + ); + } else { + return createIngestionSpec( + objectMapper, + baseDir, + DEFAULT_PARSE_SPEC, + transformSpec, + granularitySpec, + tuningConfig, + appendToExisting + ); + } + } + static IndexIngestionSpec createIngestionSpec( ObjectMapper objectMapper, File baseDir, @Nullable ParseSpec parseSpec, - GranularitySpec granularitySpec, + @Nullable TransformSpec transformSpec, + @Nullable GranularitySpec granularitySpec, IndexTuningConfig tuningConfig, boolean appendToExisting ) { return createIngestionSpec( - false, objectMapper, baseDir, parseSpec, + null, + null, + null, + transformSpec, granularitySpec, tuningConfig, appendToExisting @@ -1783,21 +1938,25 @@ public class IndexTaskTest extends IngestionTestBase } static IndexIngestionSpec createIngestionSpec( - boolean useInputFormatApi, ObjectMapper objectMapper, File baseDir, - @Nullable ParseSpec parseSpec, - GranularitySpec granularitySpec, + TimestampSpec timestampSpec, + DimensionsSpec dimensionsSpec, + InputFormat inputFormat, + @Nullable TransformSpec transformSpec, + @Nullable GranularitySpec granularitySpec, IndexTuningConfig tuningConfig, boolean appendToExisting ) { return createIngestionSpec( - useInputFormatApi, objectMapper, baseDir, - parseSpec, - TransformSpec.NONE, + null, + timestampSpec, + dimensionsSpec, + inputFormat, + transformSpec, granularitySpec, tuningConfig, appendToExisting @@ -1805,22 +1964,25 @@ public class IndexTaskTest extends IngestionTestBase } private static IndexIngestionSpec createIngestionSpec( - boolean useInputFormatApi, ObjectMapper objectMapper, File baseDir, @Nullable ParseSpec parseSpec, - TransformSpec transformSpec, - GranularitySpec granularitySpec, + @Nullable TimestampSpec timestampSpec, + @Nullable DimensionsSpec dimensionsSpec, + @Nullable InputFormat inputFormat, + @Nullable TransformSpec transformSpec, + @Nullable GranularitySpec granularitySpec, IndexTuningConfig tuningConfig, boolean appendToExisting ) { - if (useInputFormatApi) { + if (inputFormat != null) { + Preconditions.checkArgument(parseSpec == null, "Can't use parseSpec"); return new IndexIngestionSpec( new DataSchema( "test", - parseSpec == null ? DEFAULT_TIMESTAMP_SPEC : parseSpec.getTimestampSpec(), - parseSpec == null ? DEFAULT_DIMENSIONS_SPEC : parseSpec.getDimensionsSpec(), + Preconditions.checkNotNull(timestampSpec, "timestampSpec"), + Preconditions.checkNotNull(dimensionsSpec, "dimensionsSpec"), new AggregatorFactory[]{ new LongSumAggregatorFactory("val", "val") }, @@ -1834,7 +1996,7 @@ public class IndexTaskTest extends IngestionTestBase new IndexIOConfig( null, new LocalInputSource(baseDir, "druid*"), - parseSpec == null ? DEFAULT_INPUT_FORMAT : parseSpec.toInputFormat(), + inputFormat, appendToExisting ), tuningConfig diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index 194a556a129..7970e1dd9b7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -19,10 +19,14 @@ package org.apache.druid.indexing.common.task.batch.parallel; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.MaxSizeSplitHintSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.StringInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec; @@ -55,6 +59,7 @@ import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; import org.junit.Assert; +import javax.annotation.Nullable; import java.io.File; import java.util.Collections; import java.util.List; @@ -82,8 +87,16 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn this.useInputFormatApi = useInputFormatApi; } + boolean isUseInputFormatApi() + { + return useInputFormatApi; + } + Set runTestTask( - ParseSpec parseSpec, + @Nullable TimestampSpec timestampSpec, + @Nullable DimensionsSpec dimensionsSpec, + @Nullable InputFormat inputFormat, + @Nullable ParseSpec parseSpec, Interval interval, File inputDir, String filter, @@ -93,6 +106,9 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn ) { final ParallelIndexSupervisorTask task = newTask( + timestampSpec, + dimensionsSpec, + inputFormat, parseSpec, interval, inputDir, @@ -108,7 +124,10 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn } private ParallelIndexSupervisorTask newTask( - ParseSpec parseSpec, + @Nullable TimestampSpec timestampSpec, + @Nullable DimensionsSpec dimensionsSpec, + @Nullable InputFormat inputFormat, + @Nullable ParseSpec parseSpec, Interval interval, File inputDir, String filter, @@ -154,17 +173,18 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn final ParallelIndexIngestionSpec ingestionSpec; if (useInputFormatApi) { + Preconditions.checkArgument(parseSpec == null); ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( null, new LocalInputSource(inputDir, filter), - parseSpec.toInputFormat(), + inputFormat, false ); ingestionSpec = new ParallelIndexIngestionSpec( new DataSchema( "dataSource", - parseSpec.getTimestampSpec(), - parseSpec.getDimensionsSpec(), + timestampSpec, + dimensionsSpec, new AggregatorFactory[]{ new LongSumAggregatorFactory("val", "val") }, @@ -175,6 +195,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn tuningConfig ); } else { + Preconditions.checkArgument(inputFormat == null); ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( new LocalFirehoseFactory(inputDir, filter, null), false diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 4542ac00cf0..ca33815f28e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -38,6 +38,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.CSVParseSpec; +import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; @@ -131,7 +132,13 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase false, 0 ); - static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat(); + static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat( + Arrays.asList("ts", "dim", "val"), + null, + false, + false, + 0 + ); static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = new ParallelIndexTuningConfig( null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java index c1165ddd0f9..bcd3cbe8584 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java @@ -20,7 +20,9 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.CSVParseSpec; +import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; @@ -54,18 +56,25 @@ import java.util.Set; @RunWith(Parameterized.class) public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest { + private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); + private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")) + ); private static final ParseSpec PARSE_SPEC = new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))), + TIMESTAMP_SPEC, + DIMENSIONS_SPEC, null, Arrays.asList("ts", "dim1", "dim2", "val"), false, 0 ); + private static final InputFormat INPUT_FORMAT = new CsvInputFormat( + Arrays.asList("ts", "dim1", "dim2", "val"), + null, + false, + false, + 0 + ); private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2; private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M"); @@ -112,15 +121,34 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh @Test public void testRun() throws Exception { - final Set publishedSegments = runTestTask( - PARSE_SPEC, - INTERVAL_TO_INDEX, - inputDir, - "test_*", - new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), - MAX_NUM_CONCURRENT_SUB_TASKS, - TaskState.SUCCESS - ); + final Set publishedSegments; + if (isUseInputFormatApi()) { + publishedSegments = runTestTask( + TIMESTAMP_SPEC, + DIMENSIONS_SPEC, + INPUT_FORMAT, + null, + INTERVAL_TO_INDEX, + inputDir, + "test_*", + new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), + MAX_NUM_CONCURRENT_SUB_TASKS, + TaskState.SUCCESS + ); + } else { + publishedSegments = runTestTask( + null, + null, + null, + PARSE_SPEC, + INTERVAL_TO_INDEX, + inputDir, + "test_*", + new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), + MAX_NUM_CONCURRENT_SUB_TASKS, + TaskState.SUCCESS + ); + } assertHashedPartition(publishedSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java index da2f7a93952..f62799b4963 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java @@ -25,7 +25,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.SetMultimap; import org.apache.druid.common.config.NullValueHandlingConfig; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.CSVParseSpec; +import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; @@ -79,18 +81,25 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP private static final String LIST_DELIMITER = "|"; private static final List DIMS = ImmutableList.of(DIM1, DIM2); private static final String TEST_FILE_NAME_PREFIX = "test_"; + private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(TIME, "auto", null); + private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2)) + ); private static final ParseSpec PARSE_SPEC = new CSVParseSpec( - new TimestampSpec( - TIME, - "auto", - null - ), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))), + TIMESTAMP_SPEC, + DIMENSIONS_SPEC, LIST_DELIMITER, Arrays.asList(TIME, DIM1, DIM2, "val"), false, 0 ); + private static final InputFormat INPUT_FORMAT = new CsvInputFormat( + Arrays.asList(TIME, DIM1, DIM2, "val"), + LIST_DELIMITER, + false, + false, + 0 + ); @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}") public static Iterable constructorFeeder() @@ -192,20 +201,44 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP public void createsCorrectRangePartitions() throws Exception { int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION; - final Set publishedSegments = runTestTask( - PARSE_SPEC, - INTERVAL_TO_INDEX, - inputDir, - TEST_FILE_NAME_PREFIX + "*", - new SingleDimensionPartitionsSpec( - targetRowsPerSegment, - null, - DIM1, - false - ), - maxNumConcurrentSubTasks, - useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS - ); + final Set publishedSegments; + if (isUseInputFormatApi()) { + publishedSegments = runTestTask( + TIMESTAMP_SPEC, + DIMENSIONS_SPEC, + INPUT_FORMAT, + null, + INTERVAL_TO_INDEX, + inputDir, + TEST_FILE_NAME_PREFIX + "*", + new SingleDimensionPartitionsSpec( + targetRowsPerSegment, + null, + DIM1, + false + ), + maxNumConcurrentSubTasks, + useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS + ); + } else { + publishedSegments = runTestTask( + null, + null, + null, + PARSE_SPEC, + INTERVAL_TO_INDEX, + inputDir, + TEST_FILE_NAME_PREFIX + "*", + new SingleDimensionPartitionsSpec( + targetRowsPerSegment, + null, + DIM1, + false + ), + maxNumConcurrentSubTasks, + useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS + ); + } if (!useMultivalueDim) { assertRangePartitions(publishedSegments); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index d1476916102..03a119350d8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -19,8 +19,6 @@ package org.apache.druid.indexing.seekablestream; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; @@ -87,7 +85,6 @@ import org.easymock.EasyMockSupport; import org.joda.time.Interval; import org.junit.Assert; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; @@ -149,12 +146,12 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport static { OBJECT_MAPPER = new TestUtils().getTestObjectMapper(); - OBJECT_MAPPER.registerSubtypes(new NamedType(UnimplementedInputFormatJsonParseSpec.class, "json")); + OBJECT_MAPPER.registerSubtypes(new NamedType(JSONParseSpec.class, "json")); OLD_DATA_SCHEMA = new DataSchema( "test_ds", OBJECT_MAPPER.convertValue( new StringInputRowParser( - new UnimplementedInputFormatJsonParseSpec( + new JSONParseSpec( new TimestampSpec("timestamp", "iso", null), new DimensionsSpec( Arrays.asList( @@ -443,25 +440,4 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport return segmentDescriptor; } } - - private static class UnimplementedInputFormatJsonParseSpec extends JSONParseSpec - { - @JsonCreator - private UnimplementedInputFormatJsonParseSpec( - @JsonProperty("timestampSpec") TimestampSpec timestampSpec, - @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, - @JsonProperty("flattenSpec") JSONPathSpec flattenSpec, - @JsonProperty("featureSpec") Map featureSpec - ) - { - super(timestampSpec, dimensionsSpec, flattenSpec, featureSpec); - } - - @Nullable - @Override - public InputFormat toInputFormat() - { - return null; - } - } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java index 76a02db3138..c1814d6695f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java @@ -22,7 +22,6 @@ package org.apache.druid.indexing.seekablestream; import com.google.common.collect.Iterables; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -63,7 +62,12 @@ public class StreamChunkParserTest public void testWithParserAndNullInputformatParseProperly() throws IOException { final InputRowParser parser = new StringInputRowParser( - new NotConvertibleToInputFormatParseSpec(), + new JSONParseSpec( + TIMESTAMP_SPEC, + DimensionsSpec.EMPTY, + JSONPathSpec.DEFAULT, + Collections.emptyMap() + ), StringUtils.UTF8_STRING ); final StreamChunkParser chunkParser = new StreamChunkParser( @@ -109,7 +113,12 @@ public class StreamChunkParserTest public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws IOException { final InputRowParser parser = new StringInputRowParser( - new NotConvertibleToInputFormatParseSpec(), + new JSONParseSpec( + TIMESTAMP_SPEC, + DimensionsSpec.EMPTY, + JSONPathSpec.DEFAULT, + Collections.emptyMap() + ), StringUtils.UTF8_STRING ); final TrackingJsonInputFormat inputFormat = new TrackingJsonInputFormat( @@ -138,25 +147,6 @@ public class StreamChunkParserTest Assert.assertEquals("val2", Iterables.getOnlyElement(row.getDimension("met"))); } - private static class NotConvertibleToInputFormatParseSpec extends JSONParseSpec - { - private NotConvertibleToInputFormatParseSpec() - { - super( - TIMESTAMP_SPEC, - DimensionsSpec.EMPTY, - JSONPathSpec.DEFAULT, - Collections.emptyMap() - ); - } - - @Override - public InputFormat toInputFormat() - { - return null; - } - } - private static class TrackingJsonInputFormat extends JsonInputFormat { private boolean used; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d34bbbbec82..73bbaca4bad 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1027,9 +1027,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat( - getDataSchema().getParser() == null ? null : getDataSchema().getParser().getParseSpec() - ) + ioConfig.getInputFormat() ) { };