Place __time in signatures according to sort order. (#16958)

* Place __time in signatures according to sort order.

Updates a variety of places to put __time in row signatures according
to its position in the sort order, rather than always first, including:

- InputSourceSampler.
- ScanQueryEngine (in the default signature when "columns" is empty).
- Various StorageAdapters, which also have the effect of reordering
  the column order in segmentMetadata queries, and therefore in SQL
  schemas as well.

Follow-up to #16849.

* Fix compilation.

* Additional fixes.

* Fix.

* Fix style.

* Omit nonexistent columns from the row signature.

* Fix tests.
This commit is contained in:
Gian Merlino 2024-08-26 21:45:51 -07:00 committed by GitHub
parent 7ee7e194c4
commit 5d2ed33b89
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 211 additions and 83 deletions

View File

@ -279,8 +279,8 @@ public class MSQReplaceTest extends MSQTestBase
public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName, Map<String, Object> context)
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("__time", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
.build();
@ -323,12 +323,12 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedResultRows(
ImmutableList.of(
new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null, 1.0f},
new Object[]{978307200000L, "1", 4.0f},
new Object[]{946771200000L, "10.1", 2.0f},
new Object[]{946857600000L, "2", 3.0f},
new Object[]{978480000000L, "abc", 6.0f},
new Object[]{978393600000L, "def", 5.0f}
new Object[]{NullHandling.sqlCompatible() ? "" : null, 946684800000L, 1.0f},
new Object[]{"1", 978307200000L, 4.0f},
new Object[]{"10.1", 946771200000L, 2.0f},
new Object[]{"2", 946857600000L, 3.0f},
new Object[]{"abc", 978480000000L, 6.0f},
new Object[]{"def", 978393600000L, 5.0f}
)
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
@ -365,8 +365,8 @@ public class MSQReplaceTest extends MSQTestBase
// forceSegmentSortByTime = false. (Same expectations as the prior test,
// testReplaceOnFooWithAllClusteredByDimExplicitSort.)
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("__time", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
.build();
@ -409,12 +409,12 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedResultRows(
ImmutableList.of(
new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null, 1.0f},
new Object[]{978307200000L, "1", 4.0f},
new Object[]{946771200000L, "10.1", 2.0f},
new Object[]{946857600000L, "2", 3.0f},
new Object[]{978480000000L, "abc", 6.0f},
new Object[]{978393600000L, "def", 5.0f}
new Object[]{NullHandling.sqlCompatible() ? "" : null, 946684800000L, 1.0f},
new Object[]{"1", 978307200000L, 4.0f},
new Object[]{"10.1", 946771200000L, 2.0f},
new Object[]{"2", 946857600000L, 3.0f},
new Object[]{"abc", 978480000000L, 6.0f},
new Object[]{"def", 978393600000L, 5.0f}
)
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(

View File

@ -141,6 +141,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
@ -1288,17 +1289,20 @@ public class MSQTestBase extends BaseCalciteQueryTest
Assert.assertEquals(expectedDestinationIntervals, destination.getReplaceTimeChunks());
}
if (expectedSegments != null) {
final int timeIndex =
MSQResultsReport.ColumnAndType.toRowSignature(expectedRowSignature)
.indexOf(ColumnHolder.TIME_COLUMN_NAME);
Assert.assertEquals(expectedSegments, segmentIdVsOutputRowsMap.keySet());
for (Object[] row : transformedOutputRows) {
List<SegmentId> diskSegmentList = segmentIdVsOutputRowsMap.keySet()
.stream()
.filter(segmentId -> segmentId.getInterval()
.contains((Long) row[0]))
.filter(segmentId -> {
List<List<Object>> lists = segmentIdVsOutputRowsMap.get(segmentId);
return lists.contains(Arrays.asList(row));
})
.collect(Collectors.toList());
List<SegmentId> diskSegmentList = segmentIdVsOutputRowsMap
.keySet()
.stream()
.filter(segmentId -> segmentId.getInterval().contains((Long) row[timeIndex]))
.filter(segmentId -> {
List<List<Object>> lists = segmentIdVsOutputRowsMap.get(segmentId);
return lists.contains(Arrays.asList(row));
})
.collect(Collectors.toList());
if (diskSegmentList.size() != 1) {
throw new IllegalStateException("Single key in multiple partitions");
}

View File

@ -241,14 +241,12 @@ public class InputSourceSampler
List<DimensionSchema> physicalDimensionSchemas = new ArrayList<>();
RowSignature.Builder signatureBuilder = RowSignature.builder();
signatureBuilder.add(
ColumnHolder.TIME_COLUMN_NAME,
index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME).toColumnType()
);
for (IncrementalIndex.DimensionDesc dimensionDesc : index.getDimensions()) {
if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) {
final ColumnType columnType = dimensionDesc.getCapabilities().toColumnType();
signatureBuilder.add(dimensionDesc.getName(), columnType);
for (final String dimensionName : index.getDimensionNames(true)) {
if (ColumnHolder.TIME_COLUMN_NAME.equals(dimensionName)) {
signatureBuilder.addTimeColumn();
} else if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionName)) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionName);
signatureBuilder.add(dimensionDesc.getName(), ColumnType.fromCapabilities(dimensionDesc.getCapabilities()));
// use explicitly specified dimension schema if it exists
if (dataSchema != null &&
dataSchema.getDimensionsSpec() != null &&
@ -271,7 +269,7 @@ public class InputSourceSampler
if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) {
signatureBuilder.add(
aggregatorFactory.getName(),
index.getColumnCapabilities(aggregatorFactory.getName()).toColumnType()
ColumnType.fromCapabilities(index.getColumnCapabilities(aggregatorFactory.getName()))
);
}
}

View File

@ -45,6 +45,7 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.timeline.SegmentId;
@ -98,13 +99,11 @@ public class ScanQueryEngine
} else {
final Set<String> availableColumns = Sets.newLinkedHashSet(
Iterables.concat(
Collections.singleton(ColumnHolder.TIME_COLUMN_NAME),
adapter.getRowSignature().getColumnNames(),
Iterables.transform(
Arrays.asList(query.getVirtualColumns().getVirtualColumns()),
VirtualColumn::getOutputName
),
adapter.getAvailableDimensions(),
adapter.getAvailableMetrics()
)
)
);
@ -152,11 +151,7 @@ public class ScanQueryEngine
for (String column : allColumns) {
final BaseObjectColumnValueSelector selector = factory.makeColumnValueSelector(column);
ColumnCapabilities columnCapabilities = factory.getColumnCapabilities(column);
rowSignatureBuilder.add(
column,
columnCapabilities == null ? null : columnCapabilities.toColumnType()
);
rowSignatureBuilder.add(column, ColumnType.fromCapabilities(columnCapabilities));
columnSelectors.add(selector);
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.filter.AndFilter;
import org.joda.time.Interval;
@ -69,6 +70,12 @@ public class FilteredStorageAdapter implements StorageAdapter
return baseStorageAdapter.getInterval();
}
@Override
public RowSignature getRowSignature()
{
return baseStorageAdapter.getRowSignature();
}
@Override
public Indexed<String> getAvailableDimensions()
{

View File

@ -19,11 +19,14 @@
package org.apache.druid.segment;
import org.apache.druid.query.OrderBy;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex;
import org.joda.time.DateTime;
@ -55,6 +58,40 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return index.getDataInterval();
}
@Override
public RowSignature getRowSignature()
{
final LinkedHashSet<String> columns = new LinkedHashSet<>();
for (final OrderBy orderBy : index.getOrdering()) {
columns.add(orderBy.getColumnName());
}
// Add __time after the defined ordering, if __time wasn't part of it.
columns.add(ColumnHolder.TIME_COLUMN_NAME);
for (final String dimension : getAvailableDimensions()) {
columns.add(dimension);
}
for (final String metric : getAvailableMetrics()) {
columns.add(metric);
}
final RowSignature.Builder builder = RowSignature.builder();
for (final String column : columns) {
final ColumnType columnType = ColumnType.fromCapabilities(index.getColumnCapabilities(column));
// index.getOrdering() may include columns that don't exist, such as if they were omitted due to
// being 100% nulls. Don't add those to the row signature.
if (columnType != null) {
builder.add(column, columnType);
}
}
return builder.build();
}
@Override
public Indexed<String> getAvailableDimensions()
{

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.OrderBy;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.vector.VectorCursor;
@ -34,7 +35,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
/**
*
@ -140,10 +140,7 @@ public interface StorageAdapter extends CursorFactory, ColumnInspector, CursorHo
builder.addTimeColumn();
for (final String column : Iterables.concat(getAvailableDimensions(), getAvailableMetrics())) {
builder.add(
column,
Optional.ofNullable(getColumnCapabilities(column)).map(ColumnCapabilities::toColumnType).orElse(null)
);
builder.add(column, ColumnType.fromCapabilities(getColumnCapabilities(column)));
}
return builder.build();

View File

@ -35,6 +35,8 @@ import org.apache.druid.query.filter.NullFilter;
import org.apache.druid.query.filter.RangeFilter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.Indexed;
@ -169,6 +171,22 @@ public class UnnestStorageAdapter implements StorageAdapter
return baseAdapter.getInterval();
}
@Override
public RowSignature getRowSignature()
{
final RowSignature.Builder builder = RowSignature.builder();
final RowSignature baseSignature = baseAdapter.getRowSignature();
for (int i = 0; i < baseSignature.size(); i++) {
final String column = baseSignature.getColumnName(i);
if (!outputColumnName.equals(column)) {
builder.add(column, ColumnType.fromCapabilities(getColumnCapabilities(column)));
}
}
return builder.add(outputColumnName, ColumnType.fromCapabilities(getColumnCapabilities(outputColumnName))).build();
}
@Override
public Indexed<String> getAvailableDimensions()
{

View File

@ -124,6 +124,12 @@ public class ColumnType extends BaseTypeSignature<ValueType>
return Types.fromString(ColumnTypeFactory.getInstance(), typeName);
}
@Nullable
public static ColumnType fromCapabilities(@Nullable ColumnCapabilities capabilities)
{
return capabilities != null ? capabilities.toColumnType() : null;
}
public static ColumnType ofArray(ColumnType elementType)
{
return ColumnTypeFactory.getInstance().ofArray(elementType);

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.incremental;
import com.google.common.collect.Iterables;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.DimensionDictionarySelector;
@ -30,6 +31,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.joda.time.Interval;
@ -122,6 +124,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return index.getInterval();
}
@Override
public RowSignature getRowSignature()
{
final RowSignature.Builder builder = RowSignature.builder();
for (final String column : Iterables.concat(index.getDimensionNames(true), index.getMetricNames())) {
builder.add(column, ColumnType.fromCapabilities(index.getColumnCapabilities(column)));
}
return builder.build();
}
@Override
public Indexed<String> getAvailableDimensions()
{

View File

@ -33,6 +33,8 @@ import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.filter.Filters;
@ -99,6 +101,24 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
return baseAdapter.getInterval();
}
@Override
public RowSignature getRowSignature()
{
// Use a Set since we may encounter duplicates, if a field from a Joinable shadows one of the base fields.
final LinkedHashSet<String> columns = new LinkedHashSet<>(baseAdapter.getRowSignature().getColumnNames());
for (final JoinableClause clause : clauses) {
columns.addAll(clause.getAvailableColumnsPrefixed());
}
final RowSignature.Builder builder = RowSignature.builder();
for (final String column : columns) {
builder.add(column, ColumnType.fromCapabilities(getColumnCapabilities(column)));
}
return builder.build();
}
@Override
public Indexed<String> getAvailableDimensions()
{

View File

@ -39,7 +39,6 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.ReadableOffset;
@ -88,12 +87,7 @@ public class BroadcastSegmentIndexedTable implements IndexedTable
segment.getId()
);
RowSignature.Builder sigBuilder = RowSignature.builder();
sigBuilder.add(ColumnHolder.TIME_COLUMN_NAME, ColumnType.LONG);
for (String column : queryableIndex.getColumnNames()) {
sigBuilder.add(column, adapter.getColumnCapabilities(column).toColumnType());
}
this.rowSignature = sigBuilder.build();
this.rowSignature = adapter.getRowSignature();
// initialize keycolumn index builders
final ArrayList<RowBasedIndexBuilder> indexBuilders = new ArrayList<>(rowSignature.size());

View File

@ -30,13 +30,11 @@ import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
public class KeyTestUtils
{
@ -56,10 +54,10 @@ public class KeyTestUtils
final RowSignature.Builder builder = RowSignature.builder();
for (final KeyColumn keyColumn : keyColumns) {
final ColumnCapabilities capabilities = inspector.getColumnCapabilities(keyColumn.columnName());
final ColumnType columnType =
Optional.ofNullable(capabilities).map(ColumnCapabilities::toColumnType).orElse(null);
builder.add(keyColumn.columnName(), columnType);
builder.add(
keyColumn.columnName(),
ColumnType.fromCapabilities(inspector.getColumnCapabilities(keyColumn.columnName()))
);
}
return builder.build();

View File

@ -32,6 +32,8 @@ import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.RowBasedStorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
@ -119,6 +121,18 @@ public class LookupSegmentTest
Assert.assertNull(LOOKUP_SEGMENT.asQueryableIndex());
}
@Test
public void test_asStorageAdapter_getRowSignature()
{
Assert.assertEquals(
RowSignature.builder()
.add("k", ColumnType.STRING)
.add("v", ColumnType.STRING)
.build(),
LOOKUP_SEGMENT.asStorageAdapter().getRowSignature()
);
}
@Test
public void test_asStorageAdapter_getAvailableDimensions()
{

View File

@ -255,12 +255,12 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
Assert.assertEquals(6, index.getNumRows());
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(timestamp, "1", "2", 1L),
ImmutableList.of(timestamp, "1", "2", 1L),
ImmutableList.of(timestamp + 1, "1", "2", 1L),
ImmutableList.of(timestamp, "3", "4", 1L),
ImmutableList.of(timestamp, "3", "4", 1L),
ImmutableList.of(timestamp + 1, "3", "4", 1L)
ImmutableList.of("1", "2", timestamp, 1L),
ImmutableList.of("1", "2", timestamp, 1L),
ImmutableList.of("1", "2", timestamp + 1, 1L),
ImmutableList.of("3", "4", timestamp, 1L),
ImmutableList.of("3", "4", timestamp, 1L),
ImmutableList.of("3", "4", timestamp + 1, 1L)
),
FrameTestUtil.readRowsFromAdapter(new QueryableIndexStorageAdapter(index), null, false).toList()
);
@ -325,10 +325,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
Assert.assertEquals(4, index.getNumRows());
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(timestamp, "1", "2", 2L),
ImmutableList.of(timestamp + 1, "1", "2", 1L),
ImmutableList.of(timestamp, "3", "4", 2L),
ImmutableList.of(timestamp + 1, "3", "4", 1L)
ImmutableList.of("1", "2", timestamp, 2L),
ImmutableList.of("1", "2", timestamp + 1, 1L),
ImmutableList.of("3", "4", timestamp, 2L),
ImmutableList.of("3", "4", timestamp + 1, 1L)
),
FrameTestUtil.readRowsFromAdapter(new QueryableIndexStorageAdapter(index), null, false).toList()
);

View File

@ -58,6 +58,31 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
);
}
@Test
public void test_getRowSignature_factToCountry()
{
Assert.assertEquals(
ImmutableList.of(
"__time",
"channel",
"regionIsoCode",
"countryNumber",
"countryIsoCode",
"user",
"isRobot",
"isAnonymous",
"namespace",
"page",
"delta",
"channel_uniques",
"c1.countryNumber",
"c1.countryIsoCode",
"c1.countryName"
),
Lists.newArrayList(makeFactToCountrySegment().getRowSignature().getColumnNames())
);
}
@Test
public void test_getAvailableDimensions_factToCountry()
{

View File

@ -43,6 +43,7 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestIndex;
@ -141,9 +142,8 @@ public class BroadcastSegmentIndexedTableTest extends InitializedNullHandlingTes
segment.getTotalSpace()
);
backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false, SegmentLazyLoadFailCallback.NOOP);
columnNames = ImmutableList.<String>builder().add(ColumnHolder.TIME_COLUMN_NAME)
.addAll(backingSegment.asQueryableIndex().getColumnNames()).build();
columnNames =
new QueryableIndexStorageAdapter(backingSegment.asQueryableIndex()).getRowSignature().getColumnNames();
broadcastTable = new BroadcastSegmentIndexedTable(backingSegment, keyColumns, dataSegment.getVersion());
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -37,7 +38,6 @@ import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@ -92,6 +92,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
// columns excluding current index (the in-memory fire hydrant), includes __time column
@GuardedBy("hydrantLock")
private final LinkedHashSet<String> columnsExcludingCurrIndex = new LinkedHashSet<>();
// column types for columns in {@code columnsExcludingCurrIndex}
@ -397,6 +398,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
/**
* Merge the column from the index with the existing columns.
*/
@GuardedBy("hydrantLock")
private void overwriteIndexDimensions(StorageAdapter storageAdapter)
{
RowSignature rowSignature = storageAdapter.getRowSignature();
@ -414,20 +416,19 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
synchronized (hydrantLock) {
RowSignature.Builder builder = RowSignature.builder();
builder.addTimeColumn();
// Add columns from columnsExcludingCurrIndex.
for (String dim : columnsExcludingCurrIndex) {
if (!ColumnHolder.TIME_COLUMN_NAME.equals(dim)) {
builder.add(dim, columnTypeExcludingCurrIndex.get(dim));
}
builder.add(dim, columnTypeExcludingCurrIndex.get(dim));
}
IncrementalIndexStorageAdapter incrementalIndexStorageAdapter = new IncrementalIndexStorageAdapter(currHydrant.getIndex());
RowSignature incrementalIndexSignature = incrementalIndexStorageAdapter.getRowSignature();
// Add columns from the currHydrant that do not yet exist in columnsExcludingCurrIndex.
IncrementalIndexStorageAdapter currStorageAdapter =
new IncrementalIndexStorageAdapter(currHydrant.getIndex());
RowSignature currSignature = currStorageAdapter.getRowSignature();
for (String dim : incrementalIndexSignature.getColumnNames()) {
if (!columnsExcludingCurrIndex.contains(dim) && !ColumnHolder.TIME_COLUMN_NAME.equals(dim)) {
builder.add(dim, incrementalIndexSignature.getColumnType(dim).orElse(null));
for (String dim : currSignature.getColumnNames()) {
if (!columnsExcludingCurrIndex.contains(dim)) {
builder.add(dim, currSignature.getColumnType(dim).orElse(null));
}
}