MSQ: Fix validation of time position in collations. (#16961)

* MSQ: Fix validation of time position in collations.

It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out-of-bounds list access on fieldMappings.

This patch fixes the problem by first locating the position of __time in
fieldMappings and then checking where that position falls in the collation,
rather than looking up each collation field index in fieldMappings.
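
To make that concrete, here is a minimal, self-contained sketch of the two-pass check, using hypothetical names and plain lists in place of Calcite's RelCollation and the engine's field mappings; it illustrates the idea rather than reproducing the engine code:

import java.util.List;

public class TimeSortCheckSketch
{
  /**
   * Returns the position of __time within the sort collation, or -1 if __time is not a sort key.
   * The lookup goes from column name to row position first, so a collation entry that points at a
   * projected expression column (e.g. LOWER(dim1)) simply never matches, instead of causing an
   * out-of-bounds access into the field mappings.
   */
  static int timePositionInCollation(List<String> outputColumns, List<Integer> collationFieldIndexes)
  {
    final int timePositionInRow = outputColumns.indexOf("__time");
    for (int i = 0; i < collationFieldIndexes.size(); i++) {
      if (collationFieldIndexes.get(i) == timePositionInRow) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args)
  {
    final List<String> columns = List.of("__time", "dim1", "m1");

    // CLUSTERED BY __time, dim1 -> position 0, which the validation allows.
    System.out.println(timePositionInCollation(columns, List.of(0, 1)));

    // CLUSTERED BY dim1, __time -> position 1, which the validation rejects.
    System.out.println(timePositionInCollation(columns, List.of(1, 0)));

    // CLUSTERED BY LOWER(dim1) -> the collation references a projected column (index 3 here) that
    // is not among the mapped fields; the old code indexed fieldMappings with it and threw, while
    // the new approach simply reports -1 (no __time in the sort order).
    System.out.println(timePositionInCollation(columns, List.of(3)));
  }
}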

Fixes a bug introduced in #16849.

* Fix test. Better warning message.
Authored by Gian Merlino on 2024-08-27 00:02:32 -07:00; committed by GitHub.
parent 3b88b57d70
commit ed3dbd6242
3 changed files with 99 additions and 10 deletions

MSQTaskSqlEngine.java

@@ -479,22 +479,29 @@ public class MSQTaskSqlEngine implements SqlEngine
           );
         }
       } else if (!rootCollation.getFieldCollations().isEmpty()) {
-        int timePosition = -1;
-        for (int i = 0; i < rootCollation.getFieldCollations().size(); i++) {
-          final String fieldCollationName =
-              fieldMappings.get(rootCollation.getFieldCollations().get(i).getFieldIndex()).getValue();
-          if (ColumnHolder.TIME_COLUMN_NAME.equals(fieldCollationName)) {
-            timePosition = i;
+        int timePositionInRow = -1;
+        for (int i = 0; i < fieldMappings.size(); i++) {
+          Entry<Integer, String> entry = fieldMappings.get(i);
+          if (ColumnHolder.TIME_COLUMN_NAME.equals(entry.getValue())) {
+            timePositionInRow = i;
             break;
           }
         }
 
-        if (timePosition > 0) {
+        int timePositionInCollation = -1;
+        for (int i = 0; i < rootCollation.getFieldCollations().size(); i++) {
+          if (rootCollation.getFieldCollations().get(i).getFieldIndex() == timePositionInRow) {
+            timePositionInCollation = i;
+            break;
+          }
+        }
+
+        if (timePositionInCollation > 0) {
           throw InvalidSqlInput.exception(
               "Sort order (CLUSTERED BY) cannot include[%s] in position[%d] unless context parameter[%s] "
               + "is set to[false]. %s",
               ColumnHolder.TIME_COLUMN_NAME,
-              timePosition,
+              timePositionInCollation,
               MultiStageQueryContext.CTX_FORCE_TIME_SORT,
               DimensionsSpec.WARNING_NON_TIME_SORT_ORDER
           );
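
As the rewritten error message indicates, a query that intentionally sorts by something other than __time first does not have to drop its CLUSTERED BY; it can disable the forced time sort via the query context instead. A rough sketch follows, where the import path and the map-style context are both assumptions for illustration; only the constant CTX_FORCE_TIME_SORT itself comes from the diff above:

import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.util.MultiStageQueryContext; // assumed package for the constant referenced above

public class NonTimeSortedContextSketch
{
  /**
   * Builds a query context that opts out of the forced __time sort, which is what the
   * InvalidSqlInput message above points to when __time appears after other CLUSTERED BY keys.
   */
  public static Map<String, Object> queryContext()
  {
    return ImmutableMap.of(MultiStageQueryContext.CTX_FORCE_TIME_SORT, false);
  }
}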

MSQReplaceTest.java

@@ -277,6 +277,86 @@ public class MSQReplaceTest extends MSQTestBase
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName, Map<String, Object> context)
+  {
+    // Tests [CLUSTERED BY LOWER(dim1)], i.e. an expression that is not actually stored.
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("m1", ColumnType.FLOAT)
+                                            .build();
+
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  .interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  .version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    DataSegment existingDataSegment1 = DataSegment.builder()
+                                                  .interval(Intervals.of("2001-01-01T/2001-01-04T"))
+                                                  .size(50)
+                                                  .version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doCallRealMethod()
+           .doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction("foo", ImmutableList.of(Intervals.ETERNITY)));
+
+    testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL "
+                             + "SELECT __time, dim1, m1 "
+                             + "FROM foo "
+                             + "PARTITIONED BY ALL "
+                             + "CLUSTERED BY LOWER(dim1)")
+                     .setExpectedDataSource("foo")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+                     .setExpectedSegments(
+                         ImmutableSet.of(
+                             SegmentId.of("foo", Intervals.ETERNITY, "test", 0)
+                         )
+                     )
+                     .setExpectedShardSpec(NumberedShardSpec.class)
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null, 1.0f},
+                             new Object[]{946771200000L, "10.1", 2.0f},
+                             new Object[]{946857600000L, "2", 3.0f},
+                             new Object[]{978307200000L, "1", 4.0f},
+                             new Object[]{978393600000L, "def", 5.0f},
+                             new Object[]{978480000000L, "abc", 6.0f}
+                         )
+                     )
+                     .setExpectedSegmentGenerationProgressCountersForStageWorker(
+                         CounterSnapshotMatcher
+                             .with().segmentRowsProcessed(6),
+                         1, 0
+                     )
+                     .setExpectedLastCompactionState(
+                         expectedCompactionState(
+                             context,
+                             Collections.emptyList(),
+                             DimensionsSpec.builder()
+                                           .setDimensions(
+                                               ImmutableList.of(
+                                                   new StringDimensionSchema("dim1"),
+                                                   new FloatDimensionSchema("m1")
+                                               )
+                                           )
+                                           .setDimensionExclusions(Collections.singletonList("__time"))
+                                           .build(),
+                             GranularityType.ALL,
+                             Intervals.ETERNITY
+                         )
+                     )
+                     .verifyResults();
+  }
+
   @MethodSource("data")
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testReplaceOnFooWithAllClusteredByExpression(String contextName, Map<String, Object> context)
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("dim1", ColumnType.STRING)

DimensionsSpec.java

@@ -60,8 +60,10 @@ public class DimensionsSpec
   public static final String WARNING_NON_TIME_SORT_ORDER = StringUtils.format(
       "Warning: support for segments not sorted by[%s] is experimental. Such segments are not readable by older "
       + "version of Druid, and certain queries cannot run on them. See "
-      + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting for details before using this option.",
-      ColumnHolder.TIME_COLUMN_NAME
+      + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting for details before setting "
+      + "%s to[false].",
+      ColumnHolder.TIME_COLUMN_NAME,
+      PARAMETER_FORCE_TIME_SORT
   );
 
   public static final boolean DEFAULT_FORCE_TIME_SORT = true;
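
For reference, a small stand-alone preview of what the reworked warning renders to, using plain String.format in place of Druid's StringUtils.format; the literal parameter name below is an assumption for illustration, since only the constant PARAMETER_FORCE_TIME_SORT appears in the diff:

public class WarningPreview
{
  public static void main(String[] args)
  {
    // "__time" is ColumnHolder.TIME_COLUMN_NAME; the parameter name is assumed here.
    final String output = String.format(
        "Warning: support for segments not sorted by[%s] is experimental. Such segments are not readable by older "
        + "version of Druid, and certain queries cannot run on them. See "
        + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting for details before setting "
        + "%s to[false].",
        "__time",
        "forceSegmentSortByTime"
    );
    System.out.println(output);
  }
}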