MSQ: Use the same result coercion routines as the regular SQL endpoint. (#14046)

* MSQ: Use the same result coercion routines as the regular SQL endpoint.

The main changes are to move NativeQueryMaker.coerce to SqlResults, and
to formally change the list of sqlTypeNames in the MSQ results report
from String to Calcite's SqlTypeName.

- Change the test default to MSQ-compatible rather than MSQ-incompatible.
  The explicit opt-out marker function is now "notMsqCompatible()".
Gian Merlino, 2023-04-14 18:26:23 -07:00, committed by GitHub
parent 0884a22c41
commit eeed5ed7e2
21 changed files with 682 additions and 561 deletions
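
For orientation before the per-file diffs: the interactive SQL endpoint and MSQ now funnel result values through a single static routine. A minimal sketch of the call shape, using names from the diffs below (mapper, row, i, sqlTypeNames, and plannerContext are hypothetical locals):

    Object coerced = SqlResults.coerce(
        mapper,                                                 // ObjectMapper, used to JSON-encode arrays and complex values
        SqlResults.Context.fromPlannerContext(plannerContext),  // time zone + serialization flags
        row[i],                                                 // raw value from the native engine
        sqlTypeNames.get(i)                                     // Calcite SqlTypeName of this output column
    );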

View File

@ -38,6 +38,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionSchema;
@ -185,6 +186,7 @@ import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.SqlResults;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@ -585,14 +587,10 @@ public class ControllerImpl implements Controller
QueryValidator.validateQueryDef(queryDef);
queryDefRef.set(queryDef);
long maxParseExceptions = -1;
if (task.getSqlQueryContext() != null) {
maxParseExceptions = Optional.ofNullable(
task.getSqlQueryContext().get(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED))
.map(DimensionHandlerUtils::convertObjectToLong)
.orElse(MSQWarnings.DEFAULT_MAX_PARSE_EXCEPTIONS_ALLOWED);
}
final long maxParseExceptions = task.getQuerySpec().getQuery().context().getLong(
MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
MSQWarnings.DEFAULT_MAX_PARSE_EXCEPTIONS_ALLOWED
);
ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = ImmutableMap.builder();
taskContextOverridesBuilder
@ -1435,15 +1433,26 @@ public class ControllerImpl implements Controller
.stream()
.map(
mapping ->
columnSelectorFactory.makeColumnValueSelector(
mapping.getQueryColumn())
columnSelectorFactory.makeColumnValueSelector(mapping.getQueryColumn())
).collect(Collectors.toList());
final List<SqlTypeName> sqlTypeNames = task.getSqlTypeNames();
final List<Object[]> retVal = new ArrayList<>();
while (!cursor.isDone()) {
final Object[] row = new Object[columnMappings.size()];
for (int i = 0; i < row.length; i++) {
row[i] = selectors.get(i).getObject();
final Object value = selectors.get(i).getObject();
if (sqlTypeNames == null || task.getSqlResultsContext() == null) {
// SQL type unknown, or no SQL results context: pass the value through as-is.
row[i] = value;
} else {
row[i] = SqlResults.coerce(
context.jsonMapper(),
task.getSqlResultsContext(),
value,
sqlTypeNames.get(i)
);
}
}
retVal.add(row);
cursor.advance();
@ -1998,7 +2007,7 @@ public class ControllerImpl implements Controller
final QueryDefinition queryDef,
final Yielder<Object[]> resultsYielder,
final ColumnMappings columnMappings,
@Nullable final List<String> sqlTypeNames
@Nullable final List<SqlTypeName> sqlTypeNames
)
{
final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature();

View File

@ -36,7 +36,6 @@ import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.read.FrameReaderUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
@ -51,14 +50,14 @@ public class KeyStatisticsCollectionProcessor implements FrameProcessor<ClusterB
{
/**
* Constant chosen such that a column full of "standard" values, with row count
* {@link MSQTaskQueryMaker#DEFAULT_ROWS_PER_SEGMENT}, and *some* redundancy between
* {@link org.apache.druid.msq.util.MultiStageQueryContext#DEFAULT_ROWS_PER_SEGMENT}, and *some* redundancy between
* rows (therefore: some "reasonable" compression) will not have any columns greater than 2GB in size.
*/
private static final int STANDARD_VALUE_SIZE = 1000;
/**
* Constant chosen such that a segment full of "standard" rows, with row count
* {@link MSQTaskQueryMaker#DEFAULT_ROWS_PER_SEGMENT}, and *some* redundancy between
* {@link org.apache.druid.msq.util.MultiStageQueryContext#DEFAULT_ROWS_PER_SEGMENT}, and *some* redundancy between
* rows (therefore: some "reasonable" compression) will not be larger than 5GB in size.
*/
private static final int STANDARD_ROW_SIZE = 2000;

View File

@ -25,9 +25,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
@ -48,6 +50,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.run.SqlResults;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
@ -64,17 +67,29 @@ public class MSQControllerTask extends AbstractTask
private final MSQSpec querySpec;
// Enables users, and the web console, to see the original SQL query (if any). Not used by anything else in Druid.
/**
* Enables users, and the web console, to see the original SQL query (if any). Not used by anything else in Druid.
*/
@Nullable
private final String sqlQuery;
// Enables users, and the web console, to see the original SQL context (if any). Not used by any other Druid logic.
/**
* Enables users, and the web console, to see the original SQL context (if any). Not used by any other Druid logic.
*/
@Nullable
private final Map<String, Object> sqlQueryContext;
// Enables users, and the web console, to see the original SQL type names (if any). Not used by any other Druid logic.
/**
* Enables usage of {@link SqlResults#coerce(ObjectMapper, SqlResults.Context, Object, SqlTypeName)}.
*/
@Nullable
private final List<String> sqlTypeNames;
private final SqlResults.Context sqlResultsContext;
/**
* SQL type names for each field in the resultset.
*/
@Nullable
private final List<SqlTypeName> sqlTypeNames;
// Using an Injector directly because tasks do not have a way to provide their own Guice modules.
@JacksonInject
@ -88,7 +103,8 @@ public class MSQControllerTask extends AbstractTask
@JsonProperty("spec") MSQSpec querySpec,
@JsonProperty("sqlQuery") @Nullable String sqlQuery,
@JsonProperty("sqlQueryContext") @Nullable Map<String, Object> sqlQueryContext,
@JsonProperty("sqlTypeNames") @Nullable List<String> sqlTypeNames,
@JsonProperty("sqlResultsContext") @Nullable SqlResults.Context sqlResultsContext,
@JsonProperty("sqlTypeNames") @Nullable List<SqlTypeName> sqlTypeNames,
@JsonProperty("context") @Nullable Map<String, Object> context
)
{
@ -103,6 +119,7 @@ public class MSQControllerTask extends AbstractTask
this.querySpec = querySpec;
this.sqlQuery = sqlQuery;
this.sqlQueryContext = sqlQueryContext;
this.sqlResultsContext = sqlResultsContext;
this.sqlTypeNames = sqlTypeNames;
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
@ -132,7 +149,15 @@ public class MSQControllerTask extends AbstractTask
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getSqlQuery()
public List<SqlTypeName> getSqlTypeNames()
{
return sqlTypeNames;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
private String getSqlQuery()
{
return sqlQuery;
}
@ -140,7 +165,7 @@ public class MSQControllerTask extends AbstractTask
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<String, Object> getSqlQueryContext()
private Map<String, Object> getSqlQueryContext()
{
return sqlQueryContext;
}
@ -148,9 +173,9 @@ public class MSQControllerTask extends AbstractTask
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<String> getSqlTypeNames()
public SqlResults.Context getSqlResultsContext()
{
return sqlTypeNames;
return sqlResultsContext;
}
@Override

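For illustration, creating a controller task now supplies the serializable coercion context alongside the enum-typed names. A sketch with hypothetical placeholder values, following the constructor order above:

    MSQControllerTask task = new MSQControllerTask(
        taskId,                                                  // hypothetical
        querySpec,                                               // hypothetical MSQSpec
        "SELECT ...",                                            // sqlQuery: display only
        sqlQueryContext,                                         // display only
        SqlResults.Context.fromPlannerContext(plannerContext),   // drives SqlResults.coerce on the controller
        ImmutableList.of(SqlTypeName.TIMESTAMP, SqlTypeName.VARCHAR),
        null                                                     // task context
    );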
View File

@ -26,6 +26,7 @@ import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.query.Query;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
public class MSQSpec
@ -87,6 +88,21 @@ public class MSQSpec
return tuningConfig;
}
public MSQSpec withOverriddenContext(Map<String, Object> contextOverride)
{
if (contextOverride == null || contextOverride.isEmpty()) {
return this;
} else {
return new MSQSpec(
query.withOverriddenContext(contextOverride),
columnMappings,
destination,
assignmentStrategy,
tuningConfig
);
}
}
@Override
public boolean equals(Object o)
{

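The new withOverriddenContext gives MSQTaskQueryMaker a way to fold the fully resolved native query context into an existing spec without mutating it; roughly (nativeQueryContext as built in MSQTaskQueryMaker below):

    // No-op when the override map is null or empty; otherwise rebuilds the spec
    // around query.withOverriddenContext(contextOverride).
    MSQSpec specForTask = querySpec.withOverriddenContext(nativeQueryContext);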
View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
@ -40,12 +41,12 @@ public class MSQResultsReport
*/
private final List<ColumnAndType> signature;
@Nullable
private final List<String> sqlTypeNames;
private final List<SqlTypeName> sqlTypeNames;
private final Yielder<Object[]> resultYielder;
public MSQResultsReport(
final List<ColumnAndType> signature,
@Nullable final List<String> sqlTypeNames,
@Nullable final List<SqlTypeName> sqlTypeNames,
final Yielder<Object[]> resultYielder
)
{
@ -60,7 +61,7 @@ public class MSQResultsReport
@JsonCreator
static MSQResultsReport fromJson(
@JsonProperty("signature") final List<ColumnAndType> signature,
@JsonProperty("sqlTypeNames") @Nullable final List<String> sqlTypeNames,
@JsonProperty("sqlTypeNames") @Nullable final List<SqlTypeName> sqlTypeNames,
@JsonProperty("results") final List<Object[]> results
)
{
@ -76,7 +77,7 @@ public class MSQResultsReport
@Nullable
@JsonProperty("sqlTypeNames")
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<String> getSqlTypeNames()
public List<SqlTypeName> getSqlTypeNames()
{
return sqlTypeNames;
}

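A compatibility aside: Jackson serializes enums by name() by default, so switching sqlTypeNames from List&lt;String&gt; to List&lt;SqlTypeName&gt; should leave the report's JSON shape unchanged. Sketch:

    // Still renders as ["VARCHAR","BIGINT"] on the wire, and reads back via Enum.valueOf.
    String json = new ObjectMapper()
        .writeValueAsString(ImmutableList.of(SqlTypeName.VARCHAR, SqlTypeName.BIGINT));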
View File

@ -58,6 +58,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.Grouping;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlResults;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.joda.time.Interval;
@ -77,11 +78,6 @@ public class MSQTaskQueryMaker implements QueryMaker
private static final String DESTINATION_REPORT = "taskReport";
private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL;
private static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
// Lower than the default to minimize the impact of per-row overheads that are not accounted for by
// OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist.
private static final int DEFAULT_ROWS_IN_MEMORY = 100000;
private final String targetDataSource;
private final OverlordClient overlordClient;
@ -111,14 +107,21 @@ public class MSQTaskQueryMaker implements QueryMaker
Hook.QUERY_PLAN.run(druidQuery.getQuery());
String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());
QueryContext queryContext = plannerContext.queryContext();
String msqMode = MultiStageQueryContext.getMSQMode(queryContext);
// SQL query context: context provided by the user, and potentially modified by handlers during planning.
// Does not directly influence task execution, but it does form the basis for the initial native query context,
// which *does* influence task execution.
final QueryContext sqlQueryContext = plannerContext.queryContext();
// Native query context: sqlQueryContext plus things that we add prior to creating a controller task.
final Map<String, Object> nativeQueryContext = new HashMap<>(sqlQueryContext.asMap());
final String msqMode = MultiStageQueryContext.getMSQMode(sqlQueryContext);
if (msqMode != null) {
MSQMode.populateDefaultQueryContext(msqMode, plannerContext.queryContextMap());
MSQMode.populateDefaultQueryContext(msqMode, nativeQueryContext);
}
final String ctxDestination =
DimensionHandlerUtils.convertObjectToString(MultiStageQueryContext.getDestination(queryContext));
DimensionHandlerUtils.convertObjectToString(MultiStageQueryContext.getDestination(sqlQueryContext));
Object segmentGranularity;
try {
@ -131,7 +134,7 @@ public class MSQTaskQueryMaker implements QueryMaker
+ "segment granularity");
}
final int maxNumTasks = MultiStageQueryContext.getMaxNumTasks(queryContext);
final int maxNumTasks = MultiStageQueryContext.getMaxNumTasks(sqlQueryContext);
if (maxNumTasks < 2) {
throw new IAE(MultiStageQueryContext.CTX_MAX_NUM_TASKS
@ -140,23 +143,13 @@ public class MSQTaskQueryMaker implements QueryMaker
// This parameter is used internally for the number of worker tasks only, so we subtract 1
final int maxNumWorkers = maxNumTasks - 1;
final int rowsPerSegment = MultiStageQueryContext.getRowsPerSegment(
queryContext,
DEFAULT_ROWS_PER_SEGMENT
);
final int maxRowsInMemory = MultiStageQueryContext.getRowsInMemory(
queryContext,
DEFAULT_ROWS_IN_MEMORY
);
final IndexSpec indexSpec = MultiStageQueryContext.getIndexSpec(queryContext, jsonMapper);
final boolean finalizeAggregations = MultiStageQueryContext.isFinalizeAggregations(queryContext);
final int rowsPerSegment = MultiStageQueryContext.getRowsPerSegment(sqlQueryContext);
final int maxRowsInMemory = MultiStageQueryContext.getRowsInMemory(sqlQueryContext);
final IndexSpec indexSpec = MultiStageQueryContext.getIndexSpec(sqlQueryContext, jsonMapper);
final boolean finalizeAggregations = MultiStageQueryContext.isFinalizeAggregations(sqlQueryContext);
final List<Interval> replaceTimeChunks =
Optional.ofNullable(plannerContext.queryContext().get(DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS))
Optional.ofNullable(sqlQueryContext.get(DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS))
.map(
s -> {
if (s instanceof String && "all".equals(StringUtils.toLowerCase((String) s))) {
@ -179,7 +172,7 @@ public class MSQTaskQueryMaker implements QueryMaker
final Map<String, ColumnType> aggregationIntermediateTypeMap =
finalizeAggregations ? null /* Not needed */ : buildAggregationIntermediateTypeMap(druidQuery);
final List<String> sqlTypeNames = new ArrayList<>();
final List<SqlTypeName> sqlTypeNames = new ArrayList<>();
final List<ColumnMapping> columnMappings = new ArrayList<>();
for (final Pair<Integer, String> entry : fieldMapping) {
@ -195,7 +188,7 @@ public class MSQTaskQueryMaker implements QueryMaker
sqlTypeName = druidQuery.getOutputRowType().getFieldList().get(entry.getKey()).getType().getSqlTypeName();
}
sqlTypeNames.add(sqlTypeName.getName());
sqlTypeNames.add(sqlTypeName);
columnMappings.add(new ColumnMapping(queryColumn, outputColumns));
}
@ -214,7 +207,7 @@ public class MSQTaskQueryMaker implements QueryMaker
throw new ISE("Unable to convert %s to a segment granularity", segmentGranularity);
}
final List<String> segmentSortOrder = MultiStageQueryContext.getSortOrder(queryContext);
final List<String> segmentSortOrder = MultiStageQueryContext.getSortOrder(sqlQueryContext);
MSQTaskQueryMakerUtils.validateSegmentSortOrder(
segmentSortOrder,
@ -245,15 +238,16 @@ public class MSQTaskQueryMaker implements QueryMaker
.query(druidQuery.getQuery().withOverriddenContext(nativeQueryContextOverrides))
.columnMappings(new ColumnMappings(columnMappings))
.destination(destination)
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(queryContext))
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(sqlQueryContext))
.tuningConfig(new MSQTuningConfig(maxNumWorkers, maxRowsInMemory, rowsPerSegment, indexSpec))
.build();
final MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
querySpec,
querySpec.withOverriddenContext(nativeQueryContext),
MSQTaskQueryMakerUtils.maskSensitiveJsonKeys(plannerContext.getSql()),
plannerContext.queryContextMap(),
SqlResults.Context.fromPlannerContext(plannerContext),
sqlTypeNames,
null
);

View File

@ -108,8 +108,12 @@ public class MultiStageQueryContext
private static final String DEFAULT_DESTINATION = null;
public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment";
static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
public static final String CTX_ROWS_IN_MEMORY = "rowsInMemory";
// Lower than the default to minimize the impact of per-row overheads that are not accounted for by
// OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist.
static final int DEFAULT_ROWS_IN_MEMORY = 100000;
/**
* Controls sort order within segments. Normally, this is the same as the overall order of the query (from the
@ -215,20 +219,17 @@ public class MultiStageQueryContext
);
}
public static int getRowsPerSegment(final QueryContext queryContext, int defaultRowsPerSegment)
public static int getRowsPerSegment(final QueryContext queryContext)
{
return queryContext.getInt(
CTX_ROWS_PER_SEGMENT,
defaultRowsPerSegment
DEFAULT_ROWS_PER_SEGMENT
);
}
public static int getRowsInMemory(final QueryContext queryContext, int defaultRowsInMemory)
public static int getRowsInMemory(final QueryContext queryContext)
{
return queryContext.getInt(
CTX_ROWS_IN_MEMORY,
defaultRowsInMemory
);
return queryContext.getInt(CTX_ROWS_IN_MEMORY, DEFAULT_ROWS_IN_MEMORY);
}
public static List<String> getSortOrder(final QueryContext queryContext)

View File

@ -141,7 +141,7 @@ public class MSQSelectTest extends MSQTestBase
)
.setExpectedRowSignature(resultSignature)
.setQueryContext(context)
.setExpectedResultRows(ImmutableList.of(new Object[]{2L})).verifyResults();
.setExpectedResultRows(ImmutableList.of(new Object[]{2})).verifyResults();
}
@Test
@ -186,7 +186,7 @@ public class MSQSelectTest extends MSQTestBase
0, 0, "shuffle"
)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1L, !useDefault ? "" : null},
new Object[]{1L, ""},
new Object[]{1L, "10.1"},
new Object[]{1L, "2"},
new Object[]{1L, "1"},
@ -306,7 +306,7 @@ public class MSQSelectTest extends MSQTestBase
0, 0, "shuffle"
)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1L, !useDefault ? "" : null},
new Object[]{1L, ""},
new Object[]{1L, "10.1"},
new Object[]{1L, "2"},
new Object[]{1L, "1"},
@ -525,7 +525,7 @@ public class MSQSelectTest extends MSQTestBase
0, 0, "shuffle"
)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1L, !useDefault ? "" : null},
new Object[]{1L, ""},
new Object[]{1L, "10.1"},
new Object[]{1L, "2"},
new Object[]{1L, "1"},
@ -752,7 +752,7 @@ public class MSQSelectTest extends MSQTestBase
);
} else {
expectedResults = ImmutableList.of(
new Object[]{null, 3.6666666666666665},
new Object[]{"", 3.6666666666666665},
new Object[]{"a", 2.5},
new Object[]{"abc", 5.0}
);
@ -1316,31 +1316,51 @@ public class MSQSelectTest extends MSQTestBase
@Test
public void testScanWithMultiValueSelectQuery()
{
RowSignature resultSignature = RowSignature.builder()
.add("dim3", ColumnType.STRING)
.build();
RowSignature expectedScanSignature = RowSignature.builder()
.add("dim3", ColumnType.STRING)
.add("v0", ColumnType.STRING_ARRAY)
.build();
RowSignature expectedResultSignature = RowSignature.builder()
.add("dim3", ColumnType.STRING)
.add("dim3_array", ColumnType.STRING_ARRAY)
.build();
testSelectQuery()
.setSql("select dim3 from foo")
.setSql("select dim3, MV_TO_ARRAY(dim3) AS dim3_array from foo")
.setExpectedMSQSpec(MSQSpec.builder()
.query(newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim3")
.context(defaultScanQueryContext(context, resultSignature))
.virtualColumns(
expressionVirtualColumn(
"v0",
"mv_to_array(\"dim3\")",
ColumnType.STRING_ARRAY
)
)
.columns("dim3", "v0")
.context(defaultScanQueryContext(context, expectedScanSignature))
.build())
.columnMappings(ColumnMappings.identity(resultSignature))
.columnMappings(
new ColumnMappings(
ImmutableList.of(
new ColumnMapping("dim3", "dim3"),
new ColumnMapping("v0", "dim3_array")
)
)
)
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setExpectedRowSignature(resultSignature)
.setExpectedRowSignature(expectedResultSignature)
.setQueryContext(context)
.setExpectedResultRows(ImmutableList.of(
new Object[]{ImmutableList.of("a", "b")},
new Object[]{ImmutableList.of("b", "c")},
new Object[]{"d"},
new Object[]{!useDefault ? "" : null},
new Object[]{null},
new Object[]{null}
new Object[]{"[\"a\",\"b\"]", ImmutableList.of("a", "b")},
new Object[]{"[\"b\",\"c\"]", ImmutableList.of("b", "c")},
new Object[]{"d", ImmutableList.of("d")},
new Object[]{"", Collections.singletonList(useDefault ? null : "")},
new Object[]{NullHandling.defaultStringValue(), Collections.singletonList(null)},
new Object[]{NullHandling.defaultStringValue(), Collections.singletonList(null)}
)).verifyResults();
}
@ -1404,7 +1424,7 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(
NullHandling.replaceWithDefault()
? ImmutableList.of(new Object[]{null, 3L}, new Object[]{"a", 2L})
? ImmutableList.of(new Object[]{"", 3L}, new Object[]{"a", 2L})
: ImmutableList.of(new Object[]{null, 2L}, new Object[]{"a", 2L})
)
@ -1452,9 +1472,7 @@ public class MSQSelectTest extends MSQTestBase
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(
expectedMultiValueFooRowsGroup()
)
.setExpectedResultRows(expectedMultiValueFooRowsGroup())
.verifyResults();
}
@ -1833,8 +1851,10 @@ public class MSQSelectTest extends MSQTestBase
private List<Object[]> expectedMultiValueFooRowsGroup()
{
ArrayList<Object[]> expected = new ArrayList<>();
expected.add(new Object[]{null, !useDefault ? 2L : 3L});
if (!useDefault) {
if (useDefault) {
expected.add(new Object[]{"", 3L});
} else {
expected.add(new Object[]{null, 2L});
expected.add(new Object[]{"", 1L});
}
expected.addAll(ImmutableList.of(

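The flipped expectations above follow directly from the shared coercion path: for CHAR types, SqlResults.coerce applies NullHandling.nullToEmptyIfNeeded (so null strings become "" in replace-with-default mode) and JSON-encodes multi-value strings via its Collection branch; and with CTX_SQL_STRINGIFY_ARRAYS set to false in the test context (see MSQTestBase below), ARRAY columns come back as java.util.List rather than JSON strings. A sketch, with mapper and ctx hypothetical:

    Object s = SqlResults.coerce(mapper, ctx, null, SqlTypeName.VARCHAR);
    // s == "" when druid.generic.useDefaultValueForNull=true; s == null in SQL-compatible mode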
View File

@ -62,6 +62,7 @@ public class MSQControllerTaskTest
null,
null,
null,
null,
null);
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -162,12 +163,7 @@ public class MSQWarningsTest extends MSQTestBase
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") group by 1")
.setQueryContext(
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.putAll(userContext)
.build()
)
.setQueryContext(QueryContexts.override(DEFAULT_MSQ_CONTEXT, userContext))
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{1566172800000L, 10L}))
.setExpectedMSQSpec(
@ -250,12 +246,38 @@ public class MSQWarningsTest extends MSQTestBase
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") group by 1")
.setQueryContext(
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.putAll(userContext)
.build()
)
.setQueryContext(QueryContexts.override(DEFAULT_MSQ_CONTEXT, userContext))
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{1566172800000L, 10L}))
.setExpectedMSQSpec(
MSQSpec.builder()
.query(defaultQuery.withOverriddenContext(userContext))
.columnMappings(defaultColumnMappings)
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.verifyResults();
}
@Test
public void testSuccessWhenParseExceptionsOnLimitOverridesMode()
{
final Map<String, Object> userContext =
ImmutableMap.of(
MultiStageQueryContext.CTX_MSQ_MODE, "strict",
MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 10 // Takes precedence over "strict" mode
);
testSelectQuery().setSql("SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " count(*) as cnt\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [\"" + toRead.getAbsolutePath() + "\"],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") group by 1")
.setQueryContext(QueryContexts.override(DEFAULT_MSQ_CONTEXT, userContext))
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{1566172800000L, 10L}))
.setExpectedMSQSpec(
@ -270,7 +292,9 @@ public class MSQWarningsTest extends MSQTestBase
@Test
public void testSuccessInNonStrictMode()
{
final Map<String, Object> userContext = ImmutableMap.of(MultiStageQueryContext.CTX_MSQ_MODE, "nonStrict");
final Map<String, Object> userContext =
QueryContexts.override(DEFAULT_MSQ_CONTEXT, ImmutableMap.of(MultiStageQueryContext.CTX_MSQ_MODE, "nonStrict"));
userContext.remove(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED);
testSelectQuery().setSql("SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
@ -282,17 +306,20 @@ public class MSQWarningsTest extends MSQTestBase
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") group by 1")
.setQueryContext(
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.putAll(userContext)
.build()
)
.setQueryContext(userContext)
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{1566172800000L, 10L}))
.setExpectedMSQSpec(
MSQSpec.builder()
.query(defaultQuery.withOverriddenContext(userContext))
.query(
defaultQuery.withOverriddenContext(userContext)
.withOverriddenContext(
ImmutableMap.of(
MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
-1
)
)
)
.columnMappings(defaultColumnMappings)
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
@ -339,9 +366,7 @@ public class MSQWarningsTest extends MSQTestBase
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") group by 1 PARTITIONED by day ")
.setQueryContext(new ImmutableMap.Builder<String, Object>().putAll(DEFAULT_MSQ_CONTEXT)
.putAll(ROLLUP_CONTEXT_PARAMS)
.build())
.setQueryContext(QueryContexts.override(DEFAULT_MSQ_CONTEXT, ROLLUP_CONTEXT_PARAMS))
.setExpectedRollUp(true)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
@ -391,12 +416,7 @@ public class MSQWarningsTest extends MSQTestBase
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") group by 1")
.setQueryContext(
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.putAll(userContext)
.build()
)
.setQueryContext(QueryContexts.override(DEFAULT_MSQ_CONTEXT, userContext))
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{1566172800000L, 10L}))
.setExpectedMSQSpec(

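A small aside on the test cleanup: QueryContexts.override layers one context map over another and returns a fresh mutable map, which is what lets testSuccessInNonStrictMode call remove(...) on the result. Roughly:

    Map<String, Object> ctx = QueryContexts.override(DEFAULT_MSQ_CONTEXT, userContext);
    ctx.remove(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED);  // safe: override returned a new map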
View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
@ -103,7 +104,7 @@ public class MSQTaskReportTest
new CounterSnapshotsTree(),
new MSQResultsReport(
Collections.singletonList(new MSQResultsReport.ColumnAndType("s", ColumnType.STRING)),
ImmutableList.of("VARCHAR"),
ImmutableList.of(SqlTypeName.VARCHAR),
Yielders.each(Sequences.simple(results))
)
)

View File

@ -172,7 +172,6 @@ public class CalciteSelectQueryMSQTest extends CalciteQueryTest
@Override
public void testArrayAggQueryOnComplexDatatypes()
{
msqCompatible();
try {
testQuery("SELECT ARRAY_AGG(unique_dim1) FROM druid.foo", ImmutableList.of(), ImmutableList.of());
Assert.fail("query execution should fail");

View File

@ -90,6 +90,7 @@ import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
@ -223,6 +224,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
ImmutableMap.<String, Object>builder()
.put(QueryContexts.CTX_SQL_QUERY_ID, "test-query")
.put(QueryContexts.FINALIZE_KEY, true)
.put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.build();
public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =

View File

@ -156,27 +156,33 @@ public class MultiStageQueryContextTest
@Test
public void getRowsPerSegment_noParameterSetReturnsDefaultValue()
{
Assert.assertEquals(1000, MultiStageQueryContext.getRowsPerSegment(QueryContext.empty(), 1000));
Assert.assertEquals(
MultiStageQueryContext.DEFAULT_ROWS_PER_SEGMENT,
MultiStageQueryContext.getRowsPerSegment(QueryContext.empty())
);
}
@Test
public void getRowsPerSegment_parameterSetReturnsCorrectValue()
{
Map<String, Object> propertyMap = ImmutableMap.of(CTX_ROWS_PER_SEGMENT, 10);
Assert.assertEquals(10, MultiStageQueryContext.getRowsPerSegment(QueryContext.of(propertyMap), 1000));
Assert.assertEquals(10, MultiStageQueryContext.getRowsPerSegment(QueryContext.of(propertyMap)));
}
@Test
public void getRowsInMemory_noParameterSetReturnsDefaultValue()
{
Assert.assertEquals(1000, MultiStageQueryContext.getRowsInMemory(QueryContext.empty(), 1000));
Assert.assertEquals(
MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
MultiStageQueryContext.getRowsInMemory(QueryContext.empty())
);
}
@Test
public void getRowsInMemory_parameterSetReturnsCorrectValue()
{
Map<String, Object> propertyMap = ImmutableMap.of(CTX_ROWS_IN_MEMORY, 10);
Assert.assertEquals(10, MultiStageQueryContext.getRowsInMemory(QueryContext.of(propertyMap), 1000));
Assert.assertEquals(10, MultiStageQueryContext.getRowsInMemory(QueryContext.of(propertyMap)));
}
@Test

View File

@ -4,7 +4,7 @@
"expectedResults": [
{
"__time": 1377910953000,
"isRobot": null,
"isRobot": "",
"added": 57,
"delta": -143,
"deleted": 200,
@ -12,7 +12,7 @@
},
{
"__time": 1377919965000,
"isRobot": null,
"isRobot": "",
"added": 459,
"delta": 330,
"deleted": 129,
@ -20,7 +20,7 @@
},
{
"__time": 1377933081000,
"isRobot": null,
"isRobot": "",
"added": 123,
"delta": 111,
"deleted": 12,

View File

@ -4,7 +4,7 @@
"expectedResults": [
{
"__time": 1377910953000,
"isRobot": null,
"isRobot": "",
"added": 57,
"delta": -143,
"deleted": 200,
@ -12,7 +12,7 @@
},
{
"__time": 1377910953000,
"isRobot": null,
"isRobot": "",
"added": 57,
"delta": -143,
"deleted": 200,
@ -20,7 +20,7 @@
},
{
"__time": 1377919965000,
"isRobot": null,
"isRobot": "",
"added": 459,
"delta": 330,
"deleted": 129,
@ -28,7 +28,7 @@
},
{
"__time": 1377919965000,
"isRobot": null,
"isRobot": "",
"added": 459,
"delta": 330,
"deleted": 129,
@ -36,7 +36,7 @@
},
{
"__time": 1377933081000,
"isRobot": null,
"isRobot": "",
"added": 123,
"delta": 111,
"deleted": 12,
@ -44,7 +44,7 @@
},
{
"__time": 1377933081000,
"isRobot": null,
"isRobot": "",
"added": 123,
"delta": 111,
"deleted": 12,

View File

@ -19,26 +19,20 @@
package org.apache.druid.sql.calcite.run;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.math.expr.Evals;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryToolChest;
@ -47,27 +41,19 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.data.ComparableList;
import org.apache.druid.segment.data.ComparableStringArray;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@ -150,11 +136,11 @@ public class NativeQueryMaker implements QueryMaker
rowOrder = druidQuery.getOutputRowSignature().getColumnNames();
}
final List<SqlTypeName> columnTypes =
final List<RelDataType> columnTypes =
druidQuery.getOutputRowType()
.getFieldList()
.stream()
.map(f -> f.getType().getSqlTypeName())
.map(RelDataTypeField::getType)
.collect(Collectors.toList());
return execute(
@ -173,7 +159,11 @@ public class NativeQueryMaker implements QueryMaker
}
@SuppressWarnings("unchecked")
private <T> QueryResponse<Object[]> execute(Query<?> query, final List<String> newFields, final List<SqlTypeName> newTypes)
private <T> QueryResponse<Object[]> execute(
Query<?> query, // Not final: may be reassigned with query ID added
final List<String> newFields,
final List<RelDataType> newTypes
)
{
Hook.QUERY_PLAN.run(query);
@ -211,7 +201,7 @@ public class NativeQueryMaker implements QueryMaker
final QueryToolChest<T, Query<T>> toolChest,
final Query<T> query,
final List<String> newFields,
final List<SqlTypeName> newTypes
final List<RelDataType> newTypes
)
{
final List<String> originalFields = toolChest.resultArraySignature(query).getColumnNames();
@ -240,13 +230,19 @@ public class NativeQueryMaker implements QueryMaker
}
final Sequence<Object[]> sequence = toolChest.resultsAsArrays(query, results.getResults());
final SqlResults.Context sqlResultsContext = SqlResults.Context.fromPlannerContext(plannerContext);
return new QueryResponse<>(
Sequences.map(
sequence,
array -> {
final Object[] newArray = new Object[mapping.length];
for (int i = 0; i < mapping.length; i++) {
newArray[i] = coerce(array[mapping[i]], newTypes.get(i));
newArray[i] = SqlResults.coerce(
jsonMapper,
sqlResultsContext,
array[mapping[i]],
newTypes.get(i).getSqlTypeName()
);
}
return newArray;
}
@ -255,175 +251,6 @@ public class NativeQueryMaker implements QueryMaker
);
}
private Object coerce(final Object value, final SqlTypeName sqlType)
{
final Object coercedValue;
if (SqlTypeName.CHAR_TYPES.contains(sqlType)) {
if (value == null || value instanceof String) {
coercedValue = NullHandling.nullToEmptyIfNeeded((String) value);
} else if (value instanceof NlsString) {
coercedValue = ((NlsString) value).getValue();
} else if (value instanceof Number) {
coercedValue = String.valueOf(value);
} else if (value instanceof Boolean) {
coercedValue = String.valueOf(value);
} else if (value instanceof Collection) {
// Iterate through the collection, coercing each value. Useful for handling selects of multi-value dimensions.
final List<String> valueStrings = ((Collection<?>) value).stream()
.map(v -> (String) coerce(v, sqlType))
.collect(Collectors.toList());
try {
coercedValue = jsonMapper.writeValueAsString(valueStrings);
}
catch (IOException e) {
throw new RuntimeException(e);
}
} else {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
} else if (value == null) {
coercedValue = null;
} else if (sqlType == SqlTypeName.DATE) {
return Calcites.jodaToCalciteDate(coerceDateTime(value, sqlType), plannerContext.getTimeZone());
} else if (sqlType == SqlTypeName.TIMESTAMP) {
return Calcites.jodaToCalciteTimestamp(coerceDateTime(value, sqlType), plannerContext.getTimeZone());
} else if (sqlType == SqlTypeName.BOOLEAN) {
if (value instanceof String) {
coercedValue = Evals.asBoolean(((String) value));
} else if (value instanceof Number) {
coercedValue = Evals.asBoolean(((Number) value).longValue());
} else {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.INTEGER) {
if (value instanceof String) {
coercedValue = Ints.tryParse((String) value);
} else if (value instanceof Number) {
coercedValue = ((Number) value).intValue();
} else {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.BIGINT) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToLong(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.FLOAT) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToFloat(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
} else if (SqlTypeName.FRACTIONAL_TYPES.contains(sqlType)) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToDouble(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.OTHER) {
// Complex type, try to serialize if we should, else print class name
if (plannerContext.getPlannerConfig().shouldSerializeComplexValues()) {
try {
coercedValue = jsonMapper.writeValueAsString(value);
}
catch (JsonProcessingException jex) {
throw new ISE(jex, "Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
} else {
coercedValue = value.getClass().getName();
}
} else if (sqlType == SqlTypeName.ARRAY) {
if (plannerContext.isStringifyArrays()) {
if (value instanceof String) {
coercedValue = NullHandling.nullToEmptyIfNeeded((String) value);
} else if (value instanceof NlsString) {
coercedValue = ((NlsString) value).getValue();
} else {
try {
coercedValue = jsonMapper.writeValueAsString(value);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
} else {
// the protobuf jdbc handler prefers lists (it actually can't handle java arrays as sql arrays, only java lists)
// the json handler could handle this just fine, but it handles lists as sql arrays as well so just convert
// here if needed
coercedValue = maybeCoerceArrayToList(value, true);
if (coercedValue == null) {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
}
} else {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlType);
}
return coercedValue;
}
@VisibleForTesting
static Object maybeCoerceArrayToList(Object value, boolean mustCoerce)
{
if (value instanceof List) {
return value;
} else if (value instanceof String[]) {
return Arrays.asList((String[]) value);
} else if (value instanceof Long[]) {
return Arrays.asList((Long[]) value);
} else if (value instanceof Double[]) {
return Arrays.asList((Double[]) value);
} else if (value instanceof Object[]) {
final Object[] array = (Object[]) value;
final ArrayList<Object> lst = new ArrayList<>(array.length);
for (Object o : array) {
lst.add(maybeCoerceArrayToList(o, false));
}
return lst;
} else if (value instanceof long[]) {
return Arrays.stream((long[]) value).boxed().collect(Collectors.toList());
} else if (value instanceof double[]) {
return Arrays.stream((double[]) value).boxed().collect(Collectors.toList());
} else if (value instanceof float[]) {
final float[] array = (float[]) value;
final ArrayList<Object> lst = new ArrayList<>(array.length);
for (float f : array) {
lst.add(f);
}
return lst;
} else if (value instanceof ComparableStringArray) {
return Arrays.asList(((ComparableStringArray) value).getDelegate());
} else if (value instanceof ComparableList) {
return ((ComparableList) value).getDelegate();
} else if (mustCoerce) {
return null;
}
return value;
}
private static DateTime coerceDateTime(Object value, SqlTypeName sqlType)
{
final DateTime dateTime;
if (value instanceof Number) {
dateTime = DateTimes.utc(((Number) value).longValue());
} else if (value instanceof String) {
dateTime = DateTimes.utc(Long.parseLong((String) value));
} else if (value instanceof DateTime) {
dateTime = (DateTime) value;
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
return dateTime;
}
private static <T> List<T> mapColumnList(final List<T> in, final List<Pair<Integer, String>> fieldMapping)
{
final List<T> out = new ArrayList<>(fieldMapping.size());

View File

@ -0,0 +1,309 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.run;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.Evals;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.data.ComparableList;
import org.apache.druid.segment.data.ComparableStringArray;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Holder for the utility method {@link #coerce(ObjectMapper, Context, Object, SqlTypeName)}.
*/
public class SqlResults
{
public static Object coerce(
final ObjectMapper jsonMapper,
final Context context,
final Object value,
final SqlTypeName sqlTypeName
)
{
final Object coercedValue;
if (SqlTypeName.CHAR_TYPES.contains(sqlTypeName)) {
if (value == null || value instanceof String) {
coercedValue = NullHandling.nullToEmptyIfNeeded((String) value);
} else if (value instanceof NlsString) {
coercedValue = ((NlsString) value).getValue();
} else if (value instanceof Number) {
coercedValue = String.valueOf(value);
} else if (value instanceof Boolean) {
coercedValue = String.valueOf(value);
} else if (value instanceof Collection) {
// Iterate through the collection, coercing each value. Useful for handling selects of multi-value dimensions.
final List<String> valueStrings =
((Collection<?>) value).stream()
.map(v -> (String) coerce(jsonMapper, context, v, sqlTypeName))
.collect(Collectors.toList());
try {
coercedValue = jsonMapper.writeValueAsString(valueStrings);
}
catch (IOException e) {
throw new RuntimeException(e);
}
} else {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
} else if (value == null) {
coercedValue = null;
} else if (sqlTypeName == SqlTypeName.DATE) {
return Calcites.jodaToCalciteDate(coerceDateTime(value, sqlTypeName), context.getTimeZone());
} else if (sqlTypeName == SqlTypeName.TIMESTAMP) {
return Calcites.jodaToCalciteTimestamp(coerceDateTime(value, sqlTypeName), context.getTimeZone());
} else if (sqlTypeName == SqlTypeName.BOOLEAN) {
if (value instanceof String) {
coercedValue = Evals.asBoolean(((String) value));
} else if (value instanceof Number) {
coercedValue = Evals.asBoolean(((Number) value).longValue());
} else {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
} else if (sqlTypeName == SqlTypeName.INTEGER) {
if (value instanceof String) {
coercedValue = Ints.tryParse((String) value);
} else if (value instanceof Number) {
coercedValue = ((Number) value).intValue();
} else {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
} else if (sqlTypeName == SqlTypeName.BIGINT) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToLong(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
} else if (sqlTypeName == SqlTypeName.FLOAT) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToFloat(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
} else if (SqlTypeName.FRACTIONAL_TYPES.contains(sqlTypeName)) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToDouble(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
} else if (sqlTypeName == SqlTypeName.OTHER) {
// Complex type, try to serialize if we should, else print class name
if (context.isSerializeComplexValues()) {
try {
coercedValue = jsonMapper.writeValueAsString(value);
}
catch (JsonProcessingException jex) {
throw new ISE(jex, "Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
} else {
coercedValue = value.getClass().getName();
}
} else if (sqlTypeName == SqlTypeName.ARRAY) {
if (context.isStringifyArrays()) {
if (value instanceof String) {
coercedValue = NullHandling.nullToEmptyIfNeeded((String) value);
} else if (value instanceof NlsString) {
coercedValue = ((NlsString) value).getValue();
} else {
try {
coercedValue = jsonMapper.writeValueAsString(value);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
} else {
// the protobuf jdbc handler prefers lists (it actually can't handle java arrays as sql arrays, only java lists)
// the json handler could handle this just fine, but it handles lists as sql arrays as well so just convert
// here if needed
coercedValue = maybeCoerceArrayToList(value, true);
if (coercedValue == null) {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
}
} else {
throw new ISE("Cannot coerce [%s] to %s", value.getClass().getName(), sqlTypeName);
}
return coercedValue;
}
@VisibleForTesting
static Object maybeCoerceArrayToList(Object value, boolean mustCoerce)
{
if (value instanceof List) {
return value;
} else if (value instanceof String[]) {
return Arrays.asList((String[]) value);
} else if (value instanceof Long[]) {
return Arrays.asList((Long[]) value);
} else if (value instanceof Double[]) {
return Arrays.asList((Double[]) value);
} else if (value instanceof Object[]) {
final Object[] array = (Object[]) value;
final ArrayList<Object> lst = new ArrayList<>(array.length);
for (Object o : array) {
lst.add(maybeCoerceArrayToList(o, false));
}
return lst;
} else if (value instanceof long[]) {
return Arrays.stream((long[]) value).boxed().collect(Collectors.toList());
} else if (value instanceof double[]) {
return Arrays.stream((double[]) value).boxed().collect(Collectors.toList());
} else if (value instanceof float[]) {
final float[] array = (float[]) value;
final ArrayList<Object> lst = new ArrayList<>(array.length);
for (float f : array) {
lst.add(f);
}
return lst;
} else if (value instanceof ComparableStringArray) {
return Arrays.asList(((ComparableStringArray) value).getDelegate());
} else if (value instanceof ComparableList) {
return ((ComparableList) value).getDelegate();
} else if (mustCoerce) {
return null;
}
return value;
}
private static DateTime coerceDateTime(Object value, SqlTypeName sqlType)
{
final DateTime dateTime;
if (value instanceof Number) {
dateTime = DateTimes.utc(((Number) value).longValue());
} else if (value instanceof String) {
dateTime = DateTimes.utc(Long.parseLong((String) value));
} else if (value instanceof DateTime) {
dateTime = (DateTime) value;
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
return dateTime;
}
/**
* Context for {@link #coerce(ObjectMapper, Context, Object, SqlTypeName)}
*/
public static class Context
{
private final DateTimeZone timeZone;
private final boolean serializeComplexValues;
private final boolean stringifyArrays;
@JsonCreator
public Context(
@JsonProperty("timeZone") final DateTimeZone timeZone,
@JsonProperty("serializeComplexValues") final boolean serializeComplexValues,
@JsonProperty("stringifyArrays") final boolean stringifyArrays
)
{
this.timeZone = timeZone;
this.serializeComplexValues = serializeComplexValues;
this.stringifyArrays = stringifyArrays;
}
public static Context fromPlannerContext(final PlannerContext plannerContext)
{
return new Context(
plannerContext.getTimeZone(),
plannerContext.getPlannerConfig().shouldSerializeComplexValues(),
plannerContext.isStringifyArrays()
);
}
@JsonProperty
public DateTimeZone getTimeZone()
{
return timeZone;
}
@JsonProperty
public boolean isSerializeComplexValues()
{
return serializeComplexValues;
}
@JsonProperty
public boolean isStringifyArrays()
{
return stringifyArrays;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Context context = (Context) o;
return serializeComplexValues == context.serializeComplexValues
&& stringifyArrays == context.stringifyArrays
&& Objects.equals(timeZone, context.timeZone);
}
@Override
public int hashCode()
{
return Objects.hash(timeZone, serializeComplexValues, stringifyArrays);
}
@Override
public String toString()
{
return "Context{" +
"timeZone=" + timeZone +
", serializeComplexValues=" + serializeComplexValues +
", stringifyArrays=" + stringifyArrays +
'}';
}
}
}

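A usage sketch for the extracted routine, with hypothetical values; per the ARRAY branch above, stringifyArrays=true JSON-encodes, while false hands back a List for the JDBC handlers:

    SqlResults.Context ctx = new SqlResults.Context(DateTimeZone.UTC, true, true);
    Object v = SqlResults.coerce(new ObjectMapper(), ctx, new String[]{"a", "b"}, SqlTypeName.ARRAY);
    // stringifyArrays=true  -> the JSON string "[\"a\",\"b\"]"
    // stringifyArrays=false -> Arrays.asList("a", "b")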
View File

@ -267,7 +267,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public boolean cannotVectorize = false;
public boolean skipVectorize = false;
public boolean msqCompatible = false;
public boolean msqCompatible = true;
public QueryLogHook queryLogHook;
@ -1010,9 +1010,9 @@ public class BaseCalciteQueryTest extends CalciteTestBase
skipVectorize = true;
}
protected void msqCompatible()
protected void notMsqCompatible()
{
msqCompatible = true;
msqCompatible = false;
}
protected static boolean isRewriteJoinToFilter(final Map<String, Object> queryContext)

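With the default flipped, Calcite query tests are treated as MSQ-compatible (and exercised by the MSQ test variants) unless they explicitly opt out; a hypothetical opt-out:

    @Test
    public void testSomethingMsqCannotRun()  // hypothetical test name
    {
      notMsqCompatible();                    // exclude this case from the (now default) MSQ-compatible set
      // ... usual testQuery(...) assertions ...
    }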
View File

@ -27,7 +27,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.List;
public class NativeQueryMakerTest
public class SqlResultsTest
{
@Test
@ -105,12 +105,12 @@ public class NativeQueryMakerTest
@Test
public void testMustCoerce()
{
Assert.assertNull(NativeQueryMaker.maybeCoerceArrayToList("hello", true));
Assert.assertNull(SqlResults.maybeCoerceArrayToList("hello", true));
}
private static void assertCoerced(Object expected, Object toCoerce, boolean mustCoerce)
{
Object coerced = NativeQueryMaker.maybeCoerceArrayToList(toCoerce, mustCoerce);
Object coerced = SqlResults.maybeCoerceArrayToList(toCoerce, mustCoerce);
Assert.assertEquals(expected, coerced);
}
}