Cast the values read in the EXTERN function to the type in the row signature (#15183)

This commit is contained in:
Laksh Singla 2023-10-19 10:24:23 +05:30 committed by GitHub
parent 780207869b
commit fa311dd0b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 124 additions and 2 deletions

View File

@ -45,6 +45,7 @@ public class ExternalSegment extends RowBasedSegment<InputRow>
{
private final InputSource inputSource;
private final RowSignature signature;
public static final String SEGMENT_ID = "__external";
/**
@ -145,6 +146,7 @@ public class ExternalSegment extends RowBasedSegment<InputRow>
signature
);
this.inputSource = inputSource;
this.signature = signature;
}
/**
@ -154,4 +156,12 @@ public class ExternalSegment extends RowBasedSegment<InputRow>
{
return inputSource;
}
/**
* Returns the signature of the external input source
*/
public RowSignature signature()
{
return signature;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.druid.msq.querykit.scan;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.ValueMatcher;
@ -32,6 +34,7 @@ import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.RowIdSupplier;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
@ -48,16 +51,19 @@ public class ExternalColumnSelectorFactory implements ColumnSelectorFactory
private final ColumnSelectorFactory delegate;
private final InputSource inputSource;
private final RowSignature rowSignature;
private final SimpleSettableOffset offset;
public ExternalColumnSelectorFactory(
final ColumnSelectorFactory delgate,
final InputSource inputSource,
final RowSignature rowSignature,
final SimpleSettableOffset offset
)
{
this.delegate = delgate;
this.inputSource = inputSource;
this.rowSignature = rowSignature;
this.offset = offset;
}
@ -67,6 +73,7 @@ public class ExternalColumnSelectorFactory implements ColumnSelectorFactory
return new DimensionSelector()
{
final DimensionSelector delegateDimensionSelector = delegate.makeDimensionSelector(dimensionSpec);
final ExpressionType expressionType = ExpressionType.fromColumnType(dimensionSpec.getOutputType());
@Override
public IndexedInts getRow()
@ -97,7 +104,10 @@ public class ExternalColumnSelectorFactory implements ColumnSelectorFactory
public Object getObject()
{
try {
return delegateDimensionSelector.getObject();
if (expressionType == null) {
return delegateDimensionSelector.getObject();
}
return ExprEval.ofType(expressionType, delegateDimensionSelector.getObject()).value();
}
catch (Exception e) {
throw createException(e, dimensionSpec.getDimension(), inputSource, offset);
@ -144,6 +154,9 @@ public class ExternalColumnSelectorFactory implements ColumnSelectorFactory
return new ColumnValueSelector()
{
final ColumnValueSelector delegateColumnValueSelector = delegate.makeColumnValueSelector(columnName);
final ExpressionType expressionType = ExpressionType.fromColumnType(
rowSignature.getColumnType(columnName).orElse(null)
);
@Override
public double getDouble()
@ -195,7 +208,10 @@ public class ExternalColumnSelectorFactory implements ColumnSelectorFactory
public Object getObject()
{
try {
return delegateColumnValueSelector.getObject();
if (expressionType == null) {
return delegateColumnValueSelector.getObject();
}
return ExprEval.ofType(expressionType, delegateColumnValueSelector.getObject()).value();
}
catch (Exception e) {
throw createException(e, columnName, inputSource, offset);

View File

@ -392,6 +392,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
return new ExternalColumnSelectorFactory(
baseColumnSelectorFactory,
((ExternalSegment) segment).externalInputSource(),
((ExternalSegment) segment).signature(),
cursorOffset
);
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.java.util.common.ISE;
@ -857,6 +858,100 @@ public class MSQArraysTest extends MSQTestBase
.verifyResults();
}
@Test
public void testScanExternBooleanArray()
{
final List<Object[]> expectedRows = Collections.singletonList(
new Object[]{Arrays.asList(1L, 0L, null)}
);
RowSignature scanSignature = RowSignature.builder()
.add("a_bool", ColumnType.LONG_ARRAY)
.build();
Query<?> expectedQuery = newScanQueryBuilder()
.dataSource(
new ExternalDataSource(
new InlineInputSource("{\"a_bool\":[true,false,null]}"),
new JsonInputFormat(null, null, null, null, null),
scanSignature
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("a_bool")
.context(defaultScanQueryContext(context, scanSignature))
.build();
testSelectQuery().setSql("SELECT a_bool FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\": \"inline\", \"data\":\"{\\\"a_bool\\\":[true,false,null]}\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"a_bool\", \"type\": \"ARRAY<LONG>\"}]'\n"
+ " )\n"
+ ")")
.setQueryContext(context)
.setExpectedMSQSpec(MSQSpec
.builder()
.query(expectedQuery)
.columnMappings(new ColumnMappings(ImmutableList.of(
new ColumnMapping("a_bool", "a_bool")
)))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(TaskReportMSQDestination.INSTANCE)
.build()
)
.setExpectedRowSignature(scanSignature)
.setExpectedResultRows(expectedRows)
.verifyResults();
}
@Test
public void testScanExternArrayWithNonConvertibleType()
{
final List<Object[]> expectedRows = Collections.singletonList(
new Object[]{Arrays.asList(null, null)}
);
RowSignature scanSignature = RowSignature.builder()
.add("a_bool", ColumnType.LONG_ARRAY)
.build();
Query<?> expectedQuery = newScanQueryBuilder()
.dataSource(
new ExternalDataSource(
new InlineInputSource("{\"a_bool\":[\"Test\",\"Test2\"]}"),
new JsonInputFormat(null, null, null, null, null),
scanSignature
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("a_bool")
.context(defaultScanQueryContext(context, scanSignature))
.build();
testSelectQuery().setSql("SELECT a_bool FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\": \"inline\", \"data\":\"{\\\"a_bool\\\":[\\\"Test\\\",\\\"Test2\\\"]}\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"a_bool\", \"type\": \"ARRAY<LONG>\"}]'\n"
+ " )\n"
+ ")")
.setQueryContext(context)
.setExpectedMSQSpec(MSQSpec
.builder()
.query(expectedQuery)
.columnMappings(new ColumnMappings(ImmutableList.of(
new ColumnMapping("a_bool", "a_bool")
)))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(TaskReportMSQDestination.INSTANCE)
.build()
)
.setExpectedRowSignature(scanSignature)
.setExpectedResultRows(expectedRows)
.verifyResults();
}
private List<Object[]> expectedMultiValueFooRowsToArray()
{
List<Object[]> expectedRows = new ArrayList<>();