From 5e5625f3ae9fa663697214f49e66b5c296d342fb Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Wed, 13 Apr 2022 12:43:11 -0700
Subject: [PATCH] Fix indexMerger to respect the includeAllDimensions flag
 (#12428)

* Fix indexMerger to respect the includeAllDimensions flag; jsonInputFormat
  should set keepNullColumns if useFieldDiscovery is set

* address comments
---
 .../data/input/impl/JsonInputFormat.java      |  12 +-
 .../data/input/impl/JsonInputFormatTest.java  |  37 +-
 ...aseParallelIndexingWithNullColumnTest.java | 201 --------
 ...aseParallelIndexingWithNullColumnTest.java | 435 ++++++++++++++++++
 ...rtitionMultiPhaseParallelIndexingTest.java | 122 -----
 .../apache/druid/segment/IndexMergerV9.java   |  51 +-
 6 files changed, 517 insertions(+), 341 deletions(-)
 delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java
 create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java

diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
index 8a9a3fd5296..0ce4cbfa8b5 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
@@ -74,7 +74,11 @@ public class JsonInputFormat extends NestedInputFormat
     super(flattenSpec);
     this.featureSpec = featureSpec == null ? Collections.emptyMap() : featureSpec;
     this.objectMapper = new ObjectMapper();
-    this.keepNullColumns = keepNullColumns == null ? false : keepNullColumns;
+    if (keepNullColumns != null) {
+      this.keepNullColumns = keepNullColumns;
+    } else {
+      this.keepNullColumns = flattenSpec != null && flattenSpec.isUseFieldDiscovery();
+    }
     for (Entry<String, Boolean> entry : this.featureSpec.entrySet()) {
       Feature feature = Feature.valueOf(entry.getKey());
       objectMapper.configure(feature, entry.getValue());
@@ -88,6 +92,12 @@ public class JsonInputFormat extends NestedInputFormat
     return featureSpec;
   }
 
+  @JsonProperty
+  public boolean isKeepNullColumns()
+  {
+    return keepNullColumns;
+  }
+
   @Override
   public boolean isSplittable()
   {
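Note on the change above: when keepNullColumns is not set explicitly, JsonInputFormat now infers it from the flattenSpec instead of always defaulting to false. A minimal sketch of the resulting behavior, using the same three-argument constructor exercised by the tests in the next file (the first argument of JSONPathSpec is useFieldDiscovery):

    // keepNullColumns omitted: inferred from useFieldDiscovery, so this is true
    JsonInputFormat discovering = new JsonInputFormat(new JSONPathSpec(true, null), null, null);
    // discovering.isKeepNullColumns() == true

    // an explicit keepNullColumns value always wins over the inferred default
    JsonInputFormat explicit = new JsonInputFormat(new JSONPathSpec(true, null), null, false);
    // explicit.isKeepNullColumns() == false
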
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
index aa9c95a7e46..0577dd72fb4 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
@@ -41,7 +41,7 @@ public class JsonInputFormatTest
     final ObjectMapper mapper = new ObjectMapper();
     final JsonInputFormat format = new JsonInputFormat(
         new JSONPathSpec(
-            false,
+            true,
             ImmutableList.of(
                 new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
                 new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
@@ -52,7 +52,7 @@ public class JsonInputFormatTest
             )
         ),
         ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
-        false
+        true
     );
     final byte[] bytes = mapper.writeValueAsBytes(format);
     final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes, InputFormat.class);
@@ -72,4 +72,37 @@ public class JsonInputFormatTest
         .withIgnoredFields("objectMapper")
         .verify();
   }
+
+  @Test
+  public void test_unsetUseFieldDiscovery_unsetKeepNullColumnsByDefault()
+  {
+    final JsonInputFormat format = new JsonInputFormat(
+        new JSONPathSpec(false, null),
+        null,
+        null
+    );
+    Assert.assertFalse(format.isKeepNullColumns());
+  }
+
+  @Test
+  public void testUseFieldDiscovery_setKeepNullColumnsByDefault()
+  {
+    final JsonInputFormat format = new JsonInputFormat(
+        new JSONPathSpec(true, null),
+        null,
+        null
+    );
+    Assert.assertTrue(format.isKeepNullColumns());
+  }
+
+  @Test
+  public void testUseFieldDiscovery_doNotChangeKeepNullColumnsUserSets()
+  {
+    final JsonInputFormat format = new JsonInputFormat(
+        new JSONPathSpec(true, null),
+        null,
+        false
+    );
+    Assert.assertFalse(format.isKeepNullColumns());
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java
deleted file mode 100644
index 1c8669b909e..00000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ - -package org.apache.druid.indexing.common.task.batch.parallel; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputSource; -import org.apache.druid.data.input.impl.DimensionSchema; -import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.InlineInputSource; -import org.apache.druid.data.input.impl.JsonInputFormat; -import org.apache.druid.data.input.impl.TimestampSpec; -import org.apache.druid.indexer.TaskState; -import org.apache.druid.indexer.partitions.HashedPartitionsSpec; -import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.apache.druid.timeline.DataSegment; -import org.joda.time.Interval; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class HashPartitionMultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiPhaseParallelIndexingTest -{ - private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); - private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")) - ); - private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null); - private static final List INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M")); - - public HashPartitionMultiPhaseParallelIndexingWithNullColumnTest() - { - super(LockGranularity.TIME_CHUNK, true, 0., 0.); - } - - @Test - public void testIngestNullColumn() throws JsonProcessingException - { - final List dimensionSchemas = DimensionsSpec.getDefaultSchemas( - Arrays.asList("ts", "unknownDim") - ); - ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( - null, - null, - null, - new ParallelIndexIngestionSpec( - new DataSchema( - DATASOURCE, - TIMESTAMP_SPEC, - DIMENSIONS_SPEC.withDimensions(dimensionSchemas), - DEFAULT_METRICS_SPEC, - new UniformGranularitySpec( - Granularities.DAY, - Granularities.MINUTE, - INTERVAL_TO_INDEX - ), - null - ), - new ParallelIndexIOConfig( - null, - getInputSource(), - JSON_FORMAT, - false, - null - ), - newTuningConfig( - new HashedPartitionsSpec( - 10, - null, - ImmutableList.of("ts", "unknownDim") - ), - 2, - true - ) - ), - null - ); - - Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); - - Set segments = getIndexingServiceClient().getPublishedSegments(task); - Assert.assertFalse(segments.isEmpty()); - for (DataSegment segment : segments) { - for (int i = 0; i < dimensionSchemas.size(); i++) { - Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i)); - } - } - } - - @Test - public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws JsonProcessingException - { - ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( - 
null, - null, - null, - new ParallelIndexIngestionSpec( - new DataSchema( - DATASOURCE, - TIMESTAMP_SPEC, - DIMENSIONS_SPEC.withDimensions( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim")) - ), - DEFAULT_METRICS_SPEC, - new UniformGranularitySpec( - Granularities.DAY, - Granularities.MINUTE, - INTERVAL_TO_INDEX - ), - null - ), - new ParallelIndexIOConfig( - null, - getInputSource(), - JSON_FORMAT, - false, - null - ), - newTuningConfig( - new HashedPartitionsSpec( - 10, - null, - ImmutableList.of("ts", "unknownDim") - ), - 2, - true - ) - ), - null - ); - - task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); - Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); - - Set segments = getIndexingServiceClient().getPublishedSegments(task); - Assert.assertFalse(segments.isEmpty()); - for (DataSegment segment : segments) { - Assert.assertFalse(segment.getDimensions().contains("unknownDim")); - } - } - - private InputSource getInputSource() throws JsonProcessingException - { - final ObjectMapper mapper = getObjectMapper(); - final List> rows = ImmutableList.of( - ImmutableMap.of( - "ts", "2022-01-01", - "dim1", "val1", - "dim2", "val11" - ), - ImmutableMap.of( - "ts", "2022-01-02", - "dim1", "val2", - "dim2", "val12" - ), - ImmutableMap.of( - "ts", "2022-01-03", - "dim1", "val3", - "dim2", "val13" - ) - ); - final String data = StringUtils.format( - "%s\n%s\n%s\n", - mapper.writeValueAsString(rows.get(0)), - mapper.writeValueAsString(rows.get(1)), - mapper.writeValueAsString(rows.get(2)) - ); - return new InlineInputSource(data); - } -} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java new file mode 100644 index 00000000000..2253d9ac334 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +@RunWith(Parameterized.class) +public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiPhaseParallelIndexingTest +{ + private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); + private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")) + ); + private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null); + private static final List INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M")); + + @Parameterized.Parameters + public static Iterable constructorFeeder() + { + return ImmutableList.of( + new Object[]{ + new HashedPartitionsSpec( + 10, + null, + ImmutableList.of("ts", "unknownDim") + ) + }, + new Object[]{ + new DimensionRangePartitionsSpec( + 10, + null, + Collections.singletonList("unknownDim"), + false + ) + } + ); + } + + private final PartitionsSpec partitionsSpec; + + public 
MultiPhaseParallelIndexingWithNullColumnTest(PartitionsSpec partitionsSpec) + { + super(LockGranularity.TIME_CHUNK, true, 0., 0.); + this.partitionsSpec = partitionsSpec; + getObjectMapper().registerSubtypes(SplittableInlineDataSource.class); + } + + @Test + public void testIngestNullColumn() throws JsonProcessingException + { + final List dimensionSchemas = DimensionsSpec.getDefaultSchemas( + Arrays.asList("ts", "unknownDim") + ); + ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( + null, + null, + null, + new ParallelIndexIngestionSpec( + new DataSchema( + DATASOURCE, + TIMESTAMP_SPEC, + DIMENSIONS_SPEC.withDimensions(dimensionSchemas), + DEFAULT_METRICS_SPEC, + new UniformGranularitySpec( + Granularities.DAY, + Granularities.MINUTE, + INTERVAL_TO_INDEX + ), + null + ), + new ParallelIndexIOConfig( + null, + getInputSource(), + JSON_FORMAT, + false, + null + ), + newTuningConfig( + partitionsSpec, + 2, + true + ) + ), + null + ); + + Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); + + Set segments = getIndexingServiceClient().getPublishedSegments(task); + Assert.assertFalse(segments.isEmpty()); + for (DataSegment segment : segments) { + Assert.assertEquals(dimensionSchemas.size(), segment.getDimensions().size()); + for (int i = 0; i < dimensionSchemas.size(); i++) { + Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i)); + } + } + } + + @Test + public void testIngestNullColumn_useFieldDiscovery_includeAllDimensions_shouldStoreAllColumns() throws JsonProcessingException + { + final List dimensionSchemas = DimensionsSpec.getDefaultSchemas( + Arrays.asList("ts", "unknownDim", "dim1") + ); + ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( + null, + null, + null, + new ParallelIndexIngestionSpec( + new DataSchema( + DATASOURCE, + TIMESTAMP_SPEC, + new DimensionsSpec.Builder().setDimensions(dimensionSchemas).setIncludeAllDimensions(true).build(), + DEFAULT_METRICS_SPEC, + new UniformGranularitySpec( + Granularities.DAY, + Granularities.MINUTE, + INTERVAL_TO_INDEX + ), + null + ), + new ParallelIndexIOConfig( + null, + getInputSource(), + new JsonInputFormat( + new JSONPathSpec(true, null), + null, + null + ), + false, + null + ), + newTuningConfig( + partitionsSpec, + 2, + true + ) + ), + null + ); + + Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); + + Set segments = getIndexingServiceClient().getPublishedSegments(task); + Assert.assertFalse(segments.isEmpty()); + final List expectedExplicitDimensions = ImmutableList.of("ts", "unknownDim", "dim1"); + final Set expectedImplicitDimensions = ImmutableSet.of("dim2", "dim3"); + for (DataSegment segment : segments) { + Assert.assertEquals( + expectedExplicitDimensions, + segment.getDimensions().subList(0, expectedExplicitDimensions.size()) + ); + Assert.assertEquals( + expectedImplicitDimensions, + new HashSet<>(segment.getDimensions().subList(expectedExplicitDimensions.size(), segment.getDimensions().size())) + ); + } + } + + @Test + public void testIngestNullColumn_explicitPathSpec_useFieldDiscovery_includeAllDimensions_shouldStoreAllColumns() + throws JsonProcessingException + { + ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( + null, + null, + null, + new ParallelIndexIngestionSpec( + new DataSchema( + DATASOURCE, + TIMESTAMP_SPEC, + new DimensionsSpec.Builder().setIncludeAllDimensions(true).build(), + DEFAULT_METRICS_SPEC, + new 
UniformGranularitySpec( + Granularities.DAY, + Granularities.MINUTE, + null + ), + null + ), + new ParallelIndexIOConfig( + null, + getInputSource(), + new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim1", "$.dim1"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "k", "$.dim4.k") + ) + ), + null, + null + ), + false, + null + ), + newTuningConfig( + partitionsSpec, + 2, + true + ) + ), + null + ); + + Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); + + Set segments = getIndexingServiceClient().getPublishedSegments(task); + Assert.assertFalse(segments.isEmpty()); + final List expectedExplicitDimensions = ImmutableList.of("dim1", "k"); + final Set expectedImplicitDimensions = ImmutableSet.of("dim2", "dim3"); + for (DataSegment segment : segments) { + Assert.assertEquals( + expectedExplicitDimensions, + segment.getDimensions().subList(0, expectedExplicitDimensions.size()) + ); + Assert.assertEquals( + expectedImplicitDimensions, + new HashSet<>(segment.getDimensions().subList(expectedExplicitDimensions.size(), segment.getDimensions().size())) + ); + } + + } + + @Test + public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws JsonProcessingException + { + ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( + null, + null, + null, + new ParallelIndexIngestionSpec( + new DataSchema( + DATASOURCE, + TIMESTAMP_SPEC, + DIMENSIONS_SPEC.withDimensions( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim")) + ), + DEFAULT_METRICS_SPEC, + new UniformGranularitySpec( + Granularities.DAY, + Granularities.MINUTE, + INTERVAL_TO_INDEX + ), + null + ), + new ParallelIndexIOConfig( + null, + getInputSource(), + JSON_FORMAT, + false, + null + ), + newTuningConfig( + partitionsSpec, + 2, + true + ) + ), + null + ); + + task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); + Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); + + Set segments = getIndexingServiceClient().getPublishedSegments(task); + Assert.assertFalse(segments.isEmpty()); + final List expectedDimensions = DimensionsSpec.getDefaultSchemas( + Collections.singletonList("ts") + ); + for (DataSegment segment : segments) { + Assert.assertEquals(expectedDimensions.size(), segment.getDimensions().size()); + for (int i = 0; i < expectedDimensions.size(); i++) { + Assert.assertEquals(expectedDimensions.get(i).getName(), segment.getDimensions().get(i)); + } + } + } + + private InputSource getInputSource() throws JsonProcessingException + { + final ObjectMapper mapper = getObjectMapper(); + final List> rows = new ArrayList<>(); + Map row; + for (int i = 0; i < 3; i++) { + rows.add(row(StringUtils.format("2022-01-%02d", i + 1), "val1", "val2", null)); + } + rows.add(row("2022-01-04", null, null, null, ImmutableMap.of("k", "v"))); + final String data = StringUtils.format( + "%s\n%s\n%s\n%s\n", + mapper.writeValueAsString(rows.get(0)), + mapper.writeValueAsString(rows.get(1)), + mapper.writeValueAsString(rows.get(2)), + mapper.writeValueAsString(rows.get(3)) + ); + + return new SplittableInlineDataSource(ImmutableList.of(data)); + } + + private static Map row(String timestamp, Object... 
dims) + { + Map row = new HashMap<>(); + row.put("ts", timestamp); + IntStream.range(0, dims.length).forEach(i -> row.put("dim" + (i + 1), dims[i])); + return row; + } + + /** + * Splittable inlineDataSource to run tests with range partitioning which requires the inputSource to be splittable. + */ + private static final class SplittableInlineDataSource implements SplittableInputSource + { + private final List data; + + @JsonCreator + public SplittableInlineDataSource(@JsonProperty("data") List data) + { + this.data = data; + } + + @JsonProperty + public List getData() + { + return data; + } + + @Override + public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return data.stream().map(InputSplit::new); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return data.size(); + } + + @Override + public InputSource withSplit(InputSplit split) + { + return new SplittableInlineDataSource(ImmutableList.of(split.get())); + } + + @Override + public boolean needsFormat() + { + return true; + } + + @Override + public InputSourceReader reader( + InputRowSchema inputRowSchema, + @Nullable InputFormat inputFormat, + File temporaryDirectory + ) + { + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + data.stream().map(str -> new ByteEntity(StringUtils.toUtf8(str))).iterator(), + temporaryDirectory + ); + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java index 4314a342cbd..759d70fe2f9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java @@ -29,9 +29,7 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.StringTuple; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.CsvInputFormat; -import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; @@ -40,13 +38,9 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.scan.ScanResultValue; -import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.DimensionRangeShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -388,122 +382,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest 
extends AbstractMultiP } } - @Test - public void testIngestNullColumn() - { - // storeEmptyColumns flag should do nothing with using inputFormat or multiValueDim - if (!isUseInputFormatApi() || useMultivalueDim) { - return; - } - int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION; - final List dimensionSchemas = DimensionsSpec.getDefaultSchemas( - Arrays.asList("ts", "unknownDim") - ); - ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( - null, - null, - null, - new ParallelIndexIngestionSpec( - new DataSchema( - DATASOURCE, - TIMESTAMP_SPEC, - DIMENSIONS_SPEC.withDimensions(dimensionSchemas), - DEFAULT_METRICS_SPEC, - new UniformGranularitySpec( - Granularities.DAY, - Granularities.MINUTE, - intervalToIndex == null ? null : Collections.singletonList(intervalToIndex) - ), - null - ), - new ParallelIndexIOConfig( - null, - new LocalInputSource(inputDir, TEST_FILE_NAME_PREFIX + "*"), - DEFAULT_INPUT_FORMAT, - false, - null - ), - newTuningConfig( - new DimensionRangePartitionsSpec( - targetRowsPerSegment, - null, - Collections.singletonList("unknownDim"), - false - ), - maxNumConcurrentSubTasks, - true - ) - ), - null - ); - - Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); - - Set segments = getIndexingServiceClient().getPublishedSegments(task); - for (DataSegment segment : segments) { - for (int i = 0; i < dimensionSchemas.size(); i++) { - Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i)); - } - } - } - - @Test - public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() - { - // storeEmptyColumns flag should do nothing with using inputFormat or multiValueDim - if (!isUseInputFormatApi() || useMultivalueDim) { - return; - } - int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION; - ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( - null, - null, - null, - new ParallelIndexIngestionSpec( - new DataSchema( - DATASOURCE, - TIMESTAMP_SPEC, - DIMENSIONS_SPEC.withDimensions( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim")) - ), - DEFAULT_METRICS_SPEC, - new UniformGranularitySpec( - Granularities.DAY, - Granularities.MINUTE, - intervalToIndex == null ? 
null : Collections.singletonList(intervalToIndex)
-            ),
-            null
-        ),
-        new ParallelIndexIOConfig(
-            null,
-            new LocalInputSource(inputDir, TEST_FILE_NAME_PREFIX + "*"),
-            DEFAULT_INPUT_FORMAT,
-            false,
-            null
-        ),
-        newTuningConfig(
-            new DimensionRangePartitionsSpec(
-                targetRowsPerSegment,
-                null,
-                Collections.singletonList("unknownDim"),
-                false
-            ),
-            maxNumConcurrentSubTasks,
-            true
-        )
-    ),
-    null
-    );
-
-    task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
-    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
-
-    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
-    for (DataSegment segment : segments) {
-      Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
-    }
-  }
-
   private ParallelIndexSupervisorTask runTestTask(
       PartitionsSpec partitionsSpec,
       File inputDirectory,
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 03018c752d4..daa4696371f 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -205,8 +205,7 @@ public class IndexMergerV9 implements IndexMerger
       final File outDir,
       final ProgressIndicator progress,
       final List<String> mergedDimensions, // should have both explicit and implicit dimensions
-      // a subset of mergedDimensions that are explicitly specified in DimensionsSpec
-      final Set<String> explicitDimensions,
+      final DimensionsSpecInspector dimensionsSpecInspector,
       final List<String> mergedMetrics,
       final Function<List<TransformableRowIterator>, TimeAndDimsIterator> rowMergerFn,
       final boolean fillRowNumConversions,
@@ -333,7 +332,7 @@
         if (!merger.hasOnlyNulls()) {
           ColumnDescriptor columnDesc = merger.makeColumnDescriptor();
           makeColumn(v9Smoosher, mergedDimensions.get(i), columnDesc);
-        } else if (shouldStore(explicitDimensions, mergedDimensions.get(i))) {
+        } else if (dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
           // shouldStore AND hasOnlyNulls
           ColumnDescriptor columnDesc = ColumnDescriptor
               .builder()
@@ -357,7 +356,7 @@ public class IndexMergerV9 implements IndexMerger
         progress,
         indexSpec,
         mergers,
-        explicitDimensions
+        dimensionsSpecInspector
     );
 
     makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
@@ -374,14 +373,6 @@ public class IndexMergerV9 implements IndexMerger
     }
   }
 
-  private boolean shouldStore(
-      Set<String> explicitDimensions,
-      String dimension
-  )
-  {
-    return storeEmptyColumns && explicitDimensions.contains(dimension);
-  }
-
   private void makeMetadataBinary(
       final FileSmoosher v9Smoosher,
       final ProgressIndicator progress,
@@ -404,7 +395,7 @@
       final ProgressIndicator progress,
       final IndexSpec indexSpec,
       final List<DimensionMergerV9> mergers,
-      final Set<String> explicitDimensions
+      final DimensionsSpecInspector dimensionsSpecInspector
   ) throws IOException
   {
     final Set<String> columnSet = new HashSet<>(mergedDimensions);
@@ -439,7 +430,7 @@
         nonNullOnlyColumns.add(mergedDimensions.get(i));
         allDimensions.add(null);
         allColumns.add(null);
-      } else if (shouldStore(explicitDimensions, mergedDimensions.get(i))) {
+      } else if (dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
         // shouldStore AND hasOnlyNulls
         allDimensions.add(mergedDimensions.get(i));
         allColumns.add(mergedDimensions.get(i));
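The hunks above and below are the heart of the fix: the removed shouldStore helper consulted only the explicitly declared dimensions, so an all-null column discovered via schemaless ingestion was dropped even when includeAllDimensions was set. Comparing the removed predicate with the one introduced in DimensionsSpecInspector (both taken from this diff):

    // before: only dimensions listed in dimensionsSpec could be stored when all-null
    return storeEmptyColumns && explicitDimensions.contains(dimension);

    // after: includeAllDimensions also qualifies a discovered, all-null dimension
    return storeEmptyColumns && (includeAllDimensions || explicitDimensions.contains(dimension));
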
@@ -1315,7 +1306,7 @@
         outDir,
         progress,
         mergedDimensions,
-        dimensionsSpec == null ? ImmutableSet.of() : new HashSet<>(dimensionsSpec.getDimensionNames()),
+        new DimensionsSpecInspector(storeEmptyColumns, dimensionsSpec),
         mergedMetrics,
         rowMergerFn,
         true,
@@ -1460,4 +1451,34 @@
       );
     }
   }
+
+  private static class DimensionsSpecInspector
+  {
+    private final boolean storeEmptyColumns;
+    private final Set<String> explicitDimensions;
+    private final boolean includeAllDimensions;
+
+    private DimensionsSpecInspector(
+        boolean storeEmptyColumns,
+        @Nullable DimensionsSpec dimensionsSpec
+    )
+    {
+      this.storeEmptyColumns = storeEmptyColumns;
+      this.explicitDimensions = dimensionsSpec == null
+                                ? ImmutableSet.of()
+                                : new HashSet<>(dimensionsSpec.getDimensionNames());
+      this.includeAllDimensions = dimensionsSpec != null && dimensionsSpec.isIncludeAllDimensions();
+    }
+
+    /**
+     * Returns true if the given dimension should be stored in the segment even when the column has only nulls.
+     * If it has non-nulls, then the column must be stored.
+     *
+     * @see DimensionMerger#hasOnlyNulls()
+     */
+    private boolean shouldStore(String dimension)
+    {
+      return storeEmptyColumns && (includeAllDimensions || explicitDimensions.contains(dimension));
+    }
+  }
 }
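Taken together, the two halves of the patch close the gap for schemaless ingestion: JsonInputFormat keeps null columns whenever field discovery is on, and IndexMergerV9 consults includeAllDimensions rather than only the explicit dimension list before persisting an all-null column. A rough end-to-end sketch of a spec that benefits from the fix, using only constructors and builders that appear in this patch, and assuming the storeEmptyColumns task context is left at its default of true:

    // Discover JSON fields at ingest time; keepNullColumns now defaults to true here
    // because useFieldDiscovery (the first JSONPathSpec argument) is true.
    InputFormat format = new JsonInputFormat(new JSONPathSpec(true, null), null, null);

    // Let the schema accept discovered dimensions in addition to any declared ones.
    DimensionsSpec dimensionsSpec = new DimensionsSpec.Builder()
        .setIncludeAllDimensions(true)
        .build();

    // At merge time, DimensionsSpecInspector.shouldStore(dim) now returns true for a
    // discovered dimension whose values are all null, so the column is written to the
    // segment instead of being silently dropped.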