If ingested data has sparse columns, ingestion with forceGuaranteedRollup=true can result in imperfect rollup, and the final dimension ordering can differ from the dimensionsSpec ordering in the ingestionSpec (#10948)

* add IT

* add IT

* add the fix

* fix checkstyle

* fix compile

* fix compile

* fix test

* fix test

* address comments
Maytas Monsereenusorn 2021-03-18 17:04:28 -07:00 committed by GitHub
parent 83fcab1d0f
commit f19c2e9ce4
15 changed files with 757 additions and 32 deletions
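
The core of the change: when segments built from sparse data disagree on dimension order, the merge step now falls back on the ordering declared in the ingestionSpec's dimensionsSpec instead of jumping straight to lexicographic order. The standalone sketch below illustrates that selection logic; the class and method names (DimOrderFallbackSketch, mergedOrder, isSubsequence) are illustrative only and are not part of Druid or of this commit.

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Standalone sketch, not Druid code: mirrors the order-selection logic this commit adds to IndexMerger.
public class DimOrderFallbackSketch
{
  static List<String> mergedOrder(List<List<String>> perSegmentDims, List<String> specOrder)
  {
    // 1) Use the longest per-segment order if every segment's order is a subsequence of it.
    List<String> longest = perSegmentDims.stream()
        .max(Comparator.comparingInt(List::size))
        .orElse(List.of());
    if (perSegmentDims.stream().allMatch(dims -> isSubsequence(dims, longest))) {
      return longest;
    }
    // 2) Otherwise fall back on the ingestionSpec ordering, restricted to dimensions that actually
    //    appear in the segments, provided it covers them all and every segment agrees with it.
    Set<String> present = perSegmentDims.stream().flatMap(List::stream).collect(Collectors.toSet());
    List<String> candidate = specOrder.stream().filter(present::contains).collect(Collectors.toList());
    if (candidate.size() == present.size()
        && perSegmentDims.stream().allMatch(dims -> isSubsequence(dims, candidate))) {
      return candidate;
    }
    // 3) Last resort (and the only fallback before this fix): lexicographic order, which is what
    //    produced the unexpected final ordering with sparse columns.
    return present.stream().sorted().collect(Collectors.toList());
  }

  // True if dims appear in the same relative order as in order (gaps allowed).
  static boolean isSubsequence(List<String> dims, List<String> order)
  {
    Iterator<String> it = order.iterator();
    outer:
    for (String dim : dims) {
      while (it.hasNext()) {
        if (it.next().equals(dim)) {
          continue outer;
        }
      }
      return false;
    }
    return true;
  }

  public static void main(String[] args)
  {
    // Sparse columns: the two segments disagree on which dimensions exist, so no shared order
    // covers everything; the dimensionsSpec ordering wins.
    List<List<String>> segments = List.of(List.of("dimB", "dimA"), List.of("dimB", "dimC"));
    System.out.println(mergedOrder(segments, List.of("dimB", "dimA", "dimC")));
    // prints [dimB, dimA, dimC]
  }
}

Run against per-segment dimension sets like those in the integration test below, the sketch picks [dimB, dimA, dimC], the ingestionSpec order, rather than the lexicographic [dimA, dimB, dimC].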


@@ -609,7 +609,7 @@ public class IndexGeneratorJob implements Jobby
     {
       boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
       return HadoopDruidIndexerConfig.INDEX_MERGER_V9
-          .mergeQueryableIndex(indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator, null, -1);
+          .mergeQueryableIndex(indexes, rollup, aggs, null, file, config.getIndexSpec(), progressIndicator, null, -1);
     }
     @Override


@@ -329,7 +329,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
       });
     }
     if (maxNumSegmentsToMerge >= indexes.size()) {
-      dimensionNames = IndexMerger.getMergedDimensionsFromQueryableIndexes(indexesToMerge);
+      dimensionNames = IndexMerger.getMergedDimensionsFromQueryableIndexes(indexesToMerge, dataSchema.getDimensionsSpec());
     }
     final File outDir = new File(baseOutDir, StringUtils.format("merged_%d", suffix++));
     mergedFiles.add(


@@ -101,6 +101,20 @@ public abstract class AbstractIndexerTest
     unloadAndKillData(dataSource, first, last);
   }
+  protected void loadData(String indexTask, final String fullDatasourceName) throws Exception
+  {
+    String taskSpec = getResourceAsString(indexTask);
+    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+    final String taskID = indexer.submitTask(taskSpec);
+    LOG.info("TaskID for loading index task %s", taskID);
+    indexer.waitUntilTaskCompletes(taskID);
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+        "Segment Load"
+    );
+  }
   private void unloadAndKillData(final String dataSource, String start, String end)
   {
     // Wait for any existing index tasks to complete before disabling the datasource otherwise


@@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCompactionSparseColumnTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITCompactionTaskTest.class);
private static final String INDEX_DATASOURCE = "sparse_column_index_test";
private static final String INDEX_TASK = "/indexer/sparse_column_index_task.json";
private static final String COMPACTION_QUERIES_RESOURCE = "/indexer/sparse_column_index_queries.json";
private static final String COMPACTION_TASK_WITHOUT_DIMENSION = "/indexer/sparse_column_without_dim_compaction_task.json";
private static final String COMPACTION_TASK_WITH_DIMENSION = "/indexer/sparse_column_with_dim_compaction_task.json";
@Inject
private IntegrationTestingConfig config;
private String fullDatasourceName;
@BeforeMethod
public void setFullDatasourceName(Method method)
{
fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix() + "-" + method.getName();
}
@Test
public void testCompactionPerfectRollUpWithoutDimensionSpec() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
// Load and verify initial data
loadAndVerifyDataWithSparseColumn(fullDatasourceName);
// Compaction with perfect rollup. Rows with "X", "H" (for the first and second columns respectively) should be rolled up
String template = getResourceAsString(COMPACTION_TASK_WITHOUT_DIMENSION);
template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(template);
indexer.waitUntilTaskCompletes(taskID);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
// Verify compacted data
// Compacted data has only one segment, which has the following rows:
// The ordering of the columns will be "dimB", "dimA", "dimC" (This is the same as the ordering in the initial
// ingestion task)
List<List<Object>> segmentRows = ImmutableList.of(
Arrays.asList(1442016000000L, "F", null, "C", 1, 1),
Arrays.asList(1442016000000L, "J", null, "C", 1, 1),
Arrays.asList(1442016000000L, "R", null, "J", 1, 1),
Arrays.asList(1442016000000L, "S", null, "Z", 1, 1),
Arrays.asList(1442016000000L, "T", null, "H", 1, 1),
Arrays.asList(1442016000000L, "X", null, "H", 3, 3),
Arrays.asList(1442016000000L, "X", "A", null, 1, 1),
Arrays.asList(1442016000000L, "Z", null, "H", 1, 1)
);
verifyCompactedData(segmentRows);
}
}
@Test
public void testCompactionPerfectRollUpWithLexicographicDimensionSpec() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
// Load and verify initial data
loadAndVerifyDataWithSparseColumn(fullDatasourceName);
// Compaction with perfect rollup. Rows with "X", "H" (for the first and second columns respectively) should be rolled up
String template = getResourceAsString(COMPACTION_TASK_WITH_DIMENSION);
template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
//
List<String> dimensionsOrder = ImmutableList.of("dimA", "dimB", "dimC");
template = StringUtils.replace(
template,
"%%DIMENSION_NAMES%%",
jsonMapper.writeValueAsString(dimensionsOrder)
);
final String taskID = indexer.submitTask(template);
indexer.waitUntilTaskCompletes(taskID);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
// Verify compacted data
// Compacted data has only one segment, which has the following rows:
// The ordering of the columns will be "dimA", "dimB", "dimC"
List<List<Object>> segmentRows = ImmutableList.of(
Arrays.asList(1442016000000L, null, "X", "A", 1, 1),
Arrays.asList(1442016000000L, "C", "F", null, 1, 1),
Arrays.asList(1442016000000L, "C", "J", null, 1, 1),
Arrays.asList(1442016000000L, "H", "T", null, 1, 1),
Arrays.asList(1442016000000L, "H", "X", null, 3, 3),
Arrays.asList(1442016000000L, "H", "Z", null, 1, 1),
Arrays.asList(1442016000000L, "J", "R", null, 1, 1),
Arrays.asList(1442016000000L, "Z", "S", null, 1, 1)
);
verifyCompactedData(segmentRows);
}
}
@Test
public void testCompactionPerfectRollUpWithNonLexicographicDimensionSpec() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
// Load and verify initial data
loadAndVerifyDataWithSparseColumn(fullDatasourceName);
// Compaction with perfect rollup. Rows with "X", "H" (for the first and second columns respectively) should be rolled up
String template = getResourceAsString(COMPACTION_TASK_WITH_DIMENSION);
template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
//
List<String> dimensionsOrder = ImmutableList.of("dimC", "dimB", "dimA");
template = StringUtils.replace(
template,
"%%DIMENSION_NAMES%%",
jsonMapper.writeValueAsString(dimensionsOrder)
);
final String taskID = indexer.submitTask(template);
indexer.waitUntilTaskCompletes(taskID);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
// Verify compacted data
// Compacted data has only one segment, which has the following rows:
// The ordering of the columns will be "dimC", "dimB", "dimA"
List<List<Object>> segment1Rows = ImmutableList.of(
Arrays.asList(1442016000000L, null, "F", "C", 1, 1),
Arrays.asList(1442016000000L, null, "J", "C", 1, 1),
Arrays.asList(1442016000000L, null, "R", "J", 1, 1),
Arrays.asList(1442016000000L, null, "S", "Z", 1, 1),
Arrays.asList(1442016000000L, null, "T", "H", 1, 1),
Arrays.asList(1442016000000L, null, "X", "H", 3, 3),
Arrays.asList(1442016000000L, null, "Z", "H", 1, 1),
Arrays.asList(1442016000000L, "A", "X", null, 1, 1)
);
verifyCompactedData(segment1Rows);
}
}
private void loadAndVerifyDataWithSparseColumn(String fullDatasourceName) throws Exception
{
loadData(INDEX_TASK, fullDatasourceName);
List<Map<String, List<List<Object>>>> expectedResultBeforeCompaction = new ArrayList<>();
// The first segment has the following rows:
List<List<Object>> segment1Rows = ImmutableList.of(
ImmutableList.of(1442016000000L, "F", "C", 1, 1),
ImmutableList.of(1442016000000L, "J", "C", 1, 1),
ImmutableList.of(1442016000000L, "X", "H", 1, 1)
);
expectedResultBeforeCompaction.add(ImmutableMap.of("events", segment1Rows));
// The second segment has the following rows:
List<List<Object>> segment2Rows = ImmutableList.of(
ImmutableList.of(1442016000000L, "S", "Z", 1, 1),
ImmutableList.of(1442016000000L, "X", "H", 1, 1),
ImmutableList.of(1442016000000L, "Z", "H", 1, 1)
);
expectedResultBeforeCompaction.add(ImmutableMap.of("events", segment2Rows));
// The third segment has the following rows:
List<List<Object>> segment3Rows = ImmutableList.of(
ImmutableList.of(1442016000000L, "R", "J", 1, 1),
ImmutableList.of(1442016000000L, "T", "H", 1, 1),
ImmutableList.of(1442016000000L, "X", "H", 1, 1)
);
expectedResultBeforeCompaction.add(ImmutableMap.of("events", segment3Rows));
// The fourth segment has the following rows:
List<List<Object>> segment4Rows = ImmutableList.of(
ImmutableList.of(1442016000000L, "X", "A", 1, 1)
);
expectedResultBeforeCompaction.add(ImmutableMap.of("events", segment4Rows));
verifyQueryResult(expectedResultBeforeCompaction, 10, 10, 1);
}
private void verifyCompactedData(List<List<Object>> segmentRows) throws Exception
{
List<Map<String, List<List<Object>>>> expectedResultAfterCompaction = new ArrayList<>();
expectedResultAfterCompaction.add(ImmutableMap.of("events", segmentRows));
verifyQueryResult(expectedResultAfterCompaction, 8, 10, 0.8);
}
private void verifyQueryResult(
List<Map<String, List<List<Object>>>> expectedScanResult,
int expectedNumRoll,
int expectedSumCount,
double expectedRollupRatio
) throws Exception
{
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(COMPACTION_QUERIES_RESOURCE);
String queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
fullDatasourceName
);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%EXPECTED_SCAN_RESULT%%",
jsonMapper.writeValueAsString(expectedScanResult)
);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%EXPECTED_SUM_COUNT%%",
jsonMapper.writeValueAsString(expectedSumCount)
);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%EXPECTED_ROLLUP_RATIO%%",
jsonMapper.writeValueAsString(expectedRollupRatio)
);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%EXPECTED_NUM_ROW%%",
jsonMapper.writeValueAsString(expectedNumRoll)
);
queryHelper.testQueriesFromString(queryResponseTemplate);
}
}


@@ -101,7 +101,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   public void testCompactionWithQueryGranularityInGranularitySpec() throws Exception
   {
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      loadData(INDEX_TASK);
+      loadData(INDEX_TASK, fullDatasourceName);
       // 4 segments across 2 days
       checkNumberOfSegments(4);
       List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -140,7 +140,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   public void testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() throws Exception
   {
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      loadData(INDEX_TASK);
+      loadData(INDEX_TASK, fullDatasourceName);
       // 4 segments across 2 days
       checkNumberOfSegments(4);
       List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -182,7 +182,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   ) throws Exception
   {
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      loadData(indexTask);
+      loadData(indexTask, fullDatasourceName);
       // 4 segments across 2 days
       checkNumberOfSegments(4);
       List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -211,20 +211,6 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
     }
   }
-  private void loadData(String indexTask) throws Exception
-  {
-    String taskSpec = getResourceAsString(indexTask);
-    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
-    final String taskID = indexer.submitTask(taskSpec);
-    LOG.info("TaskID for loading index task %s", taskID);
-    indexer.waitUntilTaskCompletes(taskID);
-    ITRetryUtil.retryUntilTrue(
-        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
-        "Segment Load"
-    );
-  }
   private void compactData(String compactionResource, GranularityType newSegmentGranularity, GranularityType newQueryGranularity) throws Exception
   {
     String template = getResourceAsString(compactionResource);


@@ -0,0 +1,80 @@
[
{
"description": "timeseries, 1 agg, all",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2015-09-12T00:00:00.000Z",
"result" : {
"minTime" : "2015-09-12T00:00:00.000Z",
"maxTime" : "2015-09-12T00:00:00.000Z"
}
}
]
},
{
"description": "scan, all",
"query": {
"queryType": "scan",
"dataSource": "%%DATASOURCE%%",
"intervals": [
"2013-01-01/2020-01-02"
],
"resultFormat":"compactedList"
},
"expectedResults": %%EXPECTED_SCAN_RESULT%%,
"fieldsToTest": ["events"]
},
{
"description": "roll up ratio",
"query": {
"queryType":"timeseries",
"dataSource":{
"type":"table",
"name":"%%DATASOURCE%%"
},
"intervals":{
"type":"intervals",
"intervals":[
"2013-01-01/2020-01-02"
]
},
"granularity":{
"type":"all"
},
"aggregations":[
{
"type":"count",
"name":"a0"
},
{
"type":"longSum",
"name":"a1",
"fieldName":"count",
"expression":null
}
],
"postAggregations":[
{
"type":"expression",
"name":"p0",
"expression":"((\"a0\" * 1.00) / \"a1\")",
"ordering":null
}
]
},
"expectedResults": [
{
"timestamp" : "2015-09-12T00:00:00.000Z",
"result" : {
"a1" : %%EXPECTED_SUM_COUNT%%,
"p0" : %%EXPECTED_ROLLUP_RATIO%%,
"a0" : %%EXPECTED_NUM_ROW%%
}
}
]
}
]


@@ -0,0 +1,57 @@
{
"type": "index_parallel",
"spec": {
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "inline",
"data": "{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"F\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"J\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"Z\",\"dimB\":\"S\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"Z\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"J\",\"dimB\":\"R\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"T\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimC\":\"A\",\"dimB\":\"X\",\"metA\":1}\n"
},
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "index_parallel",
"partitionsSpec": {
"type": "dynamic",
"maxRowsPerSegment": 3,
"maxTotalRows": 3
},
"maxRowsInMemory": 3
},
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"timestampSpec": {
"column": "time",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"dimB",
"dimA",
"dimC",
"dimD",
"dimE",
"dimF"
]
},
"granularitySpec": {
"queryGranularity": "hour",
"rollup": true,
"segmentGranularity": "hour"
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "sum_metA",
"type": "longSum",
"fieldName": "metA"
}
]
}
}
}


@@ -0,0 +1,19 @@
{
"type": "compact",
"dataSource": "%%DATASOURCE%%",
"dimensionsSpec": {
"dimensions": %%DIMENSION_NAMES%%
},
"interval": "2010-10-29T05:00:00Z/2030-10-29T06:00:00Z",
"tuningConfig": {
"type": "index_parallel",
"maxRowsPerSegment": 3,
"maxRowsInMemory": 3,
"maxNumConcurrentSubTasks": 2,
"partitionsSpec": {
"type": "hashed",
"numShards": 1
},
"forceGuaranteedRollup": true
}
}


@@ -0,0 +1,16 @@
{
"type": "compact",
"dataSource": "%%DATASOURCE%%",
"interval": "2010-10-29T05:00:00Z/2030-10-29T06:00:00Z",
"tuningConfig": {
"type": "index_parallel",
"maxRowsPerSegment": 3,
"maxRowsInMemory": 3,
"maxNumConcurrentSubTasks": 2,
"partitionsSpec": {
"type": "hashed",
"numShards": 1
},
"forceGuaranteedRollup": true
}
}


@@ -28,6 +28,7 @@ import com.google.common.collect.PeekingIterator;
 import com.google.inject.ImplementedBy;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.utils.SerializerUtils;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.java.util.common.ByteBufferUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -38,6 +39,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 import javax.annotation.Nullable;
@@ -65,9 +67,12 @@ public interface IndexMerger
   int INVALID_ROW = -1;
   int UNLIMITED_MAX_COLUMNS_TO_MERGE = -1;
-  static List<String> getMergedDimensionsFromQueryableIndexes(List<QueryableIndex> indexes)
+  static List<String> getMergedDimensionsFromQueryableIndexes(
+      List<QueryableIndex> indexes,
+      @Nullable DimensionsSpec dimensionsSpec
+  )
   {
-    return getMergedDimensions(toIndexableAdapters(indexes));
+    return getMergedDimensions(toIndexableAdapters(indexes), dimensionsSpec);
   }
   static List<IndexableAdapter> toIndexableAdapters(List<QueryableIndex> indexes)
@@ -75,14 +80,18 @@ public interface IndexMerger
     return indexes.stream().map(QueryableIndexIndexableAdapter::new).collect(Collectors.toList());
   }
-  static List<String> getMergedDimensions(List<IndexableAdapter> indexes)
+  static List<String> getMergedDimensions(
+      List<IndexableAdapter> indexes,
+      @Nullable DimensionsSpec dimensionsSpec
+  )
   {
     if (indexes.size() == 0) {
       return ImmutableList.of();
     }
-    List<String> commonDimOrder = getLongestSharedDimOrder(indexes);
+    List<String> commonDimOrder = getLongestSharedDimOrder(indexes, dimensionsSpec);
     if (commonDimOrder == null) {
-      log.warn("Indexes have incompatible dimension orders, using lexicographic order.");
+      log.warn("Indexes have incompatible dimension orders and there is no valid dimension ordering"
+               + " in the ingestionSpec, using lexicographic order.");
       return getLexicographicMergedDimensions(indexes);
     } else {
       return commonDimOrder;
@@ -90,7 +99,10 @@ public interface IndexMerger
   }
   @Nullable
-  static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
+  static List<String> getLongestSharedDimOrder(
+      List<IndexableAdapter> indexes,
+      @Nullable DimensionsSpec dimensionsSpec
+  )
   {
     int maxSize = 0;
     Iterable<String> orderingCandidate = null;
@@ -106,6 +118,41 @@ public interface IndexMerger
       return null;
     }
+    if (isDimensionOrderingValid(indexes, orderingCandidate)) {
+      return ImmutableList.copyOf(orderingCandidate);
+    } else {
+      log.info("Indexes have incompatible dimension orders, try falling back on dimension ordering from ingestionSpec");
+      // Check if there is a valid dimension ordering in the ingestionSpec to fall back on
+      if (dimensionsSpec == null || CollectionUtils.isNullOrEmpty(dimensionsSpec.getDimensionNames())) {
+        log.info("Cannot fall back on dimension ordering from ingestionSpec as it does not exist");
+        return null;
+      }
+      List<String> candidate = new ArrayList<>(dimensionsSpec.getDimensionNames());
+      // Remove all dimensions that does not exist within the indexes from the candidate
+      Set<String> allValidDimensions = indexes.stream()
+                                              .flatMap(indexableAdapter -> indexableAdapter.getDimensionNames().stream())
+                                              .collect(Collectors.toSet());
+      candidate.retainAll(allValidDimensions);
+      // Sanity check that there is no extra/missing columns
+      if (candidate.size() != allValidDimensions.size()) {
+        log.error("Dimension mismatched between ingestionSpec and indexes. ingestionSpec[%s] indexes[%s]",
+                  candidate,
+                  allValidDimensions);
+        return null;
+      }
+      // Sanity check that all indexes dimension ordering is the same as the ordering in candidate
+      if (!isDimensionOrderingValid(indexes, candidate)) {
+        log.error("Dimension from ingestionSpec has invalid ordering");
+        return null;
+      }
+      log.info("Dimension ordering from ingestionSpec is valid. Fall back on dimension ordering [%s]", candidate);
+      return candidate;
+    }
+  }
+  static boolean isDimensionOrderingValid(List<IndexableAdapter> indexes, Iterable<String> orderingCandidate)
+  {
     for (IndexableAdapter index : indexes) {
       Iterator<String> candidateIter = orderingCandidate.iterator();
       for (String matchDim : index.getDimensionNames()) {
@@ -118,11 +165,11 @@ public interface IndexMerger
           }
         }
         if (!matched) {
-          return null;
+          return false;
         }
       }
     }
-    return ImmutableList.copyOf(orderingCandidate);
+    return true;
   }
   static List<String> getLexicographicMergedDimensions(List<IndexableAdapter> indexes)
@@ -205,6 +252,18 @@ public interface IndexMerger
       List<QueryableIndex> indexes,
       boolean rollup,
       AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
+      File outDir,
+      IndexSpec indexSpec,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
+  ) throws IOException;
+
+  File mergeQueryableIndex(
+      List<QueryableIndex> indexes,
+      boolean rollup,
+      AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
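
For orientation, here is a hedged sketch of how a caller can use the new overload declared above. Only the mergeQueryableIndex signature and the UNLIMITED_MAX_COLUMNS_TO_MERGE constant come from this interface; the helper class, its method name, and how its arguments are obtained are illustrative assumptions.

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;

// Illustrative helper (not part of this commit): forwards the ingestion-time dimension
// ordering into the new mergeQueryableIndex overload so the merged segment keeps it.
final class MergeWithSpecOrdering
{
  static File merge(
      IndexMerger merger,
      List<QueryableIndex> indexes,
      boolean rollup,
      AggregatorFactory[] metricAggs,
      DimensionsSpec dimensionsSpec,  // from the ingestionSpec; may be null to keep the old behavior
      File outDir,
      IndexSpec indexSpec
  ) throws IOException
  {
    return merger.mergeQueryableIndex(
        indexes,
        rollup,
        metricAggs,
        dimensionsSpec,
        outDir,
        indexSpec,
        null,                                       // SegmentWriteOutMediumFactory (nullable)
        IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE
    );
  }
}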


@@ -28,6 +28,7 @@ import com.google.common.io.Files;
 import com.google.common.primitives.Ints;
 import com.google.inject.Inject;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.io.ZeroCopyByteArrayOutputStream;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
@@ -875,6 +876,7 @@ public class IndexMergerV9 implements IndexMerger
         // while merging a single iterable
         false,
         index.getMetricAggs(),
+        null,
         outDir,
         indexSpec,
         progress,
@@ -898,6 +900,31 @@ public class IndexMergerV9 implements IndexMerger
         indexes,
         rollup,
         metricAggs,
+        null,
+        outDir,
+        indexSpec,
+        segmentWriteOutMediumFactory,
+        maxColumnsToMerge
+    );
+  }
+
+  @Override
+  public File mergeQueryableIndex(
+      List<QueryableIndex> indexes,
+      boolean rollup,
+      final AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
+      File outDir,
+      IndexSpec indexSpec,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
+  ) throws IOException
+  {
+    return mergeQueryableIndex(
+        indexes,
+        rollup,
+        metricAggs,
+        dimensionsSpec,
         outDir,
         indexSpec,
         new BaseProgressIndicator(),
@@ -911,6 +938,7 @@ public class IndexMergerV9 implements IndexMerger
       List<QueryableIndex> indexes,
       boolean rollup,
       final AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
@@ -922,6 +950,7 @@ public class IndexMergerV9 implements IndexMerger
         IndexMerger.toIndexableAdapters(indexes),
         rollup,
         metricAggs,
+        dimensionsSpec,
         outDir,
         indexSpec,
         progress,
@@ -940,13 +969,14 @@ public class IndexMergerV9 implements IndexMerger
       int maxColumnsToMerge
   ) throws IOException
   {
-    return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator(), null, maxColumnsToMerge);
+    return multiphaseMerge(indexes, rollup, metricAggs, null, outDir, indexSpec, new BaseProgressIndicator(), null, maxColumnsToMerge);
   }
   private File multiphaseMerge(
       List<IndexableAdapter> indexes,
       final boolean rollup,
       final AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
@@ -964,6 +994,7 @@ public class IndexMergerV9 implements IndexMerger
         indexes,
         rollup,
         metricAggs,
+        dimensionsSpec,
         outDir,
         indexSpec,
         progress,
@@ -997,6 +1028,7 @@ public class IndexMergerV9 implements IndexMerger
           phase,
           rollup,
           metricAggs,
+          dimensionsSpec,
           phaseOutDir,
           indexSpec,
           progress,
@@ -1087,13 +1119,14 @@ public class IndexMergerV9 implements IndexMerger
       List<IndexableAdapter> indexes,
       final boolean rollup,
       final AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
       @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
   ) throws IOException
   {
-    final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes);
+    final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes, dimensionsSpec);
     final List<String> mergedMetrics = IndexMerger.mergeIndexed(
         indexes.stream().map(IndexableAdapter::getMetricNames).collect(Collectors.toList())
@@ -1196,7 +1229,7 @@ public class IndexMergerV9 implements IndexMerger
     FileUtils.deleteDirectory(outDir);
     org.apache.commons.io.FileUtils.forceMkdir(outDir);
-    final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes);
+    final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes, null);
     final List<String> mergedMetrics = IndexMerger.mergeIndexed(
         indexes.stream().map(IndexableAdapter::getMetricNames).collect(Collectors.toList())


@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.data.ListIndexed;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.List;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class IndexMergerLongestSharedDimOrderTest
{
@Mock
Supplier<ColumnHolder> mockSupplier;
@Mock
ColumnHolder mockColumnHolder;
@Mock
SmooshedFileMapper mockSmooshedFileMapper;
@Mock
BitmapFactory mockBitmapFactory;
@Before
public void setUp()
{
when(mockSupplier.get()).thenReturn(mockColumnHolder);
// This value does not matter
when(mockColumnHolder.getLength()).thenReturn(1);
}
@Test
public void testGetLongestSharedDimOrderWithNullDimensionSpecAndEmptyIndex()
{
List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(), null);
Assert.assertNull(actual);
}
@Test
public void testGetLongestSharedDimOrderWithNullDimensionSpecAndValidOrdering()
{
QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c"));
QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("b", "c"));
List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), null);
Assert.assertNotNull(actual);
Assert.assertEquals(ImmutableList.of("a", "b", "c"), actual);
// Valid ordering: although the second index has a gap, it still follows the same relative ordering
index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c"));
index2 = makeIndexWithDimensionList(ImmutableList.of("a", "c"));
actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), null);
Assert.assertNotNull(actual);
Assert.assertEquals(ImmutableList.of("a", "b", "c"), actual);
}
@Test
public void testGetLongestSharedDimOrderWithNullDimensionSpecAndNoValidOrdering()
{
// No valid ordering since no index has all three dimensions
QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b"));
QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("b", "c"));
List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), null);
Assert.assertNull(actual);
// No valid ordering as ordering is not the same in all indexes
index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c"));
index2 = makeIndexWithDimensionList(ImmutableList.of("c", "b"));
actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), null);
Assert.assertNull(actual);
}
@Test
public void testGetLongestSharedDimOrderWithSchemalessDimensionSpecAndNoValidOrdering()
{
DimensionsSpec empty = new DimensionsSpec(ImmutableList.of());
// No valid ordering since no index has all three dimensions
QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b"));
QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("b", "c"));
List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), empty);
Assert.assertNull(actual);
// No valid ordering as ordering is not the same in all indexes
index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c"));
index2 = makeIndexWithDimensionList(ImmutableList.of("c", "b"));
actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), empty);
Assert.assertNull(actual);
}
@Test
public void testGetLongestSharedDimOrderWithValidSchemaDimensionSpecAndNoValidOrdering()
{
DimensionsSpec valid = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("a", "b", "c")));
// No valid ordering as no index has all three dimensions
QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b"));
QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("b", "c"));
List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), valid);
Assert.assertNotNull(actual);
Assert.assertEquals(ImmutableList.of("a", "b", "c"), actual);
}
@Test
public void testGetLongestSharedDimOrderWithInvalidSchemaDimensionSpecAndNoValidOrdering()
{
DimensionsSpec valid = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("a", "b", "c")));
// No valid ordering as ordering is not the same in all indexes
QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c"));
QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("c", "b"));
List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), valid);
// Since ordering of index2 is not the same as the ordering of the schema in DimensionSpec
Assert.assertNull(actual);
}
@Test
public void testGetLongestSharedDimOrderWithValidSchemaDimensionSpecAndInvalidOrdering()
{
DimensionsSpec valid = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("a", "b", "c")));
// No valid ordering as ordering is not the same in all indexes
QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c"));
QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("c", "b", "e"));
List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), valid);
// index2 has a dimension that is not in the DimensionsSpec schema; this should not be possible
Assert.assertNull(actual);
}
private QueryableIndexIndexableAdapter makeIndexWithDimensionList(List<String> dimensions)
{
return new QueryableIndexIndexableAdapter(
new SimpleQueryableIndex(
new Interval("2012-01-01/2012-01-02", ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"))),
new ListIndexed<>(dimensions),
mockBitmapFactory,
ImmutableMap.of(ColumnHolder.TIME_COLUMN_NAME, mockSupplier),
mockSmooshedFileMapper,
null,
true
)
);
}
}


@@ -824,6 +824,7 @@ public class AppenderatorImpl implements Appenderator
             indexes,
             schema.getGranularitySpec().isRollup(),
             schema.getAggregators(),
+            schema.getDimensionsSpec(),
             mergedTarget,
             tuningConfig.getIndexSpec(),
             tuningConfig.getSegmentWriteOutMediumFactory(),
@@ -848,7 +849,7 @@ public class AppenderatorImpl implements Appenderator
         // semantics.
         () -> dataSegmentPusher.push(
             mergedFile,
-            sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
+            sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, schema.getDimensionsSpec())),
             useUniquePath
         ),
         exception -> exception instanceof Exception,


@@ -30,6 +30,7 @@ import com.google.inject.Provider;
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
@@ -520,6 +521,30 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
       @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
       int maxColumnsToMerge
   )
+  {
+    return mergeQueryableIndex(
+        indexes,
+        rollup,
+        metricAggs,
+        null,
+        outDir,
+        indexSpec,
+        segmentWriteOutMediumFactory,
+        maxColumnsToMerge
+    );
+  }
+
+  @Override
+  public File mergeQueryableIndex(
+      List<QueryableIndex> indexes,
+      boolean rollup,
+      AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
+      File outDir,
+      IndexSpec indexSpec,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
+  )
   {
     ListenableFuture<File> mergeFuture = mergeExecutor.submit(
         new Callable<File>()
@@ -532,6 +557,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
                 indexes,
                 rollup,
                 metricAggs,
+                dimensionsSpec,
                 outDir,
                 indexSpec,
                 segmentWriteOutMediumFactory,
@@ -656,6 +682,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
         List<QueryableIndex> indexes,
         boolean rollup,
         AggregatorFactory[] metricAggs,
+        @Nullable DimensionsSpec dimensionsSpec,
         File outDir,
         IndexSpec indexSpec,
         ProgressIndicator progress,


@@ -456,7 +456,7 @@ public class RealtimePlumber implements Plumber
     DataSegment segment = dataSegmentPusher.push(
         mergedFile,
-        sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
+        sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, schema.getDimensionsSpec())),
         false
     );
     log.info("Inserting [%s] to the metadata store", sink.getSegment().getId());