Fix indexMerger to respect the includeAllDimensions flag (#12428)

* Fix indexMerger to respect the includeAllDimensions flag; jsonInputFormat should set keepNullColumns if useFieldDiscovery is set

* address comments
Author: Jihoon Son · 2022-04-13 12:43:11 -07:00 · committed by GitHub
parent f24e9c6862
commit 5e5625f3ae
6 changed files with 517 additions and 341 deletions
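
A minimal sketch of the new keepNullColumns default, using the three-argument JsonInputFormat constructor exercised in the updated tests below (the first argument of JSONPathSpec is useFieldDiscovery):

// Sketch only: when keepNullColumns is left unset, it now follows useFieldDiscovery.
JsonInputFormat discovering = new JsonInputFormat(
    new JSONPathSpec(true, null),   // useFieldDiscovery = true, no explicit field specs
    null,                           // featureSpec
    null                            // keepNullColumns unset -> defaults to true
);
// discovering.isKeepNullColumns() == true

// An explicit value still wins over the derived default.
JsonInputFormat explicitlySet = new JsonInputFormat(
    new JSONPathSpec(true, null),
    null,
    false
);
// explicitlySet.isKeepNullColumns() == false

On the merge side, an all-null column is now kept when storeEmptyColumns is enabled and the dimension is either listed explicitly in the DimensionsSpec or includeAllDimensions is set; see DimensionsSpecInspector.shouldStore in the IndexMergerV9 diff below.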


@@ -74,7 +74,11 @@ public class JsonInputFormat extends NestedInputFormat
super(flattenSpec);
this.featureSpec = featureSpec == null ? Collections.emptyMap() : featureSpec;
this.objectMapper = new ObjectMapper();
this.keepNullColumns = keepNullColumns == null ? false : keepNullColumns;
if (keepNullColumns != null) {
this.keepNullColumns = keepNullColumns;
} else {
this.keepNullColumns = flattenSpec != null && flattenSpec.isUseFieldDiscovery();
}
for (Entry<String, Boolean> entry : this.featureSpec.entrySet()) {
Feature feature = Feature.valueOf(entry.getKey());
objectMapper.configure(feature, entry.getValue());
@@ -88,6 +92,12 @@ public class JsonInputFormat extends NestedInputFormat
return featureSpec;
}
@JsonProperty
public boolean isKeepNullColumns()
{
return keepNullColumns;
}
@Override
public boolean isSplittable()
{


@@ -41,7 +41,7 @@ public class JsonInputFormatTest
final ObjectMapper mapper = new ObjectMapper();
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
false,
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
@@ -52,7 +52,7 @@ public class JsonInputFormatTest
)
),
ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
false
true
);
final byte[] bytes = mapper.writeValueAsBytes(format);
final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes, InputFormat.class);
@@ -72,4 +72,37 @@ public class JsonInputFormatTest
.withIgnoredFields("objectMapper")
.verify();
}
@Test
public void test_unsetUseFieldDiscovery_unsetKeepNullColumnsByDefault()
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(false, null),
null,
null
);
Assert.assertFalse(format.isKeepNullColumns());
}
@Test
public void testUseFieldDiscovery_setKeepNullColumnsByDefault()
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(true, null),
null,
null
);
Assert.assertTrue(format.isKeepNullColumns());
}
@Test
public void testUseFieldDiscovery_doNotChangeKeepNullColumnsUserSets()
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(true, null),
null,
false
);
Assert.assertFalse(format.isKeepNullColumns());
}
}


@@ -1,201 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class HashPartitionMultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiPhaseParallelIndexingTest
{
private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
);
private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null);
private static final List<Interval> INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M"));
public HashPartitionMultiPhaseParallelIndexingWithNullColumnTest()
{
super(LockGranularity.TIME_CHUNK, true, 0., 0.);
}
@Test
public void testIngestNullColumn() throws JsonProcessingException
{
final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
Arrays.asList("ts", "unknownDim")
);
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATASOURCE,
TIMESTAMP_SPEC,
DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
INTERVAL_TO_INDEX
),
null
),
new ParallelIndexIOConfig(
null,
getInputSource(),
JSON_FORMAT,
false,
null
),
newTuningConfig(
new HashedPartitionsSpec(
10,
null,
ImmutableList.of("ts", "unknownDim")
),
2,
true
)
),
null
);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
Assert.assertFalse(segments.isEmpty());
for (DataSegment segment : segments) {
for (int i = 0; i < dimensionSchemas.size(); i++) {
Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i));
}
}
}
@Test
public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws JsonProcessingException
{
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATASOURCE,
TIMESTAMP_SPEC,
DIMENSIONS_SPEC.withDimensions(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim"))
),
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
INTERVAL_TO_INDEX
),
null
),
new ParallelIndexIOConfig(
null,
getInputSource(),
JSON_FORMAT,
false,
null
),
newTuningConfig(
new HashedPartitionsSpec(
10,
null,
ImmutableList.of("ts", "unknownDim")
),
2,
true
)
),
null
);
task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
Assert.assertFalse(segments.isEmpty());
for (DataSegment segment : segments) {
Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
}
}
private InputSource getInputSource() throws JsonProcessingException
{
final ObjectMapper mapper = getObjectMapper();
final List<Map<String, Object>> rows = ImmutableList.of(
ImmutableMap.of(
"ts", "2022-01-01",
"dim1", "val1",
"dim2", "val11"
),
ImmutableMap.of(
"ts", "2022-01-02",
"dim1", "val2",
"dim2", "val12"
),
ImmutableMap.of(
"ts", "2022-01-03",
"dim1", "val3",
"dim2", "val13"
)
);
final String data = StringUtils.format(
"%s\n%s\n%s\n",
mapper.writeValueAsString(rows.get(0)),
mapper.writeValueAsString(rows.get(1)),
mapper.writeValueAsString(rows.get(2))
);
return new InlineInputSource(data);
}
}


@@ -0,0 +1,435 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@RunWith(Parameterized.class)
public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiPhaseParallelIndexingTest
{
private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
);
private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null);
private static final List<Interval> INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M"));
@Parameterized.Parameters
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{
new HashedPartitionsSpec(
10,
null,
ImmutableList.of("ts", "unknownDim")
)
},
new Object[]{
new DimensionRangePartitionsSpec(
10,
null,
Collections.singletonList("unknownDim"),
false
)
}
);
}
private final PartitionsSpec partitionsSpec;
public MultiPhaseParallelIndexingWithNullColumnTest(PartitionsSpec partitionsSpec)
{
super(LockGranularity.TIME_CHUNK, true, 0., 0.);
this.partitionsSpec = partitionsSpec;
getObjectMapper().registerSubtypes(SplittableInlineDataSource.class);
}
@Test
public void testIngestNullColumn() throws JsonProcessingException
{
final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
Arrays.asList("ts", "unknownDim")
);
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATASOURCE,
TIMESTAMP_SPEC,
DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
INTERVAL_TO_INDEX
),
null
),
new ParallelIndexIOConfig(
null,
getInputSource(),
JSON_FORMAT,
false,
null
),
newTuningConfig(
partitionsSpec,
2,
true
)
),
null
);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
Assert.assertFalse(segments.isEmpty());
for (DataSegment segment : segments) {
Assert.assertEquals(dimensionSchemas.size(), segment.getDimensions().size());
for (int i = 0; i < dimensionSchemas.size(); i++) {
Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i));
}
}
}
@Test
public void testIngestNullColumn_useFieldDiscovery_includeAllDimensions_shouldStoreAllColumns() throws JsonProcessingException
{
final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
Arrays.asList("ts", "unknownDim", "dim1")
);
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATASOURCE,
TIMESTAMP_SPEC,
new DimensionsSpec.Builder().setDimensions(dimensionSchemas).setIncludeAllDimensions(true).build(),
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
INTERVAL_TO_INDEX
),
null
),
new ParallelIndexIOConfig(
null,
getInputSource(),
new JsonInputFormat(
new JSONPathSpec(true, null),
null,
null
),
false,
null
),
newTuningConfig(
partitionsSpec,
2,
true
)
),
null
);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
Assert.assertFalse(segments.isEmpty());
final List<String> expectedExplicitDimensions = ImmutableList.of("ts", "unknownDim", "dim1");
final Set<String> expectedImplicitDimensions = ImmutableSet.of("dim2", "dim3");
for (DataSegment segment : segments) {
Assert.assertEquals(
expectedExplicitDimensions,
segment.getDimensions().subList(0, expectedExplicitDimensions.size())
);
Assert.assertEquals(
expectedImplicitDimensions,
new HashSet<>(segment.getDimensions().subList(expectedExplicitDimensions.size(), segment.getDimensions().size()))
);
}
}
@Test
public void testIngestNullColumn_explicitPathSpec_useFieldDiscovery_includeAllDimensions_shouldStoreAllColumns()
throws JsonProcessingException
{
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATASOURCE,
TIMESTAMP_SPEC,
new DimensionsSpec.Builder().setIncludeAllDimensions(true).build(),
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
null
),
null
),
new ParallelIndexIOConfig(
null,
getInputSource(),
new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim1", "$.dim1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "k", "$.dim4.k")
)
),
null,
null
),
false,
null
),
newTuningConfig(
partitionsSpec,
2,
true
)
),
null
);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
Assert.assertFalse(segments.isEmpty());
final List<String> expectedExplicitDimensions = ImmutableList.of("dim1", "k");
final Set<String> expectedImplicitDimensions = ImmutableSet.of("dim2", "dim3");
for (DataSegment segment : segments) {
Assert.assertEquals(
expectedExplicitDimensions,
segment.getDimensions().subList(0, expectedExplicitDimensions.size())
);
Assert.assertEquals(
expectedImplicitDimensions,
new HashSet<>(segment.getDimensions().subList(expectedExplicitDimensions.size(), segment.getDimensions().size()))
);
}
}
@Test
public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws JsonProcessingException
{
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATASOURCE,
TIMESTAMP_SPEC,
DIMENSIONS_SPEC.withDimensions(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim"))
),
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
INTERVAL_TO_INDEX
),
null
),
new ParallelIndexIOConfig(
null,
getInputSource(),
JSON_FORMAT,
false,
null
),
newTuningConfig(
partitionsSpec,
2,
true
)
),
null
);
task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
Assert.assertFalse(segments.isEmpty());
final List<DimensionSchema> expectedDimensions = DimensionsSpec.getDefaultSchemas(
Collections.singletonList("ts")
);
for (DataSegment segment : segments) {
Assert.assertEquals(expectedDimensions.size(), segment.getDimensions().size());
for (int i = 0; i < expectedDimensions.size(); i++) {
Assert.assertEquals(expectedDimensions.get(i).getName(), segment.getDimensions().get(i));
}
}
}
private InputSource getInputSource() throws JsonProcessingException
{
final ObjectMapper mapper = getObjectMapper();
final List<Map<String, Object>> rows = new ArrayList<>();
Map<String, Object> row;
for (int i = 0; i < 3; i++) {
rows.add(row(StringUtils.format("2022-01-%02d", i + 1), "val1", "val2", null));
}
rows.add(row("2022-01-04", null, null, null, ImmutableMap.of("k", "v")));
final String data = StringUtils.format(
"%s\n%s\n%s\n%s\n",
mapper.writeValueAsString(rows.get(0)),
mapper.writeValueAsString(rows.get(1)),
mapper.writeValueAsString(rows.get(2)),
mapper.writeValueAsString(rows.get(3))
);
return new SplittableInlineDataSource(ImmutableList.of(data));
}
private static Map<String, Object> row(String timestamp, Object... dims)
{
Map<String, Object> row = new HashMap<>();
row.put("ts", timestamp);
IntStream.range(0, dims.length).forEach(i -> row.put("dim" + (i + 1), dims[i]));
return row;
}
/**
* Splittable inlineDataSource to run tests with range partitioning which requires the inputSource to be splittable.
*/
private static final class SplittableInlineDataSource implements SplittableInputSource<String>
{
private final List<String> data;
@JsonCreator
public SplittableInlineDataSource(@JsonProperty("data") List<String> data)
{
this.data = data;
}
@JsonProperty
public List<String> getData()
{
return data;
}
@Override
public Stream<InputSplit<String>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
return data.stream().map(InputSplit::new);
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
return data.size();
}
@Override
public InputSource withSplit(InputSplit<String> split)
{
return new SplittableInlineDataSource(ImmutableList.of(split.get()));
}
@Override
public boolean needsFormat()
{
return true;
}
@Override
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
)
{
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
data.stream().map(str -> new ByteEntity(StringUtils.toUtf8(str))).iterator(),
temporaryDirectory
);
}
}
}


@@ -29,9 +29,7 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
@@ -40,13 +38,9 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -388,122 +382,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
}
}
@Test
public void testIngestNullColumn()
{
// storeEmptyColumns flag should do nothing with using inputFormat or multiValueDim
if (!isUseInputFormatApi() || useMultivalueDim) {
return;
}
int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
Arrays.asList("ts", "unknownDim")
);
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATASOURCE,
TIMESTAMP_SPEC,
DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
intervalToIndex == null ? null : Collections.singletonList(intervalToIndex)
),
null
),
new ParallelIndexIOConfig(
null,
new LocalInputSource(inputDir, TEST_FILE_NAME_PREFIX + "*"),
DEFAULT_INPUT_FORMAT,
false,
null
),
newTuningConfig(
new DimensionRangePartitionsSpec(
targetRowsPerSegment,
null,
Collections.singletonList("unknownDim"),
false
),
maxNumConcurrentSubTasks,
true
)
),
null
);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
for (DataSegment segment : segments) {
for (int i = 0; i < dimensionSchemas.size(); i++) {
Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i));
}
}
}
@Test
public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns()
{
// storeEmptyColumns flag should do nothing with using inputFormat or multiValueDim
if (!isUseInputFormatApi() || useMultivalueDim) {
return;
}
int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATASOURCE,
TIMESTAMP_SPEC,
DIMENSIONS_SPEC.withDimensions(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim"))
),
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
intervalToIndex == null ? null : Collections.singletonList(intervalToIndex)
),
null
),
new ParallelIndexIOConfig(
null,
new LocalInputSource(inputDir, TEST_FILE_NAME_PREFIX + "*"),
DEFAULT_INPUT_FORMAT,
false,
null
),
newTuningConfig(
new DimensionRangePartitionsSpec(
targetRowsPerSegment,
null,
Collections.singletonList("unknownDim"),
false
),
maxNumConcurrentSubTasks,
true
)
),
null
);
task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
for (DataSegment segment : segments) {
Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
}
}
private ParallelIndexSupervisorTask runTestTask(
PartitionsSpec partitionsSpec,
File inputDirectory,


@@ -205,8 +205,7 @@ public class IndexMergerV9 implements IndexMerger
final File outDir,
final ProgressIndicator progress,
final List<String> mergedDimensions, // should have both explicit and implicit dimensions
// a subset of mergedDimensions that are explicitly specified in DimensionsSpec
final Set<String> explicitDimensions,
final DimensionsSpecInspector dimensionsSpecInspector,
final List<String> mergedMetrics,
final Function<List<TransformableRowIterator>, TimeAndDimsIterator> rowMergerFn,
final boolean fillRowNumConversions,
@@ -333,7 +332,7 @@ public class IndexMergerV9 implements IndexMerger
if (!merger.hasOnlyNulls()) {
ColumnDescriptor columnDesc = merger.makeColumnDescriptor();
makeColumn(v9Smoosher, mergedDimensions.get(i), columnDesc);
} else if (shouldStore(explicitDimensions, mergedDimensions.get(i))) {
} else if (dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
// shouldStore AND hasOnlyNulls
ColumnDescriptor columnDesc = ColumnDescriptor
.builder()
@@ -357,7 +356,7 @@ public class IndexMergerV9 implements IndexMerger
progress,
indexSpec,
mergers,
explicitDimensions
dimensionsSpecInspector
);
makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
@@ -374,14 +373,6 @@ public class IndexMergerV9 implements IndexMerger
}
}
private boolean shouldStore(
Set<String> explicitDimensions,
String dimension
)
{
return storeEmptyColumns && explicitDimensions.contains(dimension);
}
private void makeMetadataBinary(
final FileSmoosher v9Smoosher,
final ProgressIndicator progress,
@@ -404,7 +395,7 @@ public class IndexMergerV9 implements IndexMerger
final ProgressIndicator progress,
final IndexSpec indexSpec,
final List<DimensionMergerV9> mergers,
final Set<String> explicitDimensions
final DimensionsSpecInspector dimensionsSpecInspector
) throws IOException
{
final Set<String> columnSet = new HashSet<>(mergedDimensions);
@@ -439,7 +430,7 @@ public class IndexMergerV9 implements IndexMerger
nonNullOnlyColumns.add(mergedDimensions.get(i));
allDimensions.add(null);
allColumns.add(null);
} else if (shouldStore(explicitDimensions, mergedDimensions.get(i))) {
} else if (dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
// shouldStore AND hasOnlyNulls
allDimensions.add(mergedDimensions.get(i));
allColumns.add(mergedDimensions.get(i));
@@ -1315,7 +1306,7 @@ public class IndexMergerV9 implements IndexMerger
outDir,
progress,
mergedDimensions,
dimensionsSpec == null ? ImmutableSet.of() : new HashSet<>(dimensionsSpec.getDimensionNames()),
new DimensionsSpecInspector(storeEmptyColumns, dimensionsSpec),
mergedMetrics,
rowMergerFn,
true,
@@ -1460,4 +1451,34 @@ public class IndexMergerV9 implements IndexMerger
);
}
}
private static class DimensionsSpecInspector
{
private final boolean storeEmptyColumns;
private final Set<String> explicitDimensions;
private final boolean includeAllDimensions;
private DimensionsSpecInspector(
boolean storeEmptyColumns,
@Nullable DimensionsSpec dimensionsSpec
)
{
this.storeEmptyColumns = storeEmptyColumns;
this.explicitDimensions = dimensionsSpec == null
? ImmutableSet.of()
: new HashSet<>(dimensionsSpec.getDimensionNames());
this.includeAllDimensions = dimensionsSpec != null && dimensionsSpec.isIncludeAllDimensions();
}
/**
* Returns true if the given dimension should be stored in the segment even when the column has only nulls.
* If it has non-nulls, then the column must be stored.
*
* @see DimensionMerger#hasOnlyNulls()
*/
private boolean shouldStore(String dimension)
{
return storeEmptyColumns && (includeAllDimensions || explicitDimensions.contains(dimension));
}
}
}