From f19c2e9ce40b711b5cf1ac1ad1cef9d50167c371 Mon Sep 17 00:00:00 2001
From: Maytas Monsereenusorn
Date: Thu, 18 Mar 2021 17:04:28 -0700
Subject: [PATCH] If ingested data has sparse columns, ingestion with
 forceGuaranteedRollup=true can result in imperfect rollup, and the final
 dimension ordering can differ from the dimensionsSpec ordering in the
 ingestionSpec (#10948)

* add IT

* add IT

* add the fix

* fix checkstyle

* fix compile

* fix compile

* fix test

* fix test

* address comments
---
 .../druid/indexer/IndexGeneratorJob.java      |   2 +-
 .../parallel/PartialSegmentMergeTask.java     |   2 +-
 .../tests/indexer/AbstractIndexerTest.java    |  14 +
 .../indexer/ITCompactionSparseColumnTest.java | 256 ++++++++++++++++++
 .../tests/indexer/ITCompactionTaskTest.java   |  20 +-
 .../indexer/sparse_column_index_queries.json  |  80 ++++++
 .../indexer/sparse_column_index_task.json     |  57 ++++
 ...parse_column_with_dim_compaction_task.json |  19 ++
 ...se_column_without_dim_compaction_task.json |  16 ++
 .../org/apache/druid/segment/IndexMerger.java |  75 ++++-
 .../apache/druid/segment/IndexMergerV9.java   |  39 ++-
 .../IndexMergerLongestSharedDimOrderTest.java | 177 ++++++++++++
 .../appenderator/AppenderatorImpl.java        |   3 +-
 .../UnifiedIndexerAppenderatorsManager.java   |  27 ++
 .../realtime/plumber/RealtimePlumber.java     |   2 +-
 15 files changed, 757 insertions(+), 32 deletions(-)
 create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java
 create mode 100644 integration-tests/src/test/resources/indexer/sparse_column_index_queries.json
 create mode 100644 integration-tests/src/test/resources/indexer/sparse_column_index_task.json
 create mode 100644 integration-tests/src/test/resources/indexer/sparse_column_with_dim_compaction_task.json
 create mode 100644 integration-tests/src/test/resources/indexer/sparse_column_without_dim_compaction_task.json
 create mode 100644 processing/src/test/java/org/apache/druid/segment/IndexMergerLongestSharedDimOrderTest.java
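Before reading the diff, it helps to see the failure mode the title describes. The sketch below is not part of the patch; the class and method names are mine, condensed from IndexMerger.getLongestSharedDimOrder as it existed before this change. With sparse columns, different segments record different subsets of the dimensions, so no single per-segment order contains all the others and the check returns null:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class SparseColumnOrderSketch
    {
      // Condensed from the pre-patch IndexMerger.getLongestSharedDimOrder: pick the widest
      // per-segment dimension order and verify every segment's order is a subsequence of it.
      static List<String> longestSharedDimOrder(List<List<String>> segmentDims)
      {
        List<String> candidate = null;
        for (List<String> dims : segmentDims) {
          if (candidate == null || dims.size() > candidate.size()) {
            candidate = dims;
          }
        }
        if (candidate == null) {
          return null;
        }
        for (List<String> dims : segmentDims) {
          Iterator<String> candidateIter = candidate.iterator();
          for (String dim : dims) {
            boolean matched = false;
            while (candidateIter.hasNext()) {
              if (dim.equals(candidateIter.next())) {
                matched = true;
                break;
              }
            }
            if (!matched) {
              // Incompatible orders: the pre-patch caller then silently switched to
              // lexicographic order, which can reorder dimensions and defeat rollup.
              return null;
            }
          }
        }
        return candidate;
      }

      public static void main(String[] args)
      {
        // With sparse columns, one segment may have seen only {dimB, dimA} and another
        // only {dimB, dimC}; neither order contains the other, so no shared order exists.
        System.out.println(longestSharedDimOrder(Arrays.asList(
            Arrays.asList("dimB", "dimA"),
            Arrays.asList("dimB", "dimC")
        ))); // prints: null
      }
    }

The patch keeps this check but, instead of falling straight back to lexicographic order, first tries to reconcile the segments using the dimension ordering from the ingestionSpec.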
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index 2a2d7a6f2e5..a12e76571db 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -609,7 +609,7 @@ public class IndexGeneratorJob implements Jobby
     {
       boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
       return HadoopDruidIndexerConfig.INDEX_MERGER_V9
-          .mergeQueryableIndex(indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator, null, -1);
+          .mergeQueryableIndex(indexes, rollup, aggs, null, file, config.getIndexSpec(), progressIndicator, null, -1);
     }
 
     @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 8749a0ce419..f787cd3067b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -329,7 +329,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec
       if (maxNumSegmentsToMerge >= indexes.size()) {
-        dimensionNames = IndexMerger.getMergedDimensionsFromQueryableIndexes(indexesToMerge);
+        dimensionNames = IndexMerger.getMergedDimensionsFromQueryableIndexes(indexesToMerge, dataSchema.getDimensionsSpec());
       }
       final File outDir = new File(baseOutDir, StringUtils.format("merged_%d", suffix++));
       mergedFiles.add(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index 3f39be5eed1..e97f52472a5 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -101,6 +101,20 @@ public abstract class AbstractIndexerTest
     unloadAndKillData(dataSource, first, last);
   }
 
+  protected void loadData(String indexTask, final String fullDatasourceName) throws Exception
+  {
+    String taskSpec = getResourceAsString(indexTask);
+    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+    final String taskID = indexer.submitTask(taskSpec);
+    LOG.info("TaskID for loading index task %s", taskID);
+    indexer.waitUntilTaskCompletes(taskID);
+
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+        "Segment Load"
+    );
+  }
+
   private void unloadAndKillData(final String dataSource, String start, String end)
   {
     // Wait for any existing index tasks to complete before disabling the datasource otherwise
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java
new file mode 100644
index 00000000000..26f7ed218ea
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITCompactionSparseColumnTest extends AbstractIndexerTest
+{
+  private static final Logger LOG = new Logger(ITCompactionSparseColumnTest.class);
+  private static final String INDEX_DATASOURCE = "sparse_column_index_test";
+
+  private static final String INDEX_TASK = "/indexer/sparse_column_index_task.json";
+  private static final String COMPACTION_QUERIES_RESOURCE = "/indexer/sparse_column_index_queries.json";
+
+  private static final String COMPACTION_TASK_WITHOUT_DIMENSION = "/indexer/sparse_column_without_dim_compaction_task.json";
+  private static final String COMPACTION_TASK_WITH_DIMENSION = "/indexer/sparse_column_with_dim_compaction_task.json";
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  private String fullDatasourceName;
+
+  @BeforeMethod
+  public void setFullDatasourceName(Method method)
+  {
+    fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix() + "-" + method.getName();
+  }
+
+  @Test
+  public void testCompactionPerfectRollUpWithoutDimensionSpec() throws Exception
+  {
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      // Load and verify initial data
+      loadAndVerifyDataWithSparseColumn(fullDatasourceName);
+      // Compaction with perfect rollup. Rows with "X" and "H" (in the first and second
+      // dimension columns, respectively) should be rolled up
+      String template = getResourceAsString(COMPACTION_TASK_WITHOUT_DIMENSION);
+      template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
+      final String taskID = indexer.submitTask(template);
+      indexer.waitUntilTaskCompletes(taskID);
+      ITRetryUtil.retryUntilTrue(
+          () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+          "Segment Compaction"
+      );
+      // Verify compacted data
+      // Compacted data has only one segment, with the following rows:
+      // The column ordering will be "dimB", "dimA", "dimC" (the same as the ordering in the
+      // initial ingestion task)
+      List<List<Object>> segmentRows = ImmutableList.of(
+          Arrays.asList(1442016000000L, "F", null, "C", 1, 1),
+          Arrays.asList(1442016000000L, "J", null, "C", 1, 1),
+          Arrays.asList(1442016000000L, "R", null, "J", 1, 1),
+          Arrays.asList(1442016000000L, "S", null, "Z", 1, 1),
+          Arrays.asList(1442016000000L, "T", null, "H", 1, 1),
+          Arrays.asList(1442016000000L, "X", null, "H", 3, 3),
+          Arrays.asList(1442016000000L, "X", "A", null, 1, 1),
+          Arrays.asList(1442016000000L, "Z", null, "H", 1, 1)
+      );
+      verifyCompactedData(segmentRows);
+    }
+  }
+
+  @Test
+  public void testCompactionPerfectRollUpWithLexicographicDimensionSpec() throws Exception
+  {
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      // Load and verify initial data
+      loadAndVerifyDataWithSparseColumn(fullDatasourceName);
+      // Compaction with perfect rollup. Rows with "X" and "H" (in the first and second
+      // dimension columns, respectively) should be rolled up
+      String template = getResourceAsString(COMPACTION_TASK_WITH_DIMENSION);
+      template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
+      // Set the dimension ordering for the compaction task explicitly
+      List<String> dimensionsOrder = ImmutableList.of("dimA", "dimB", "dimC");
+      template = StringUtils.replace(
+          template,
+          "%%DIMENSION_NAMES%%",
+          jsonMapper.writeValueAsString(dimensionsOrder)
+      );
+      final String taskID = indexer.submitTask(template);
+      indexer.waitUntilTaskCompletes(taskID);
+      ITRetryUtil.retryUntilTrue(
+          () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+          "Segment Compaction"
+      );
+      // Verify compacted data
+      // Compacted data has only one segment, with the following rows:
+      // The column ordering will be "dimA", "dimB", "dimC"
+      List<List<Object>> segmentRows = ImmutableList.of(
+          Arrays.asList(1442016000000L, null, "X", "A", 1, 1),
+          Arrays.asList(1442016000000L, "C", "F", null, 1, 1),
+          Arrays.asList(1442016000000L, "C", "J", null, 1, 1),
+          Arrays.asList(1442016000000L, "H", "T", null, 1, 1),
+          Arrays.asList(1442016000000L, "H", "X", null, 3, 3),
+          Arrays.asList(1442016000000L, "H", "Z", null, 1, 1),
+          Arrays.asList(1442016000000L, "J", "R", null, 1, 1),
+          Arrays.asList(1442016000000L, "Z", "S", null, 1, 1)
+      );
+      verifyCompactedData(segmentRows);
+    }
+  }
+
+  @Test
+  public void testCompactionPerfectRollUpWithNonLexicographicDimensionSpec() throws Exception
+  {
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      // Load and verify initial data
+      loadAndVerifyDataWithSparseColumn(fullDatasourceName);
+      // Compaction with perfect rollup. Rows with "X" and "H" (in the first and second
+      // dimension columns, respectively) should be rolled up
+      String template = getResourceAsString(COMPACTION_TASK_WITH_DIMENSION);
+      template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
+      // Set the dimension ordering for the compaction task explicitly
+      List<String> dimensionsOrder = ImmutableList.of("dimC", "dimB", "dimA");
+      template = StringUtils.replace(
+          template,
+          "%%DIMENSION_NAMES%%",
+          jsonMapper.writeValueAsString(dimensionsOrder)
+      );
+      final String taskID = indexer.submitTask(template);
+      indexer.waitUntilTaskCompletes(taskID);
+      ITRetryUtil.retryUntilTrue(
+          () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+          "Segment Compaction"
+      );
+      // Verify compacted data
+      // Compacted data has only one segment, with the following rows:
+      // The column ordering will be "dimC", "dimB", "dimA"
+      List<List<Object>> segment1Rows = ImmutableList.of(
+          Arrays.asList(1442016000000L, null, "F", "C", 1, 1),
+          Arrays.asList(1442016000000L, null, "J", "C", 1, 1),
+          Arrays.asList(1442016000000L, null, "R", "J", 1, 1),
+          Arrays.asList(1442016000000L, null, "S", "Z", 1, 1),
+          Arrays.asList(1442016000000L, null, "T", "H", 1, 1),
+          Arrays.asList(1442016000000L, null, "X", "H", 3, 3),
+          Arrays.asList(1442016000000L, null, "Z", "H", 1, 1),
+          Arrays.asList(1442016000000L, "A", "X", null, 1, 1)
+      );
+      verifyCompactedData(segment1Rows);
+    }
+  }
+
+  private void loadAndVerifyDataWithSparseColumn(String fullDatasourceName) throws Exception
+  {
+    loadData(INDEX_TASK, fullDatasourceName);
+    List<Map<String, List<List<Object>>>> expectedResultBeforeCompaction = new ArrayList<>();
+    // The first segment has the following rows:
+    List<List<Object>> segment1Rows = ImmutableList.of(
+        ImmutableList.of(1442016000000L, "F", "C", 1, 1),
+        ImmutableList.of(1442016000000L, "J", "C", 1, 1),
+        ImmutableList.of(1442016000000L, "X", "H", 1, 1)
+    );
+    expectedResultBeforeCompaction.add(ImmutableMap.of("events", segment1Rows));
+    // The second segment has the following rows:
+    List<List<Object>> segment2Rows = ImmutableList.of(
+        ImmutableList.of(1442016000000L, "S", "Z", 1, 1),
+        ImmutableList.of(1442016000000L, "X", "H", 1, 1),
+        ImmutableList.of(1442016000000L, "Z", "H", 1, 1)
+    );
+    expectedResultBeforeCompaction.add(ImmutableMap.of("events", segment2Rows));
+    // The third segment has the following rows:
+    List<List<Object>> segment3Rows = ImmutableList.of(
+        ImmutableList.of(1442016000000L, "R", "J", 1, 1),
+        ImmutableList.of(1442016000000L, "T", "H", 1, 1),
+        ImmutableList.of(1442016000000L, "X", "H", 1, 1)
+    );
+    expectedResultBeforeCompaction.add(ImmutableMap.of("events", segment3Rows));
+    // The fourth segment has the following rows:
+    List<List<Object>> segment4Rows = ImmutableList.of(
+        ImmutableList.of(1442016000000L, "X", "A", 1, 1)
+    );
+    expectedResultBeforeCompaction.add(ImmutableMap.of("events", segment4Rows));
+    verifyQueryResult(expectedResultBeforeCompaction, 10, 10, 1);
+  }
+
+  private void verifyCompactedData(List<List<Object>> segmentRows) throws Exception
+  {
+    List<Map<String, List<List<Object>>>> expectedResultAfterCompaction = new ArrayList<>();
+    expectedResultAfterCompaction.add(ImmutableMap.of("events", segmentRows));
+    verifyQueryResult(expectedResultAfterCompaction, 8, 10, 0.8);
+  }
+
+  private void verifyQueryResult(
+      List<Map<String, List<List<Object>>>> expectedScanResult,
+      int expectedNumRow,
+      int expectedSumCount,
+      double expectedRollupRatio
+  ) throws Exception
+  {
+    InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(COMPACTION_QUERIES_RESOURCE);
+    String queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
+    queryResponseTemplate = StringUtils.replace(
+        queryResponseTemplate,
+        "%%DATASOURCE%%",
+        fullDatasourceName
+    );
+    queryResponseTemplate = StringUtils.replace(
+        queryResponseTemplate,
+        "%%EXPECTED_SCAN_RESULT%%",
+        jsonMapper.writeValueAsString(expectedScanResult)
+    );
+    queryResponseTemplate = StringUtils.replace(
+        queryResponseTemplate,
+        "%%EXPECTED_SUM_COUNT%%",
+        jsonMapper.writeValueAsString(expectedSumCount)
+    );
+    queryResponseTemplate = StringUtils.replace(
+        queryResponseTemplate,
+        "%%EXPECTED_ROLLUP_RATIO%%",
+        jsonMapper.writeValueAsString(expectedRollupRatio)
+    );
+    queryResponseTemplate = StringUtils.replace(
+        queryResponseTemplate,
+        "%%EXPECTED_NUM_ROW%%",
+        jsonMapper.writeValueAsString(expectedNumRow)
+    );
+    queryHelper.testQueriesFromString(queryResponseTemplate);
+  }
+}
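A quick sanity check on the numbers this test expects, derived from the test data itself rather than anything new: the ingestion task inlines 10 rows, three of which are duplicates on (dimA = "H", dimB = "X") within the same hour. Before compaction the duplicates land in different segments (maxRowsPerSegment is 3), so nothing rolls up and the rollup-ratio query sees a0 = 10 result rows, a1 = sum(count) = 10, hence p0 = (10 * 1.00) / 10 = 1. After compaction with perfect rollup the three duplicates collapse into a single row with count = 3, leaving a0 = 8, a1 = 10, and p0 = (8 * 1.00) / 10 = 0.8, which are exactly the arguments verifyCompactedData passes to verifyQueryResult.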
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 0559d6edbaa..798332270bd 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -101,7 +101,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   public void testCompactionWithQueryGranularityInGranularitySpec() throws Exception
   {
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      loadData(INDEX_TASK);
+      loadData(INDEX_TASK, fullDatasourceName);
       // 4 segments across 2 days
       checkNumberOfSegments(4);
       List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -140,7 +140,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   public void testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() throws Exception
   {
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      loadData(INDEX_TASK);
+      loadData(INDEX_TASK, fullDatasourceName);
       // 4 segments across 2 days
       checkNumberOfSegments(4);
       List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -182,7 +182,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   ) throws Exception
   {
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      loadData(indexTask);
+      loadData(indexTask, fullDatasourceName);
       // 4 segments across 2 days
       checkNumberOfSegments(4);
       List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -211,20 +211,6 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
     }
   }
 
-  private void loadData(String indexTask) throws Exception
-  {
-    String taskSpec = getResourceAsString(indexTask);
-    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
-    final String taskID = indexer.submitTask(taskSpec);
-    LOG.info("TaskID for loading index task %s", taskID);
-    indexer.waitUntilTaskCompletes(taskID);
-
-    ITRetryUtil.retryUntilTrue(
-        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
-        "Segment Load"
-    );
-  }
-
   private void compactData(String compactionResource, GranularityType newSegmentGranularity, GranularityType newQueryGranularity) throws Exception
   {
     String template = getResourceAsString(compactionResource);
"type":"intervals", + "intervals":[ + "2013-01-01/2020-01-02" + ] + }, + "granularity":{ + "type":"all" + }, + "aggregations":[ + { + "type":"count", + "name":"a0" + }, + { + "type":"longSum", + "name":"a1", + "fieldName":"count", + "expression":null + } + ], + "postAggregations":[ + { + "type":"expression", + "name":"p0", + "expression":"((\"a0\" * 1.00) / \"a1\")", + "ordering":null + } + ] + }, + "expectedResults": [ + { + "timestamp" : "2015-09-12T00:00:00.000Z", + "result" : { + "a1" : %%EXPECTED_SUM_COUNT%%, + "p0" : %%EXPECTED_ROLLUP_RATIO%%, + "a0" : %%EXPECTED_NUM_ROW%% + } + } + ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/sparse_column_index_task.json b/integration-tests/src/test/resources/indexer/sparse_column_index_task.json new file mode 100644 index 00000000000..3a21a856ac6 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/sparse_column_index_task.json @@ -0,0 +1,57 @@ +{ + "type": "index_parallel", + "spec": { + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "inline", + "data": "{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"F\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"J\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"Z\",\"dimB\":\"S\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"Z\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"J\",\"dimB\":\"R\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"T\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimC\":\"A\",\"dimB\":\"X\",\"metA\":1}\n" + }, + "inputFormat": { + "type": "json" + } + }, + "tuningConfig": { + "type": "index_parallel", + "partitionsSpec": { + "type": "dynamic", + "maxRowsPerSegment": 3, + "maxTotalRows": 3 + }, + "maxRowsInMemory": 3 + }, + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "timestampSpec": { + "column": "time", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + "dimB", + "dimA", + "dimC", + "dimD", + "dimE", + "dimF" + ] + }, + "granularitySpec": { + "queryGranularity": "hour", + "rollup": true, + "segmentGranularity": "hour" + }, + "metricsSpec": [ + { + "name": "count", + "type": "count" + }, + { + "name": "sum_metA", + "type": "longSum", + "fieldName": "metA" + } + ] + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/sparse_column_with_dim_compaction_task.json b/integration-tests/src/test/resources/indexer/sparse_column_with_dim_compaction_task.json new file mode 100644 index 00000000000..9416a3f6bda --- /dev/null +++ b/integration-tests/src/test/resources/indexer/sparse_column_with_dim_compaction_task.json @@ -0,0 +1,19 @@ +{ + "type": "compact", + "dataSource": "%%DATASOURCE%%", + "dimensionsSpec": { + "dimensions": %%DIMENSION_NAMES%% + }, + "interval": "2010-10-29T05:00:00Z/2030-10-29T06:00:00Z", + "tuningConfig": { + "type": "index_parallel", + "maxRowsPerSegment": 3, + "maxRowsInMemory": 3, + "maxNumConcurrentSubTasks": 2, + "partitionsSpec": { + "type": "hashed", + "numShards": 1 + }, + "forceGuaranteedRollup": true + } +} \ No newline at end of file diff --git 
diff --git a/integration-tests/src/test/resources/indexer/sparse_column_without_dim_compaction_task.json b/integration-tests/src/test/resources/indexer/sparse_column_without_dim_compaction_task.json
new file mode 100644
index 00000000000..a149d7a2512
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/sparse_column_without_dim_compaction_task.json
@@ -0,0 +1,16 @@
+{
+  "type": "compact",
+  "dataSource": "%%DATASOURCE%%",
+  "interval": "2010-10-29T05:00:00Z/2030-10-29T06:00:00Z",
+  "tuningConfig": {
+    "type": "index_parallel",
+    "maxRowsPerSegment": 3,
+    "maxRowsInMemory": 3,
+    "maxNumConcurrentSubTasks": 2,
+    "partitionsSpec": {
+      "type": "hashed",
+      "numShards": 1
+    },
+    "forceGuaranteedRollup": true
+  }
+}
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
index f9fba3fc554..3876bf1ebbc 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
@@ -28,6 +28,7 @@ import com.google.common.collect.PeekingIterator;
 import com.google.inject.ImplementedBy;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.utils.SerializerUtils;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.java.util.common.ByteBufferUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -38,6 +39,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -65,9 +67,12 @@ public interface IndexMerger
   int INVALID_ROW = -1;
   int UNLIMITED_MAX_COLUMNS_TO_MERGE = -1;
 
-  static List<String> getMergedDimensionsFromQueryableIndexes(List<QueryableIndex> indexes)
+  static List<String> getMergedDimensionsFromQueryableIndexes(
+      List<QueryableIndex> indexes,
+      @Nullable DimensionsSpec dimensionsSpec
+  )
   {
-    return getMergedDimensions(toIndexableAdapters(indexes));
+    return getMergedDimensions(toIndexableAdapters(indexes), dimensionsSpec);
   }
 
   static List<IndexableAdapter> toIndexableAdapters(List<QueryableIndex> indexes)
@@ -75,14 +80,18 @@ public interface IndexMerger
     return indexes.stream().map(QueryableIndexIndexableAdapter::new).collect(Collectors.toList());
   }
 
-  static List<String> getMergedDimensions(List<IndexableAdapter> indexes)
+  static List<String> getMergedDimensions(
+      List<IndexableAdapter> indexes,
+      @Nullable DimensionsSpec dimensionsSpec
+  )
   {
     if (indexes.size() == 0) {
       return ImmutableList.of();
     }
-    List<String> commonDimOrder = getLongestSharedDimOrder(indexes);
+    List<String> commonDimOrder = getLongestSharedDimOrder(indexes, dimensionsSpec);
     if (commonDimOrder == null) {
-      log.warn("Indexes have incompatible dimension orders, using lexicographic order.");
+      log.warn("Indexes have incompatible dimension orders and there is no valid dimension ordering"
+               + " in the ingestionSpec, using lexicographic order.");
       return getLexicographicMergedDimensions(indexes);
     } else {
       return commonDimOrder;
@@ -90,7 +99,10 @@ public interface IndexMerger
   }
 
   @Nullable
-  static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
+  static List<String> getLongestSharedDimOrder(
+      List<IndexableAdapter> indexes,
+      @Nullable DimensionsSpec dimensionsSpec
+  )
   {
     int maxSize = 0;
     Iterable<String> orderingCandidate = null;
@@ -106,6 +118,41 @@ public interface IndexMerger
       return null;
     }
 
+    if (isDimensionOrderingValid(indexes, orderingCandidate)) {
+      return ImmutableList.copyOf(orderingCandidate);
+    } else {
+      log.info("Indexes have incompatible dimension orders, trying to fall back on the dimension ordering from the ingestionSpec");
+      // Check if there is a valid dimension ordering in the ingestionSpec to fall back on
+      if (dimensionsSpec == null || CollectionUtils.isNullOrEmpty(dimensionsSpec.getDimensionNames())) {
+        log.info("Cannot fall back on the dimension ordering from the ingestionSpec as it does not exist");
+        return null;
+      }
+      List<String> candidate = new ArrayList<>(dimensionsSpec.getDimensionNames());
+      // Remove from the candidate all dimensions that do not exist within the indexes
+      Set<String> allValidDimensions = indexes.stream()
+                                              .flatMap(indexableAdapter -> indexableAdapter.getDimensionNames().stream())
+                                              .collect(Collectors.toSet());
+      candidate.retainAll(allValidDimensions);
+      // Sanity check that there are no extra or missing columns
+      if (candidate.size() != allValidDimensions.size()) {
+        log.error("Dimensions mismatched between the ingestionSpec and the indexes. ingestionSpec[%s] indexes[%s]",
+                  candidate,
+                  allValidDimensions);
+        return null;
+      }
+
+      // Sanity check that every index's dimension ordering matches the candidate ordering
+      if (!isDimensionOrderingValid(indexes, candidate)) {
+        log.error("The dimension ordering from the ingestionSpec is not valid for the given indexes");
+        return null;
+      }
+      log.info("The dimension ordering from the ingestionSpec is valid. Falling back on dimension ordering [%s]", candidate);
+      return candidate;
+    }
+  }
+
+  static boolean isDimensionOrderingValid(List<IndexableAdapter> indexes, Iterable<String> orderingCandidate)
+  {
     for (IndexableAdapter index : indexes) {
       Iterator<String> candidateIter = orderingCandidate.iterator();
       for (String matchDim : index.getDimensionNames()) {
@@ -118,11 +165,11 @@ public interface IndexMerger
         }
       }
       if (!matched) {
-        return null;
+        return false;
       }
     }
-    return ImmutableList.copyOf(orderingCandidate);
+    return true;
   }
 
   static List<String> getLexicographicMergedDimensions(List<IndexableAdapter> indexes)
@@ -205,6 +252,18 @@ public interface IndexMerger
       List<QueryableIndex> indexes,
       boolean rollup,
       AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
+      File outDir,
+      IndexSpec indexSpec,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
+  ) throws IOException;
+
+  File mergeQueryableIndex(
+      List<QueryableIndex> indexes,
+      boolean rollup,
+      AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
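The block above is the heart of the change. The following self-contained sketch is mine, not from the patch; it works on plain string lists instead of IndexableAdapters, but follows the same decision sequence: restrict the spec's order to dimensions that actually occur in the segments, reject extra or missing columns, then validate the result against every segment's order:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    public class DimOrderFallbackSketch
    {
      // Same subsequence check the patch extracts into IndexMerger.isDimensionOrderingValid.
      static boolean isOrderingValid(List<List<String>> segmentDims, List<String> candidate)
      {
        for (List<String> dims : segmentDims) {
          Iterator<String> candidateIter = candidate.iterator();
          for (String dim : dims) {
            boolean matched = false;
            while (candidateIter.hasNext()) {
              if (dim.equals(candidateIter.next())) {
                matched = true;
                break;
              }
            }
            if (!matched) {
              return false;
            }
          }
        }
        return true;
      }

      // The new fallback path, on plain lists.
      static List<String> fallbackOrder(List<List<String>> segmentDims, List<String> specDims)
      {
        List<String> candidate = new ArrayList<>(specDims);
        Set<String> allValidDimensions = new HashSet<>();
        segmentDims.forEach(allValidDimensions::addAll);
        candidate.retainAll(allValidDimensions);
        if (candidate.size() != allValidDimensions.size()) {
          return null; // a segment contains a dimension the spec does not know about
        }
        return isOrderingValid(segmentDims, candidate) ? candidate : null;
      }

      public static void main(String[] args)
      {
        List<List<String>> segmentDims = Arrays.asList(
            Arrays.asList("dimB", "dimA"),
            Arrays.asList("dimB", "dimC")
        );
        // The segments share no single order, but the ingestionSpec order reconciles both:
        System.out.println(fallbackOrder(segmentDims, Arrays.asList("dimB", "dimA", "dimC"))); // [dimB, dimA, dimC]
        // A spec that contradicts a segment's internal order is rejected:
        System.out.println(fallbackOrder(segmentDims, Arrays.asList("dimC", "dimA", "dimB"))); // null
      }
    }

Note the design choice: the fallback never invents an order. It only accepts the spec's ordering when every segment's observed order is a subsequence of it, so an explicit null (lexicographic fallback) remains the last resort.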
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 02618daf929..c5191d34e7d 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -28,6 +28,7 @@ import com.google.common.io.Files;
 import com.google.common.primitives.Ints;
 import com.google.inject.Inject;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.io.ZeroCopyByteArrayOutputStream;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
@@ -875,6 +876,7 @@ public class IndexMergerV9 implements IndexMerger
         // while merging a single iterable
         false,
         index.getMetricAggs(),
+        null,
         outDir,
         indexSpec,
         progress,
@@ -898,6 +900,31 @@ public class IndexMergerV9 implements IndexMerger
         indexes,
         rollup,
         metricAggs,
+        null,
+        outDir,
+        indexSpec,
+        segmentWriteOutMediumFactory,
+        maxColumnsToMerge
+    );
+  }
+
+  @Override
+  public File mergeQueryableIndex(
+      List<QueryableIndex> indexes,
+      boolean rollup,
+      final AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
+      File outDir,
+      IndexSpec indexSpec,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
+  ) throws IOException
+  {
+    return mergeQueryableIndex(
+        indexes,
+        rollup,
+        metricAggs,
+        dimensionsSpec,
         outDir,
         indexSpec,
         new BaseProgressIndicator(),
@@ -911,6 +938,7 @@ public class IndexMergerV9 implements IndexMerger
       List<QueryableIndex> indexes,
       boolean rollup,
       final AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
@@ -922,6 +950,7 @@ public class IndexMergerV9 implements IndexMerger
         IndexMerger.toIndexableAdapters(indexes),
         rollup,
         metricAggs,
+        dimensionsSpec,
         outDir,
         indexSpec,
         progress,
@@ -940,13 +969,14 @@ public class IndexMergerV9 implements IndexMerger
       int maxColumnsToMerge
   ) throws IOException
   {
-    return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator(), null, maxColumnsToMerge);
+    return multiphaseMerge(indexes, rollup, metricAggs, null, outDir, indexSpec, new BaseProgressIndicator(), null, maxColumnsToMerge);
   }
 
   private File multiphaseMerge(
       List<IndexableAdapter> indexes,
       final boolean rollup,
       final AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
@@ -964,6 +994,7 @@ public class IndexMergerV9 implements IndexMerger
         indexes,
         rollup,
         metricAggs,
+        dimensionsSpec,
         outDir,
         indexSpec,
         progress,
@@ -997,6 +1028,7 @@ public class IndexMergerV9 implements IndexMerger
             phase,
             rollup,
             metricAggs,
+            dimensionsSpec,
             phaseOutDir,
             indexSpec,
             progress,
@@ -1087,13 +1119,14 @@ public class IndexMergerV9 implements IndexMerger
       List<IndexableAdapter> indexes,
       final boolean rollup,
       final AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
       @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
   ) throws IOException
   {
-    final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes);
+    final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes, dimensionsSpec);
 
     final List<String> mergedMetrics = IndexMerger.mergeIndexed(
         indexes.stream().map(IndexableAdapter::getMetricNames).collect(Collectors.toList())
@@ -1196,7 +1229,7 @@ public class IndexMergerV9 implements IndexMerger
     FileUtils.deleteDirectory(outDir);
     org.apache.commons.io.FileUtils.forceMkdir(outDir);
 
-    final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes);
+    final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes, null);
 
     final List<String> mergedMetrics = IndexMerger.mergeIndexed(
         indexes.stream().map(IndexableAdapter::getMetricNames).collect(Collectors.toList())
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerLongestSharedDimOrderTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerLongestSharedDimOrderTest.java
new file mode 100644
index 00000000000..2bb84c0eeb9
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerLongestSharedDimOrderTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.data.ListIndexed;
+import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class IndexMergerLongestSharedDimOrderTest
+{
+  @Mock
+  Supplier<ColumnHolder> mockSupplier;
+
+  @Mock
+  ColumnHolder mockColumnHolder;
+
+  @Mock
+  SmooshedFileMapper mockSmooshedFileMapper;
+
+  @Mock
+  BitmapFactory mockBitmapFactory;
+
+  @Before
+  public void setUp()
+  {
+    when(mockSupplier.get()).thenReturn(mockColumnHolder);
+    // This value does not matter
+    when(mockColumnHolder.getLength()).thenReturn(1);
+  }
+
+  @Test
+  public void testGetLongestSharedDimOrderWithNullDimensionSpecAndEmptyIndex()
+  {
+    List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(), null);
+    Assert.assertNull(actual);
+  }
+
+  @Test
+  public void testGetLongestSharedDimOrderWithNullDimensionSpecAndValidOrdering()
+  {
+    QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c"));
+    QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("b", "c"));
+    List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), null);
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(ImmutableList.of("a", "b", "c"), actual);
+
+    // Valid ordering: although the second index has a gap, it still follows the same ordering
+    index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c"));
+    index2 = makeIndexWithDimensionList(ImmutableList.of("a", "c"));
+    actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), null);
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(ImmutableList.of("a", "b", "c"), actual);
+  }
+
+  @Test
+  public void testGetLongestSharedDimOrderWithNullDimensionSpecAndNoValidOrdering()
+  {
+    // No valid ordering because no index has all three dimensions
+    QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b"));
+    QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("b", "c"));
+    List<String> actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), null);
+    Assert.assertNull(actual);
+
+    // No valid ordering because the ordering is not the same in all indexes
makeIndexWithDimensionList(ImmutableList.of("a", "b", "c")); + index2 = makeIndexWithDimensionList(ImmutableList.of("c", "b")); + actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), null); + Assert.assertNull(actual); + } + + + @Test + public void testGetLongestSharedDimOrderWithSchemalessDimensionSpecAndNoValidOrdering() + { + DimensionsSpec empty = new DimensionsSpec(ImmutableList.of()); + // No valid ordering as no index as all three dimensions + QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b")); + QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("b", "c")); + List actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), empty); + Assert.assertNull(actual); + + // No valid ordering as ordering is not the same in all indexes + index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c")); + index2 = makeIndexWithDimensionList(ImmutableList.of("c", "b")); + actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), empty); + Assert.assertNull(actual); + } + + @Test + public void testGetLongestSharedDimOrderWithValidSchemaDimensionSpecAndNoValidOrdering() + { + DimensionsSpec valid = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("a", "b", "c"))); + // No valid ordering as no index has all three dimensions + QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b")); + QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("b", "c")); + List actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), valid); + Assert.assertNotNull(actual); + Assert.assertEquals(ImmutableList.of("a", "b", "c"), actual); + } + + @Test + public void testGetLongestSharedDimOrderWithInvalidSchemaDimensionSpecAndNoValidOrdering() + { + DimensionsSpec valid = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("a", "b", "c"))); + // No valid ordering as ordering is not the same in all indexes + QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c")); + QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("c", "b")); + List actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), valid); + // Since ordering of index2 is not the same as the ordering of the schema in DimensionSpec + Assert.assertNull(actual); + } + + @Test + public void testGetLongestSharedDimOrderWithValidSchemaDimensionSpecAndInvalidOrdering() + { + DimensionsSpec valid = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("a", "b", "c"))); + // No valid ordering as ordering is not the same in all indexes + QueryableIndexIndexableAdapter index1 = makeIndexWithDimensionList(ImmutableList.of("a", "b", "c")); + QueryableIndexIndexableAdapter index2 = makeIndexWithDimensionList(ImmutableList.of("c", "b", "e")); + List actual = IndexMerger.getLongestSharedDimOrder(ImmutableList.of(index1, index2), valid); + // Since index2 has dimension that is not in the schema in DimensionSpec. This should not be possible. 
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c7b58792a14..6f98817030e 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -824,6 +824,7 @@ public class AppenderatorImpl implements Appenderator
           indexes,
           schema.getGranularitySpec().isRollup(),
           schema.getAggregators(),
+          schema.getDimensionsSpec(),
           mergedTarget,
           tuningConfig.getIndexSpec(),
           tuningConfig.getSegmentWriteOutMediumFactory(),
@@ -848,7 +849,7 @@ public class AppenderatorImpl implements Appenderator
           // semantics.
           () -> dataSegmentPusher.push(
               mergedFile,
-              sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
+              sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, schema.getDimensionsSpec())),
               useUniquePath
           ),
           exception -> exception instanceof Exception,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 00cef0d7ac9..369709882a7 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -30,6 +30,7 @@ import com.google.inject.Provider;
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
@@ -520,6 +521,30 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
       @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
       int maxColumnsToMerge
   )
+  {
+    return mergeQueryableIndex(
+        indexes,
+        rollup,
+        metricAggs,
+        null,
+        outDir,
+        indexSpec,
+        segmentWriteOutMediumFactory,
+        maxColumnsToMerge
+    );
+  }
+
+  @Override
+  public File mergeQueryableIndex(
+      List<QueryableIndex> indexes,
+      boolean rollup,
+      AggregatorFactory[] metricAggs,
+      @Nullable DimensionsSpec dimensionsSpec,
+      File outDir,
+      IndexSpec indexSpec,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
+  )
   {
     ListenableFuture<File> mergeFuture = mergeExecutor.submit(
         new Callable<File>()
@@ -532,6 +557,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
                 indexes,
                 rollup,
                 metricAggs,
+                dimensionsSpec,
                 outDir,
                 indexSpec,
                 segmentWriteOutMediumFactory,
@@ -656,6 +682,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
         List<IndexableAdapter> indexes,
         boolean rollup,
         AggregatorFactory[] metricAggs,
+        @Nullable DimensionsSpec dimensionsSpec,
         File outDir,
         IndexSpec indexSpec,
         ProgressIndicator progress,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index 4ecab39cc93..578f3f27b32 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -456,7 +456,7 @@ public class RealtimePlumber implements Plumber
 
               DataSegment segment = dataSegmentPusher.push(
                   mergedFile,
-                  sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
+                  sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, schema.getDimensionsSpec())),
                   false
              );
              log.info("Inserting [%s] to the metadata store", sink.getSegment().getId());