fix bug in nested v4 format merger from refactoring (#14053)

This commit is contained in:
Clint Wylie 2023-04-10 20:38:58 -07:00 committed by GitHub
parent 00d777d848
commit d61bd7f8f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 14 deletions

View File

@ -35,6 +35,7 @@ import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.data.ImmutableBitmapValues; import org.apache.druid.segment.data.ImmutableBitmapValues;
import org.apache.druid.segment.data.IndexedIterable; import org.apache.druid.segment.data.IndexedIterable;
import org.apache.druid.segment.nested.NestedCommonFormatColumn; import org.apache.druid.segment.nested.NestedCommonFormatColumn;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.SortedValueDictionary; import org.apache.druid.segment.nested.SortedValueDictionary;
import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector; import org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector;
@ -174,7 +175,9 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
if (columnHolder == null) { if (columnHolder == null) {
return null; return null;
} }
if (!(columnHolder.getColumnFormat() instanceof NestedCommonFormatColumn.Format)) { final ColumnFormat format = columnHolder.getColumnFormat();
if (!(format instanceof NestedCommonFormatColumn.Format
|| format instanceof NestedDataComplexTypeSerde.NestedColumnFormatV4)) {
return null; return null;
} }

View File

@ -40,10 +40,10 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
@ -96,11 +96,11 @@ public class NestedDataTestUtils
DimensionsSpec.builder() DimensionsSpec.builder()
.setDimensions( .setDimensions(
Arrays.asList( Arrays.asList(
new AutoTypeColumnSchema("dim"), new NestedDataDimensionSchema("dim"),
new AutoTypeColumnSchema("nest_json"), new NestedDataDimensionSchema("nest_json"),
new AutoTypeColumnSchema("nester_json"), new NestedDataDimensionSchema("nester_json"),
new AutoTypeColumnSchema("variant_json"), new NestedDataDimensionSchema("variant_json"),
new AutoTypeColumnSchema("list_json") new NestedDataDimensionSchema("list_json")
) )
) )
.build(); .build();

View File

@ -53,6 +53,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -222,15 +223,34 @@ public class IndexBuilder
Preconditions.checkNotNull(indexMerger, "indexMerger"); Preconditions.checkNotNull(indexMerger, "indexMerger");
Preconditions.checkNotNull(tmpDir, "tmpDir"); Preconditions.checkNotNull(tmpDir, "tmpDir");
try (final IncrementalIndex incrementalIndex = buildIncrementalIndex()) { try (final IncrementalIndex incrementalIndex = buildIncrementalIndex()) {
List<IndexableAdapter> adapters = Collections.singletonList(
new QueryableIndexIndexableAdapter(
indexIO.loadIndex(
indexMerger.persist(
incrementalIndex,
new File(
tmpDir,
StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))
),
indexSpec,
null
)
)
)
);
// Do a 'merge' of the persisted segment even though there is only one; this time it will be reading from the
// queryable index instead of the incremental index, which also mimics the behavior of real ingestion tasks
// which persist incremental indexes as intermediate segments and then merges all the intermediate segments to
// publish
return indexIO.loadIndex( return indexIO.loadIndex(
indexMerger.persist( indexMerger.merge(
incrementalIndex, adapters,
new File( schema.isRollup(),
tmpDir, schema.getMetrics(),
StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)) tmpDir,
), schema.getDimensionsSpec(),
indexSpec, indexSpec,
null Integer.MAX_VALUE
) )
); );
} }