diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 96da93c34d0..e470a951877 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -279,8 +279,8 @@ public class MSQReplaceTest extends MSQTestBase public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName, Map context) { RowSignature rowSignature = RowSignature.builder() - .add("__time", ColumnType.LONG) .add("dim1", ColumnType.STRING) + .add("__time", ColumnType.LONG) .add("m1", ColumnType.FLOAT) .build(); @@ -323,12 +323,12 @@ public class MSQReplaceTest extends MSQTestBase .setExpectedShardSpec(DimensionRangeShardSpec.class) .setExpectedResultRows( ImmutableList.of( - new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null, 1.0f}, - new Object[]{978307200000L, "1", 4.0f}, - new Object[]{946771200000L, "10.1", 2.0f}, - new Object[]{946857600000L, "2", 3.0f}, - new Object[]{978480000000L, "abc", 6.0f}, - new Object[]{978393600000L, "def", 5.0f} + new Object[]{NullHandling.sqlCompatible() ? "" : null, 946684800000L, 1.0f}, + new Object[]{"1", 978307200000L, 4.0f}, + new Object[]{"10.1", 946771200000L, 2.0f}, + new Object[]{"2", 946857600000L, 3.0f}, + new Object[]{"abc", 978480000000L, 6.0f}, + new Object[]{"def", 978393600000L, 5.0f} ) ) .setExpectedSegmentGenerationProgressCountersForStageWorker( @@ -365,8 +365,8 @@ public class MSQReplaceTest extends MSQTestBase // forceSegmentSortByTime = false. (Same expectations as the prior test, // testReplaceOnFooWithAllClusteredByDimExplicitSort.) RowSignature rowSignature = RowSignature.builder() - .add("__time", ColumnType.LONG) .add("dim1", ColumnType.STRING) + .add("__time", ColumnType.LONG) .add("m1", ColumnType.FLOAT) .build(); @@ -409,12 +409,12 @@ public class MSQReplaceTest extends MSQTestBase .setExpectedShardSpec(DimensionRangeShardSpec.class) .setExpectedResultRows( ImmutableList.of( - new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null, 1.0f}, - new Object[]{978307200000L, "1", 4.0f}, - new Object[]{946771200000L, "10.1", 2.0f}, - new Object[]{946857600000L, "2", 3.0f}, - new Object[]{978480000000L, "abc", 6.0f}, - new Object[]{978393600000L, "def", 5.0f} + new Object[]{NullHandling.sqlCompatible() ? "" : null, 946684800000L, 1.0f}, + new Object[]{"1", 978307200000L, 4.0f}, + new Object[]{"10.1", 946771200000L, 2.0f}, + new Object[]{"2", 946857600000L, 3.0f}, + new Object[]{"abc", 978480000000L, 6.0f}, + new Object[]{"def", 978393600000L, 5.0f} ) ) .setExpectedSegmentGenerationProgressCountersForStageWorker( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 5076e9f3d44..258b5c97751 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -141,6 +141,7 @@ import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.DataSegmentPusher; @@ -1288,17 +1289,20 @@ public class MSQTestBase extends BaseCalciteQueryTest Assert.assertEquals(expectedDestinationIntervals, destination.getReplaceTimeChunks()); } if (expectedSegments != null) { + final int timeIndex = + MSQResultsReport.ColumnAndType.toRowSignature(expectedRowSignature) + .indexOf(ColumnHolder.TIME_COLUMN_NAME); Assert.assertEquals(expectedSegments, segmentIdVsOutputRowsMap.keySet()); for (Object[] row : transformedOutputRows) { - List diskSegmentList = segmentIdVsOutputRowsMap.keySet() - .stream() - .filter(segmentId -> segmentId.getInterval() - .contains((Long) row[0])) - .filter(segmentId -> { - List> lists = segmentIdVsOutputRowsMap.get(segmentId); - return lists.contains(Arrays.asList(row)); - }) - .collect(Collectors.toList()); + List diskSegmentList = segmentIdVsOutputRowsMap + .keySet() + .stream() + .filter(segmentId -> segmentId.getInterval().contains((Long) row[timeIndex])) + .filter(segmentId -> { + List> lists = segmentIdVsOutputRowsMap.get(segmentId); + return lists.contains(Arrays.asList(row)); + }) + .collect(Collectors.toList()); if (diskSegmentList.size() != 1) { throw new IllegalStateException("Single key in multiple partitions"); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java index 4e5e8634f18..f98287124ed 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java @@ -241,14 +241,12 @@ public class InputSourceSampler List physicalDimensionSchemas = new ArrayList<>(); RowSignature.Builder signatureBuilder = RowSignature.builder(); - signatureBuilder.add( - ColumnHolder.TIME_COLUMN_NAME, - index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME).toColumnType() - ); - for (IncrementalIndex.DimensionDesc dimensionDesc : index.getDimensions()) { - if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) { - final ColumnType columnType = dimensionDesc.getCapabilities().toColumnType(); - signatureBuilder.add(dimensionDesc.getName(), columnType); + for (final String dimensionName : index.getDimensionNames(true)) { + if (ColumnHolder.TIME_COLUMN_NAME.equals(dimensionName)) { + signatureBuilder.addTimeColumn(); + } else if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionName)) { + final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionName); + signatureBuilder.add(dimensionDesc.getName(), ColumnType.fromCapabilities(dimensionDesc.getCapabilities())); // use explicitly specified dimension schema if it exists if (dataSchema != null && dataSchema.getDimensionsSpec() != null && @@ -271,7 +269,7 @@ public class InputSourceSampler if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) { signatureBuilder.add( aggregatorFactory.getName(), - index.getColumnCapabilities(aggregatorFactory.getName()).toColumnType() + ColumnType.fromCapabilities(index.getColumnCapabilities(aggregatorFactory.getName())) ); } } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index ca2c2725c6f..9dd72d15fa4 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -45,6 +45,7 @@ import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.filter.Filters; import org.apache.druid.timeline.SegmentId; @@ -98,13 +99,11 @@ public class ScanQueryEngine } else { final Set availableColumns = Sets.newLinkedHashSet( Iterables.concat( - Collections.singleton(ColumnHolder.TIME_COLUMN_NAME), + adapter.getRowSignature().getColumnNames(), Iterables.transform( Arrays.asList(query.getVirtualColumns().getVirtualColumns()), VirtualColumn::getOutputName - ), - adapter.getAvailableDimensions(), - adapter.getAvailableMetrics() + ) ) ); @@ -152,11 +151,7 @@ public class ScanQueryEngine for (String column : allColumns) { final BaseObjectColumnValueSelector selector = factory.makeColumnValueSelector(column); ColumnCapabilities columnCapabilities = factory.getColumnCapabilities(column); - rowSignatureBuilder.add( - column, - columnCapabilities == null ? null : columnCapabilities.toColumnType() - ); - + rowSignatureBuilder.add(column, ColumnType.fromCapabilities(columnCapabilities)); columnSelectors.add(selector); } diff --git a/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java index 3fcfe2dd78a..46b9f5fe061 100644 --- a/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.Filter; import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.filter.AndFilter; import org.joda.time.Interval; @@ -69,6 +70,12 @@ public class FilteredStorageAdapter implements StorageAdapter return baseStorageAdapter.getInterval(); } + @Override + public RowSignature getRowSignature() + { + return baseStorageAdapter.getRowSignature(); + } + @Override public Indexed getAvailableDimensions() { diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java index e2c88071607..a0e23e71f74 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java @@ -19,11 +19,14 @@ package org.apache.druid.segment; +import org.apache.druid.query.OrderBy; import org.apache.druid.segment.column.BaseColumn; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnIndexSupplier; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.DictionaryEncodedColumn; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex; import org.joda.time.DateTime; @@ -55,6 +58,40 @@ public class QueryableIndexStorageAdapter implements StorageAdapter return index.getDataInterval(); } + @Override + public RowSignature getRowSignature() + { + final LinkedHashSet columns = new LinkedHashSet<>(); + + for (final OrderBy orderBy : index.getOrdering()) { + columns.add(orderBy.getColumnName()); + } + + // Add __time after the defined ordering, if __time wasn't part of it. + columns.add(ColumnHolder.TIME_COLUMN_NAME); + + for (final String dimension : getAvailableDimensions()) { + columns.add(dimension); + } + + for (final String metric : getAvailableMetrics()) { + columns.add(metric); + } + + final RowSignature.Builder builder = RowSignature.builder(); + for (final String column : columns) { + final ColumnType columnType = ColumnType.fromCapabilities(index.getColumnCapabilities(column)); + + // index.getOrdering() may include columns that don't exist, such as if they were omitted due to + // being 100% nulls. Don't add those to the row signature. + if (columnType != null) { + builder.add(column, columnType); + } + } + + return builder.build(); + } + @Override public Indexed getAvailableDimensions() { diff --git a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java index 3ad6c930002..c0949692e4f 100644 --- a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.OrderBy; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.vector.VectorCursor; @@ -34,7 +35,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.List; -import java.util.Optional; /** * @@ -140,10 +140,7 @@ public interface StorageAdapter extends CursorFactory, ColumnInspector, CursorHo builder.addTimeColumn(); for (final String column : Iterables.concat(getAvailableDimensions(), getAvailableMetrics())) { - builder.add( - column, - Optional.ofNullable(getColumnCapabilities(column)).map(ColumnCapabilities::toColumnType).orElse(null) - ); + builder.add(column, ColumnType.fromCapabilities(getColumnCapabilities(column))); } return builder.build(); diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java index 435771b4780..2f9552a1d3c 100644 --- a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java @@ -35,6 +35,8 @@ import org.apache.druid.query.filter.NullFilter; import org.apache.druid.query.filter.RangeFilter; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.TypeSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.Indexed; @@ -169,6 +171,22 @@ public class UnnestStorageAdapter implements StorageAdapter return baseAdapter.getInterval(); } + @Override + public RowSignature getRowSignature() + { + final RowSignature.Builder builder = RowSignature.builder(); + + final RowSignature baseSignature = baseAdapter.getRowSignature(); + for (int i = 0; i < baseSignature.size(); i++) { + final String column = baseSignature.getColumnName(i); + if (!outputColumnName.equals(column)) { + builder.add(column, ColumnType.fromCapabilities(getColumnCapabilities(column))); + } + } + + return builder.add(outputColumnName, ColumnType.fromCapabilities(getColumnCapabilities(outputColumnName))).build(); + } + @Override public Indexed getAvailableDimensions() { diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java index dbfed07749c..b670d8f4370 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java @@ -124,6 +124,12 @@ public class ColumnType extends BaseTypeSignature return Types.fromString(ColumnTypeFactory.getInstance(), typeName); } + @Nullable + public static ColumnType fromCapabilities(@Nullable ColumnCapabilities capabilities) + { + return capabilities != null ? capabilities.toColumnType() : null; + } + public static ColumnType ofArray(ColumnType elementType) { return ColumnTypeFactory.getInstance().ofArray(elementType); diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java index ab92a00897b..c9a6d209697 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.incremental; +import com.google.common.collect.Iterables; import org.apache.druid.segment.CursorBuildSpec; import org.apache.druid.segment.CursorHolder; import org.apache.druid.segment.DimensionDictionarySelector; @@ -30,6 +31,7 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.data.ListIndexed; import org.joda.time.Interval; @@ -122,6 +124,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return index.getInterval(); } + @Override + public RowSignature getRowSignature() + { + final RowSignature.Builder builder = RowSignature.builder(); + + for (final String column : Iterables.concat(index.getDimensionNames(true), index.getMetricNames())) { + builder.add(column, ColumnType.fromCapabilities(index.getColumnCapabilities(column))); + } + + return builder.build(); + } + @Override public Indexed getAvailableDimensions() { diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java index f3668ca45e1..765ab4fbd91 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java @@ -33,6 +33,8 @@ import org.apache.druid.segment.Metadata; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.data.ListIndexed; import org.apache.druid.segment.filter.Filters; @@ -99,6 +101,24 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter return baseAdapter.getInterval(); } + @Override + public RowSignature getRowSignature() + { + // Use a Set since we may encounter duplicates, if a field from a Joinable shadows one of the base fields. + final LinkedHashSet columns = new LinkedHashSet<>(baseAdapter.getRowSignature().getColumnNames()); + + for (final JoinableClause clause : clauses) { + columns.addAll(clause.getAvailableColumnsPrefixed()); + } + + final RowSignature.Builder builder = RowSignature.builder(); + for (final String column : columns) { + builder.add(column, ColumnType.fromCapabilities(getColumnCapabilities(column))); + } + + return builder.build(); + } + @Override public Indexed getAvailableDimensions() { diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java index 9591dc315f0..5fe6d8a698a 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java @@ -39,7 +39,6 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.BaseColumn; -import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.data.ReadableOffset; @@ -88,12 +87,7 @@ public class BroadcastSegmentIndexedTable implements IndexedTable segment.getId() ); - RowSignature.Builder sigBuilder = RowSignature.builder(); - sigBuilder.add(ColumnHolder.TIME_COLUMN_NAME, ColumnType.LONG); - for (String column : queryableIndex.getColumnNames()) { - sigBuilder.add(column, adapter.getColumnCapabilities(column).toColumnType()); - } - this.rowSignature = sigBuilder.build(); + this.rowSignature = adapter.getRowSignature(); // initialize keycolumn index builders final ArrayList indexBuilders = new ArrayList<>(rowSignature.size()); diff --git a/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java b/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java index e5ab3c203f4..cae9439a331 100644 --- a/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java +++ b/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java @@ -30,13 +30,11 @@ import org.apache.druid.frame.write.FrameWriters; import org.apache.druid.frame.write.RowBasedFrameWriter; import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.RowBasedColumnSelectorFactory; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import java.util.Collections; import java.util.List; -import java.util.Optional; public class KeyTestUtils { @@ -56,10 +54,10 @@ public class KeyTestUtils final RowSignature.Builder builder = RowSignature.builder(); for (final KeyColumn keyColumn : keyColumns) { - final ColumnCapabilities capabilities = inspector.getColumnCapabilities(keyColumn.columnName()); - final ColumnType columnType = - Optional.ofNullable(capabilities).map(ColumnCapabilities::toColumnType).orElse(null); - builder.add(keyColumn.columnName(), columnType); + builder.add( + keyColumn.columnName(), + ColumnType.fromCapabilities(inspector.getColumnCapabilities(keyColumn.columnName())) + ); } return builder.build(); diff --git a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java index 5826301bcd7..142b23b8391 100644 --- a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java @@ -32,6 +32,8 @@ import org.apache.druid.segment.CursorHolder; import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.RowBasedStorageAdapter; import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.timeline.SegmentId; import org.hamcrest.CoreMatchers; @@ -119,6 +121,18 @@ public class LookupSegmentTest Assert.assertNull(LOOKUP_SEGMENT.asQueryableIndex()); } + @Test + public void test_asStorageAdapter_getRowSignature() + { + Assert.assertEquals( + RowSignature.builder() + .add("k", ColumnType.STRING) + .add("v", ColumnType.STRING) + .build(), + LOOKUP_SEGMENT.asStorageAdapter().getRowSignature() + ); + } + @Test public void test_asStorageAdapter_getAvailableDimensions() { diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java index 1f46534266f..b8c9721b5a0 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java @@ -255,12 +255,12 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest Assert.assertEquals(6, index.getNumRows()); Assert.assertEquals( ImmutableList.of( - ImmutableList.of(timestamp, "1", "2", 1L), - ImmutableList.of(timestamp, "1", "2", 1L), - ImmutableList.of(timestamp + 1, "1", "2", 1L), - ImmutableList.of(timestamp, "3", "4", 1L), - ImmutableList.of(timestamp, "3", "4", 1L), - ImmutableList.of(timestamp + 1, "3", "4", 1L) + ImmutableList.of("1", "2", timestamp, 1L), + ImmutableList.of("1", "2", timestamp, 1L), + ImmutableList.of("1", "2", timestamp + 1, 1L), + ImmutableList.of("3", "4", timestamp, 1L), + ImmutableList.of("3", "4", timestamp, 1L), + ImmutableList.of("3", "4", timestamp + 1, 1L) ), FrameTestUtil.readRowsFromAdapter(new QueryableIndexStorageAdapter(index), null, false).toList() ); @@ -325,10 +325,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest Assert.assertEquals(4, index.getNumRows()); Assert.assertEquals( ImmutableList.of( - ImmutableList.of(timestamp, "1", "2", 2L), - ImmutableList.of(timestamp + 1, "1", "2", 1L), - ImmutableList.of(timestamp, "3", "4", 2L), - ImmutableList.of(timestamp + 1, "3", "4", 1L) + ImmutableList.of("1", "2", timestamp, 2L), + ImmutableList.of("1", "2", timestamp + 1, 1L), + ImmutableList.of("3", "4", timestamp, 2L), + ImmutableList.of("3", "4", timestamp + 1, 1L) ), FrameTestUtil.readRowsFromAdapter(new QueryableIndexStorageAdapter(index), null, false).toList() ); diff --git a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java index b26da1ceba7..7ae04d9fcb1 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java @@ -58,6 +58,31 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag ); } + @Test + public void test_getRowSignature_factToCountry() + { + Assert.assertEquals( + ImmutableList.of( + "__time", + "channel", + "regionIsoCode", + "countryNumber", + "countryIsoCode", + "user", + "isRobot", + "isAnonymous", + "namespace", + "page", + "delta", + "channel_uniques", + "c1.countryNumber", + "c1.countryIsoCode", + "c1.countryName" + ), + Lists.newArrayList(makeFactToCountrySegment().getRowSignature().getColumnNames()) + ); + } + @Test public void test_getAvailableDimensions_factToCountry() { diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java index 73daadc7dd1..9b8f7f4d916 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java @@ -43,6 +43,7 @@ import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.TestIndex; @@ -141,9 +142,8 @@ public class BroadcastSegmentIndexedTableTest extends InitializedNullHandlingTes segment.getTotalSpace() ); backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false, SegmentLazyLoadFailCallback.NOOP); - - columnNames = ImmutableList.builder().add(ColumnHolder.TIME_COLUMN_NAME) - .addAll(backingSegment.asQueryableIndex().getColumnNames()).build(); + columnNames = + new QueryableIndexStorageAdapter(backingSegment.asQueryableIndex()).getRowSignature().getColumnNames(); broadcastTable = new BroadcastSegmentIndexedTable(backingSegment, keyColumns, dataSegment.getVersion()); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java index 8d4fc0dd7f8..5e59c0e9b15 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java @@ -24,6 +24,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -37,7 +38,6 @@ import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ColumnFormat; -import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.incremental.AppendableIndexSpec; @@ -92,6 +92,7 @@ public class Sink implements Iterable, Overshadowable private final LinkedHashSet dimOrder = new LinkedHashSet<>(); // columns excluding current index (the in-memory fire hydrant), includes __time column + @GuardedBy("hydrantLock") private final LinkedHashSet columnsExcludingCurrIndex = new LinkedHashSet<>(); // column types for columns in {@code columnsExcludingCurrIndex} @@ -397,6 +398,7 @@ public class Sink implements Iterable, Overshadowable /** * Merge the column from the index with the existing columns. */ + @GuardedBy("hydrantLock") private void overwriteIndexDimensions(StorageAdapter storageAdapter) { RowSignature rowSignature = storageAdapter.getRowSignature(); @@ -414,20 +416,19 @@ public class Sink implements Iterable, Overshadowable synchronized (hydrantLock) { RowSignature.Builder builder = RowSignature.builder(); - builder.addTimeColumn(); - + // Add columns from columnsExcludingCurrIndex. for (String dim : columnsExcludingCurrIndex) { - if (!ColumnHolder.TIME_COLUMN_NAME.equals(dim)) { - builder.add(dim, columnTypeExcludingCurrIndex.get(dim)); - } + builder.add(dim, columnTypeExcludingCurrIndex.get(dim)); } - IncrementalIndexStorageAdapter incrementalIndexStorageAdapter = new IncrementalIndexStorageAdapter(currHydrant.getIndex()); - RowSignature incrementalIndexSignature = incrementalIndexStorageAdapter.getRowSignature(); + // Add columns from the currHydrant that do not yet exist in columnsExcludingCurrIndex. + IncrementalIndexStorageAdapter currStorageAdapter = + new IncrementalIndexStorageAdapter(currHydrant.getIndex()); + RowSignature currSignature = currStorageAdapter.getRowSignature(); - for (String dim : incrementalIndexSignature.getColumnNames()) { - if (!columnsExcludingCurrIndex.contains(dim) && !ColumnHolder.TIME_COLUMN_NAME.equals(dim)) { - builder.add(dim, incrementalIndexSignature.getColumnType(dim).orElse(null)); + for (String dim : currSignature.getColumnNames()) { + if (!columnsExcludingCurrIndex.contains(dim)) { + builder.add(dim, currSignature.getColumnType(dim).orElse(null)); } }