From cb3a9d2b5778c568f8a6fad096b95bfd1f36bd21 Mon Sep 17 00:00:00 2001 From: Karan Kumar Date: Wed, 28 Jun 2023 17:51:58 +0530 Subject: [PATCH] Adding Interactive API's for MSQ engine (#14416) This PR aims to expose a new API called "@path("/druid/v2/sql/statements/")" which takes the same payload as the current "/druid/v2/sql" endpoint and allows users to fetch results in an async manner. --- .../apache/druid/msq/guice/SqlTaskModule.java | 9 +- .../druid/msq/indexing/MSQControllerTask.java | 18 +- .../error/InsertCannotBeEmptyFault.java | 2 +- .../druid/msq/sql/MSQTaskQueryMaker.java | 9 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 4 +- .../druid/msq/sql/SqlStatementState.java | 42 + .../apache/druid/msq/sql/SqlTaskStatus.java | 1 + .../msq/sql/entity/ColumnNameAndTypes.java | 102 ++ .../msq/sql/entity/ResultSetInformation.java | 142 +++ .../msq/sql/entity/SqlStatementResult.java | 185 ++++ .../sql/resources/SqlStatementResource.java | 670 +++++++++++++ .../sql/{ => resources}/SqlTaskResource.java | 4 +- .../msq/util/SqlStatementResourceHelper.java | 285 ++++++ .../msq/indexing/MSQControllerTaskTest.java | 1 + .../indexing/report/MSQTaskReportTest.java | 2 +- .../sql/SqlMsqStatementResourcePostTest.java | 256 +++++ .../msq/sql/SqlStatementResourceTest.java | 947 ++++++++++++++++++ .../sql/entity/ColumnNameAndTypesTest.java | 53 + .../sql/entity/ResultSetInformationTest.java | 58 ++ .../sql/entity/SqlStatementResultTest.java | 95 ++ .../apache/druid/msq/test/MSQTestBase.java | 24 +- .../test/MSQTestOverlordServiceClient.java | 83 +- .../druid/indexing/common/task/IndexTask.java | 2 +- .../apache/druid/error/DruidException.java | 41 +- .../org/apache/druid/query/ExecutionMode.java | 35 + .../org/apache/druid/query/QueryContexts.java | 17 +- .../apache/druid/query/QueryContextsTest.java | 22 + .../druid/rpc/indexing/OverlordClient.java | 3 + .../rpc/indexing/OverlordClientImpl.java | 17 + .../client/indexing/NoopOverlordClient.java | 6 + .../rpc/indexing/OverlordClientImplTest.java | 58 ++ .../org/apache/druid/sql/DirectStatement.java | 4 +- .../druid/sql/calcite/util/CalciteTests.java | 34 + .../druid/sql/http/SqlResourceTest.java | 2 +- 34 files changed, 3170 insertions(+), 63 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlStatementState.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/ColumnNameAndTypes.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/ResultSetInformation.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/SqlStatementResult.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java rename extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/{ => resources}/SqlTaskResource.java (98%) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMsqStatementResourcePostTest.java create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ColumnNameAndTypesTest.java create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ResultSetInformationTest.java create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java create mode 100644 processing/src/main/java/org/apache/druid/query/ExecutionMode.java create mode 100644 server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/SqlTaskModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/SqlTaskModule.java index d8f8b06f9b9..bcd8abfbb87 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/SqlTaskModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/SqlTaskModule.java @@ -21,12 +21,16 @@ package org.apache.druid.msq.guice; import com.fasterxml.jackson.databind.Module; import com.google.inject.Binder; +import org.apache.druid.client.indexing.HttpIndexingServiceClient; +import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.Jerseys; +import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.LoadScope; import org.apache.druid.initialization.DruidModule; -import org.apache.druid.msq.sql.SqlTaskResource; +import org.apache.druid.msq.sql.resources.SqlStatementResource; +import org.apache.druid.msq.sql.resources.SqlTaskResource; import java.util.Collections; import java.util.List; @@ -43,6 +47,9 @@ public class SqlTaskModule implements DruidModule // Force eager initialization. LifecycleModule.register(binder, SqlTaskResource.class); Jerseys.addResource(binder, SqlTaskResource.class); + LifecycleModule.register(binder, SqlStatementResource.class); + binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class); + Jerseys.addResource(binder, SqlStatementResource.class); } @Override diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 3265f2640af..02280525738 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; import com.google.inject.Key; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; @@ -49,6 +50,7 @@ import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.sql.calcite.run.SqlResults; import org.joda.time.Interval; @@ -60,7 +62,7 @@ import java.util.Map; import java.util.Set; @JsonTypeName(MSQControllerTask.TYPE) -public class MSQControllerTask extends AbstractTask +public class MSQControllerTask extends AbstractTask implements ClientTaskQuery { public static final String TYPE = "query_controller"; public static final String DUMMY_DATASOURCE_FOR_SELECT = "__query_select"; @@ -91,6 +93,9 @@ public class MSQControllerTask extends AbstractTask @Nullable private final List sqlTypeNames; + @Nullable + private final List nativeTypeNames; + // Using an Injector directly because tasks do not have a way to provide their own Guice modules. @JacksonInject private Injector injector; @@ -105,6 +110,7 @@ public class MSQControllerTask extends AbstractTask @JsonProperty("sqlQueryContext") @Nullable Map sqlQueryContext, @JsonProperty("sqlResultsContext") @Nullable SqlResults.Context sqlResultsContext, @JsonProperty("sqlTypeNames") @Nullable List sqlTypeNames, + @JsonProperty("nativeTypeNames") @Nullable List nativeTypeNames, @JsonProperty("context") @Nullable Map context ) { @@ -121,6 +127,7 @@ public class MSQControllerTask extends AbstractTask this.sqlQueryContext = sqlQueryContext; this.sqlResultsContext = sqlResultsContext; this.sqlTypeNames = sqlTypeNames; + this.nativeTypeNames = nativeTypeNames; addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); } @@ -154,6 +161,15 @@ public class MSQControllerTask extends AbstractTask return sqlTypeNames; } + + @Nullable + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public List getNativeTypeNames() + { + return nativeTypeNames; + } + @Nullable @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotBeEmptyFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotBeEmptyFault.java index 4285ace4180..7948e813615 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotBeEmptyFault.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotBeEmptyFault.java @@ -29,7 +29,7 @@ import java.util.Objects; @JsonTypeName(InsertCannotBeEmptyFault.CODE) public class InsertCannotBeEmptyFault extends BaseMSQFault { - static final String CODE = "InsertCannotBeEmpty"; + public static final String CODE = "InsertCannotBeEmpty"; private final String dataSource; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index cde4906b193..c0de08a809f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -78,6 +78,8 @@ public class MSQTaskQueryMaker implements QueryMaker private static final String DESTINATION_DATASOURCE = "dataSource"; private static final String DESTINATION_REPORT = "taskReport"; + public static final String USER_KEY = "__user"; + private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; private final String targetDataSource; @@ -116,6 +118,9 @@ public class MSQTaskQueryMaker implements QueryMaker // Native query context: sqlQueryContext plus things that we add prior to creating a controller task. final Map nativeQueryContext = new HashMap<>(sqlQueryContext.asMap()); + // adding user + nativeQueryContext.put(USER_KEY, plannerContext.getAuthenticationResult().getIdentity()); + final String msqMode = MultiStageQueryContext.getMSQMode(sqlQueryContext); if (msqMode != null) { MSQMode.populateDefaultQueryContext(msqMode, nativeQueryContext); @@ -174,6 +179,7 @@ public class MSQTaskQueryMaker implements QueryMaker finalizeAggregations ? null /* Not needed */ : buildAggregationIntermediateTypeMap(druidQuery); final List sqlTypeNames = new ArrayList<>(); + final List columnTypeList = new ArrayList<>(); final List columnMappings = QueryUtils.buildColumnMappings(fieldMapping, druidQuery); for (final Pair entry : fieldMapping) { @@ -187,8 +193,8 @@ public class MSQTaskQueryMaker implements QueryMaker } else { sqlTypeName = druidQuery.getOutputRowType().getFieldList().get(entry.getKey()).getType().getSqlTypeName(); } - sqlTypeNames.add(sqlTypeName); + columnTypeList.add(druidQuery.getOutputRowSignature().getColumnType(queryColumn).orElse(ColumnType.STRING)); } final MSQDestination destination; @@ -248,6 +254,7 @@ public class MSQTaskQueryMaker implements QueryMaker plannerContext.queryContextMap(), SqlResults.Context.fromPlannerContext(plannerContext), sqlTypeNames, + columnTypeList, null ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 7d27103e0b9..c5fe182ea96 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -296,7 +296,9 @@ public class MSQTaskSqlEngine implements SqlEngine private static RelDataType getMSQStructType(RelDataTypeFactory typeFactory) { return typeFactory.createStructType( - ImmutableList.of(Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR)), + ImmutableList.of( + Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR) + ), TASK_STRUCT_FIELD_NAMES ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlStatementState.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlStatementState.java new file mode 100644 index 00000000000..e7f7da95d8e --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlStatementState.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.sql; + +import org.apache.druid.sql.http.SqlQuery; + +import javax.servlet.http.HttpServletRequest; + +/** + * Represents the status of the sql statements issues via + * {@link org.apache.druid.msq.sql.resources.SqlStatementResource#doPost(SqlQuery, HttpServletRequest)} and returned in + * {@link org.apache.druid.msq.sql.entity.SqlStatementResult} + */ +public enum SqlStatementState +{ + // The statement is accepted but not yes assigned any worker. In MSQ engine, the statement is in ACCEPTED state + // till the overlord assigns a TaskLocation to the controller task. + ACCEPTED, + // The statement is running. + RUNNING, + // The statement is successful. + SUCCESS, + // The statement failed. + FAILED +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskStatus.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskStatus.java index 9fa14ee9944..9c92a2589f2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskStatus.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskStatus.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.TaskState; +import org.apache.druid.msq.sql.resources.SqlTaskResource; import javax.annotation.Nullable; import java.util.Objects; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/ColumnNameAndTypes.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/ColumnNameAndTypes.java new file mode 100644 index 00000000000..3f7188c5d93 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/ColumnNameAndTypes.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.sql.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * The column name and its sql {@link org.apache.calcite.sql.type.SqlTypeName} and its native {@link org.apache.druid.segment.column.ColumnType} + */ + +public class ColumnNameAndTypes +{ + + private final String colName; + private final String sqlTypeName; + + private final String nativeTypeName; + + @JsonCreator + public ColumnNameAndTypes( + @JsonProperty("name") String colName, + @JsonProperty("type") String sqlTypeName, + @JsonProperty("nativeType") String nativeTypeName + ) + { + + this.colName = colName; + this.sqlTypeName = sqlTypeName; + this.nativeTypeName = nativeTypeName; + } + + @JsonProperty("name") + public String getColName() + { + return colName; + } + + @JsonProperty("type") + public String getSqlTypeName() + { + return sqlTypeName; + } + + @JsonProperty("nativeType") + public String getNativeTypeName() + { + return nativeTypeName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ColumnNameAndTypes that = (ColumnNameAndTypes) o; + return Objects.equals(colName, that.colName) + && Objects.equals(sqlTypeName, that.sqlTypeName) + && Objects.equals(nativeTypeName, that.nativeTypeName); + } + + @Override + public int hashCode() + { + return Objects.hash(colName, sqlTypeName, nativeTypeName); + } + + @Override + public String toString() + { + return "ColumnNameAndTypes{" + + "colName='" + colName + '\'' + + ", sqlTypeName='" + sqlTypeName + '\'' + + ", nativeTypeName='" + nativeTypeName + '\'' + + '}'; + } +} + + diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/ResultSetInformation.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/ResultSetInformation.java new file mode 100644 index 00000000000..43201fdac6e --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/ResultSetInformation.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.sql.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.sql.http.ResultFormat; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Objects; + +public class ResultSetInformation +{ + + @Nullable + private final Long numRows; + @Nullable + private final Long sizeInBytes; + + @Nullable + private final ResultFormat resultFormat; + + @Nullable + private final List records; + + @Nullable + private final String dataSource; + + @JsonCreator + public ResultSetInformation( + @JsonProperty("resultFormat") @Nullable ResultFormat resultFormat, + @JsonProperty("numRows") @Nullable Long numRows, + @JsonProperty("sizeInBytes") @Nullable Long sizeInBytes, + @JsonProperty("dataSource") @Nullable String dataSource, + @JsonProperty("sampleRecords") @Nullable + List records + ) + { + this.numRows = numRows; + this.sizeInBytes = sizeInBytes; + this.resultFormat = resultFormat; + this.dataSource = dataSource; + this.records = records; + } + + @Nullable + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getNumRows() + { + return numRows; + } + + @Nullable + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getSizeInBytes() + { + return sizeInBytes; + } + + @JsonProperty + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public ResultFormat getResultFormat() + { + return resultFormat; + } + + @JsonProperty + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getDataSource() + { + return dataSource; + } + + @Nullable + @JsonProperty("sampleRecords") + @JsonInclude(JsonInclude.Include.NON_NULL) + public List getRecords() + { + return records; + } + + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ResultSetInformation that = (ResultSetInformation) o; + return Objects.equals(numRows, that.numRows) + && Objects.equals(sizeInBytes, that.sizeInBytes) + && resultFormat == that.resultFormat + && Objects.equals(records, that.records) + && Objects.equals(dataSource, that.dataSource); + } + + @Override + public int hashCode() + { + return Objects.hash(numRows, sizeInBytes, resultFormat, records, dataSource); + } + + @Override + public String toString() + { + return "ResultSetInformation{" + + "totalRows=" + numRows + + ", totalSize=" + sizeInBytes + + ", resultFormat=" + resultFormat + + ", records=" + records + + ", dataSource='" + dataSource + '\'' + + '}'; + } + +} + diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/SqlStatementResult.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/SqlStatementResult.java new file mode 100644 index 00000000000..de66550a587 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/SqlStatementResult.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.sql.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.error.ErrorResponse; +import org.apache.druid.msq.sql.SqlStatementState; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Objects; + +public class SqlStatementResult +{ + + private final String queryId; + + private final SqlStatementState state; + + private final DateTime createdAt; + + @Nullable + private final List sqlRowSignature; + + @Nullable + private final Long durationMs; + + @Nullable + private final ResultSetInformation resultSetInformation; + + @Nullable + private final ErrorResponse errorResponse; + + + @JsonCreator + public SqlStatementResult( + @JsonProperty("queryId") + String queryId, + @JsonProperty("state") + SqlStatementState state, + @JsonProperty("createdAt") + DateTime createdAt, + @Nullable @JsonProperty("schema") + List sqlRowSignature, + @Nullable @JsonProperty("durationMs") + Long durationMs, + @Nullable @JsonProperty("result") + ResultSetInformation resultSetInformation, + @Nullable @JsonProperty("errorDetails") + ErrorResponse errorResponse + + ) + { + this.queryId = queryId; + this.state = state; + this.createdAt = createdAt; + this.sqlRowSignature = sqlRowSignature; + this.durationMs = durationMs; + this.resultSetInformation = resultSetInformation; + this.errorResponse = errorResponse; + } + + @JsonProperty + public String getQueryId() + { + return queryId; + } + + @JsonProperty + public SqlStatementState getState() + { + return state; + } + + @JsonProperty + public DateTime getCreatedAt() + { + return createdAt; + } + + @JsonProperty("schema") + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public List getSqlRowSignature() + { + return sqlRowSignature; + } + + @JsonProperty + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getDurationMs() + { + return durationMs; + } + + @JsonProperty("result") + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public ResultSetInformation getResultSetInformation() + { + return resultSetInformation; + } + + @JsonProperty("errorDetails") + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public ErrorResponse getErrorResponse() + { + return errorResponse; + } + + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqlStatementResult that = (SqlStatementResult) o; + return Objects.equals(queryId, that.queryId) && state == that.state && Objects.equals( + createdAt, + that.createdAt + ) && Objects.equals(sqlRowSignature, that.sqlRowSignature) && Objects.equals( + durationMs, + that.durationMs + ) && Objects.equals(resultSetInformation, that.resultSetInformation) && Objects.equals( + errorResponse == null ? null : errorResponse.getAsMap(), + that.errorResponse == null ? null : that.errorResponse.getAsMap() + ); + } + + @Override + public int hashCode() + { + return Objects.hash( + queryId, + state, + createdAt, + sqlRowSignature, + durationMs, + resultSetInformation, + errorResponse == null ? null : errorResponse.getAsMap() + ); + } + + @Override + public String toString() + { + return "SqlStatementResult{" + + "queryId='" + queryId + '\'' + + ", state=" + state + + ", createdAt=" + createdAt + + ", sqlRowSignature=" + sqlRowSignature + + ", durationInMs=" + durationMs + + ", resultSetInformation=" + resultSetInformation + + ", errorResponse=" + (errorResponse == null + ? "{}" + : errorResponse.getAsMap().toString()) + + '}'; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java new file mode 100644 index 00000000000..6ce5c780052 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -0,0 +1,670 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.sql.resources; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.CountingOutputStream; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.client.indexing.TaskStatusResponse; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.ErrorResponse; +import org.apache.druid.error.QueryExceptionCompat; +import org.apache.druid.guice.annotations.MSQ; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.indexing.MSQControllerTask; +import org.apache.druid.msq.sql.MSQTaskQueryMaker; +import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.msq.sql.SqlStatementState; +import org.apache.druid.msq.sql.entity.ColumnNameAndTypes; +import org.apache.druid.msq.sql.entity.ResultSetInformation; +import org.apache.druid.msq.sql.entity.SqlStatementResult; +import org.apache.druid.msq.util.SqlStatementResourceHelper; +import org.apache.druid.query.ExecutionMode; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryException; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.server.QueryResponse; +import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizationUtils; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.sql.DirectStatement; +import org.apache.druid.sql.HttpStatement; +import org.apache.druid.sql.SqlRowTransformer; +import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.http.ResultFormat; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.sql.http.SqlResource; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + + +@Path("/druid/v2/sql/statements/") +public class SqlStatementResource +{ + + private static final Logger log = new Logger(SqlStatementResource.class); + private final SqlStatementFactory msqSqlStatementFactory; + private final AuthorizerMapper authorizerMapper; + private final ObjectMapper jsonMapper; + private final OverlordClient overlordClient; + + + @Inject + public SqlStatementResource( + final @MSQ SqlStatementFactory msqSqlStatementFactory, + final AuthorizerMapper authorizerMapper, + final ObjectMapper jsonMapper, + final OverlordClient overlordClient + ) + { + this.msqSqlStatementFactory = msqSqlStatementFactory; + this.authorizerMapper = authorizerMapper; + this.jsonMapper = jsonMapper; + this.overlordClient = overlordClient; + } + + + @GET + @Path("/enabled") + @Produces(MediaType.APPLICATION_JSON) + public Response isEnabled(@Context final HttpServletRequest request) + { + // All authenticated users are authorized for this API: check an empty resource list. + final Access authResult = AuthorizationUtils.authorizeAllResourceActions( + request, + Collections.emptyList(), + authorizerMapper + ); + + if (!authResult.isAllowed()) { + throw new ForbiddenException(authResult.toString()); + } + + return Response.ok(ImmutableMap.of("enabled", true)).build(); + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletRequest req) + { + final HttpStatement stmt = msqSqlStatementFactory.httpStatement(sqlQuery, req); + final String sqlQueryId = stmt.sqlQueryId(); + final String currThreadName = Thread.currentThread().getName(); + try { + ExecutionMode executionMode = QueryContexts.getAsEnum( + QueryContexts.CTX_EXECUTION_MODE, + sqlQuery.getContext().get(QueryContexts.CTX_EXECUTION_MODE), + ExecutionMode.class + ); + if (ExecutionMode.ASYNC != executionMode) { + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + StringUtils.format( + "The statement sql api only supports sync mode[%s]. Please set context parameter [%s=%s] in the context payload", + ExecutionMode.ASYNC, + QueryContexts.CTX_EXECUTION_MODE, + ExecutionMode.ASYNC + ) + ) + ); + } + + + Thread.currentThread().setName(StringUtils.format("statement_sql[%s]", sqlQueryId)); + + final DirectStatement.ResultSet plan = stmt.plan(); + // in case the engine is async, the query is not run yet. We just return the taskID in case of non explain queries. + final QueryResponse response = plan.run(); + final Sequence sequence = response.getResults(); + final SqlRowTransformer rowTransformer = plan.createRowTransformer(); + + final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList()); + + if (isTaskStruct) { + return buildTaskResponse(sequence, stmt.query().authResult().getIdentity()); + } else { + // Used for EXPLAIN + return buildStandardResponse(sequence, sqlQuery, sqlQueryId, rowTransformer); + } + } + catch (DruidException e) { + stmt.reporter().failed(e); + return buildNonOkResponse(e); + } + catch (QueryException queryException) { + stmt.reporter().failed(queryException); + final DruidException underlyingException = DruidException.fromFailure(new QueryExceptionCompat(queryException)); + return buildNonOkResponse(underlyingException); + } + catch (ForbiddenException e) { + log.debug("Got forbidden request for reason [%s]", e.getErrorMessage()); + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.FORBIDDEN) + .build(Access.DEFAULT_ERROR_MESSAGE) + ); + } + // Calcite throws java.lang.AssertionError at various points in planning/validation. + catch (AssertionError | Exception e) { + stmt.reporter().failed(e); + log.noStackTrace().warn(e, "Failed to handle query: %s", sqlQueryId); + + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(e.getMessage()) + ); + } + finally { + stmt.close(); + Thread.currentThread().setName(currThreadName); + } + } + + + @GET + @Path("/{id}") + @Produces(MediaType.APPLICATION_JSON) + public Response doGetStatus( + @PathParam("id") final String queryId, @Context final HttpServletRequest req + ) + { + try { + Access authResult = AuthorizationUtils.authorizeAllResourceActions( + req, + Collections.emptyList(), + authorizerMapper + ); + if (!authResult.isAllowed()) { + throw new ForbiddenException(authResult.toString()); + } + final AuthenticationResult authenticationResult = AuthorizationUtils.authenticationResultFromRequest(req); + + Optional sqlStatementResult = getStatementStatus( + queryId, + authenticationResult.getIdentity(), + true + ); + + if (sqlStatementResult.isPresent()) { + return Response.ok().entity(sqlStatementResult.get()).build(); + } else { + return Response.status(Response.Status.NOT_FOUND).build(); + } + } + catch (DruidException e) { + return buildNonOkResponse(e); + } + catch (ForbiddenException e) { + log.debug("Got forbidden request for reason [%s]", e.getErrorMessage()); + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.FORBIDDEN) + .build(Access.DEFAULT_ERROR_MESSAGE) + ); + } + catch (Exception e) { + log.noStackTrace().warn(e, "Failed to handle query: %s", queryId); + return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(e, "Failed to handle query: [%s]", queryId)); + } + } + + @GET + @Path("/{id}/results") + @Produces(MediaType.APPLICATION_JSON) + public Response doGetResults( + @PathParam("id") final String queryId, + @QueryParam("offset") Long offset, + @QueryParam("numRows") Long numberOfRows, + @Context final HttpServletRequest req + ) + { + try { + Access authResult = AuthorizationUtils.authorizeAllResourceActions( + req, + Collections.emptyList(), + authorizerMapper + ); + if (!authResult.isAllowed()) { + throw new ForbiddenException(authResult.toString()); + } + final AuthenticationResult authenticationResult = AuthorizationUtils.authenticationResultFromRequest(req); + + if (offset != null && offset < 0) { + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "offset cannot be negative. Please pass a positive number." + ) + ); + } + if (numberOfRows != null && numberOfRows < 0) { + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "numRows cannot be negative. Please pass a positive number." + ) + ); + } + + final long start = offset == null ? 0 : offset; + final long last = SqlStatementResourceHelper.getLastIndex(numberOfRows, start); + + TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId)); + if (taskResponse == null) { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + TaskStatusPlus statusPlus = taskResponse.getStatus(); + if (statusPlus == null || !MSQControllerTask.TYPE.equals(statusPlus.getType())) { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + MSQControllerTask msqControllerTask = getMSQControllerTaskOrThrow(queryId, authenticationResult.getIdentity()); + SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus); + + if (sqlStatementState == SqlStatementState.RUNNING || sqlStatementState == SqlStatementState.ACCEPTED) { + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Query[%s] is currently in [%s] state. Please wait for it to complete.", + queryId, + sqlStatementState + ) + ); + } else if (sqlStatementState == SqlStatementState.FAILED) { + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Query[%s] failed. Hit status api for more details.", + queryId + ) + ); + } else { + Optional> signature = SqlStatementResourceHelper.getSignature(msqControllerTask); + if (!signature.isPresent()) { + return Response.ok().build(); + } + Optional> results = SqlStatementResourceHelper.getResults(SqlStatementResourceHelper.getPayload( + contactOverlord(overlordClient.taskReportAsMap(queryId)))); + + return Response.ok((StreamingOutput) outputStream -> { + CountingOutputStream os = new CountingOutputStream(outputStream); + + try (final ResultFormat.Writer writer = ResultFormat.OBJECT.createFormatter(os, jsonMapper)) { + List rowSignature = signature.get(); + writer.writeResponseStart(); + + for (long k = start; k < Math.min(last, results.get().size()); k++) { + writer.writeRowStart(); + for (int i = 0; i < rowSignature.size(); i++) { + writer.writeRowField( + rowSignature.get(i).getColName(), + ((List) results.get().get(Math.toIntExact(k))).get(i) + ); + } + writer.writeRowEnd(); + } + + writer.writeResponseEnd(); + } + catch (Exception e) { + log.error(e, "Unable to stream results back for query[%s]", queryId); + throw new ISE(e, "Unable to stream results back for query[%s]", queryId); + } + }).build(); + + } + } + catch (DruidException e) { + return buildNonOkResponse(e); + } + catch (ForbiddenException e) { + log.debug("Got forbidden request for reason [%s]", e.getErrorMessage()); + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.FORBIDDEN) + .build(Access.DEFAULT_ERROR_MESSAGE) + ); + } + catch (Exception e) { + log.noStackTrace().warn(e, "Failed to handle query: %s", queryId); + return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(e, "Failed to handle query: [%s]", queryId)); + } + } + + /** + * Queries can be canceled while in any {@link SqlStatementState}. Canceling a query that has already completed will be a no-op. + * + * @param queryId queryId + * @param req httpServletRequest + * @return HTTP 404 if the query ID does not exist,expired or originated by different user. HTTP 202 if the deletion + * request has been accepted. + */ + @DELETE + @Path("/{id}") + @Produces(MediaType.APPLICATION_JSON) + public Response deleteQuery(@PathParam("id") final String queryId, @Context final HttpServletRequest req) + { + + try { + Access authResult = AuthorizationUtils.authorizeAllResourceActions( + req, + Collections.emptyList(), + authorizerMapper + ); + if (!authResult.isAllowed()) { + throw new ForbiddenException(authResult.toString()); + } + final AuthenticationResult authenticationResult = AuthorizationUtils.authenticationResultFromRequest(req); + + Optional sqlStatementResult = getStatementStatus( + queryId, + authenticationResult.getIdentity(), + false + ); + if (sqlStatementResult.isPresent()) { + switch (sqlStatementResult.get().getState()) { + case ACCEPTED: + case RUNNING: + overlordClient.cancelTask(queryId); + return Response.status(Response.Status.ACCEPTED).build(); + case SUCCESS: + case FAILED: + // we would also want to clean up the results in the future. + return Response.ok().build(); + default: + throw new ISE("Illegal State[%s] encountered", sqlStatementResult.get().getState()); + } + + } else { + return Response.status(Response.Status.NOT_FOUND).build(); + } + } + catch (DruidException e) { + return buildNonOkResponse(e); + } + catch (ForbiddenException e) { + log.debug("Got forbidden request for reason [%s]", e.getErrorMessage()); + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.FORBIDDEN) + .build(Access.DEFAULT_ERROR_MESSAGE) + ); + } + catch (Exception e) { + log.noStackTrace().warn(e, "Failed to handle query: %s", queryId); + return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(e, "Failed to handle query: [%s]", queryId)); + } + } + + private Response buildStandardResponse( + Sequence sequence, + SqlQuery sqlQuery, + String sqlQueryId, + SqlRowTransformer rowTransformer + ) throws IOException + { + final Yielder yielder0 = Yielders.each(sequence); + + try { + final Response.ResponseBuilder responseBuilder = Response.ok((StreamingOutput) outputStream -> { + CountingOutputStream os = new CountingOutputStream(outputStream); + Yielder yielder = yielder0; + + try (final ResultFormat.Writer writer = sqlQuery.getResultFormat().createFormatter(os, jsonMapper)) { + writer.writeResponseStart(); + + if (sqlQuery.includeHeader()) { + writer.writeHeader( + rowTransformer.getRowType(), + sqlQuery.includeTypesHeader(), + sqlQuery.includeSqlTypesHeader() + ); + } + + while (!yielder.isDone()) { + final Object[] row = yielder.get(); + writer.writeRowStart(); + for (int i = 0; i < rowTransformer.getFieldList().size(); i++) { + final Object value = rowTransformer.transform(row, i); + writer.writeRowField(rowTransformer.getFieldList().get(i), value); + } + writer.writeRowEnd(); + yielder = yielder.next(null); + } + + writer.writeResponseEnd(); + } + catch (Exception e) { + log.error(e, "Unable to send SQL response [%s]", sqlQueryId); + throw new RuntimeException(e); + } + finally { + yielder.close(); + } + }); + + if (sqlQuery.includeHeader()) { + responseBuilder.header(SqlResource.SQL_HEADER_RESPONSE_HEADER, SqlResource.SQL_HEADER_VALUE); + } + + return responseBuilder.build(); + } + catch (Throwable e) { + // make sure to close yielder if anything happened before starting to serialize the response. + yielder0.close(); + throw e; + } + } + + private Response buildTaskResponse(Sequence sequence, String user) + { + List rows = sequence.toList(); + int numRows = rows.size(); + if (numRows != 1) { + throw new RE("Expected a single row but got [%d] rows. Please check broker logs for more information.", numRows); + } + Object[] firstRow = rows.get(0); + if (firstRow == null || firstRow.length != 1) { + throw new RE( + "Expected a single column but got [%s] columns. Please check broker logs for more information.", + firstRow == null ? 0 : firstRow.length + ); + } + String taskId = String.valueOf(firstRow[0]); + + Optional statementResult = getStatementStatus(taskId, user, true); + + if (statementResult.isPresent()) { + return Response.status(Response.Status.OK).entity(statementResult.get()).build(); + } else { + return buildNonOkResponse( + DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE).build( + "Unable to find associated task for query id [%s]. Contact cluster admin to check overlord logs for [%s]", + taskId, + taskId + ) + ); + } + } + + private Response buildNonOkResponse(DruidException exception) + { + return Response + .status(exception.getStatusCode()) + .entity(new ErrorResponse(exception)) + .build(); + } + + private Optional getSampleResults( + String asyncResultId, + boolean isSelectQuery, + String dataSource, + SqlStatementState sqlStatementState + ) + { + if (sqlStatementState == SqlStatementState.SUCCESS) { + Map payload = SqlStatementResourceHelper.getPayload(contactOverlord(overlordClient.taskReportAsMap( + asyncResultId))); + Optional> rowsAndSize = SqlStatementResourceHelper.getRowsAndSizeFromPayload( + payload, + isSelectQuery + ); + return Optional.of(new ResultSetInformation( + null, + // since the rows can be sampled, get the number of rows from counters + rowsAndSize.orElse(new Pair<>(null, null)).lhs, + rowsAndSize.orElse(new Pair<>(null, null)).rhs, + dataSource, + // only populate sample results in case a select query is successful + isSelectQuery ? SqlStatementResourceHelper.getResults(payload).orElse(null) : null + )); + } else { + return Optional.empty(); + } + } + + + private Optional getStatementStatus(String queryId, String currentUser, boolean withResults) + throws DruidException + { + TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId)); + if (taskResponse == null) { + return Optional.empty(); + } + + TaskStatusPlus statusPlus = taskResponse.getStatus(); + if (statusPlus == null || !MSQControllerTask.TYPE.equals(statusPlus.getType())) { + return Optional.empty(); + } + + // since we need the controller payload for auth checks. + MSQControllerTask msqControllerTask = getMSQControllerTaskOrThrow(queryId, currentUser); + SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus); + + if (SqlStatementState.FAILED == sqlStatementState) { + return SqlStatementResourceHelper.getExceptionPayload( + queryId, + taskResponse, + statusPlus, + sqlStatementState, + contactOverlord(overlordClient.taskReportAsMap( + queryId)) + ); + } else { + Optional> signature = SqlStatementResourceHelper.getSignature(msqControllerTask); + return Optional.of(new SqlStatementResult( + queryId, + sqlStatementState, + taskResponse.getStatus().getCreatedTime(), + signature.orElse(null), + taskResponse.getStatus().getDuration(), + withResults ? getSampleResults( + queryId, + signature.isPresent(), + msqControllerTask.getDataSource(), + sqlStatementState + ).orElse(null) : null, + null + )); + } + } + + + private MSQControllerTask getMSQControllerTaskOrThrow(String queryId, String currentUser) + { + TaskPayloadResponse taskPayloadResponse = contactOverlord(overlordClient.taskPayload(queryId)); + SqlStatementResourceHelper.isMSQPayload(taskPayloadResponse, queryId); + + MSQControllerTask msqControllerTask = (MSQControllerTask) taskPayloadResponse.getPayload(); + String queryUser = String.valueOf(msqControllerTask.getQuerySpec() + .getQuery() + .getContext() + .get(MSQTaskQueryMaker.USER_KEY)); + if (currentUser == null || !currentUser.equals(queryUser)) { + throw new ForbiddenException(StringUtils.format( + "The current user[%s] cannot view query id[%s] since the query is owned by user[%s]", + currentUser, + queryId, + queryUser + )); + } + return msqControllerTask; + } + + private T contactOverlord(final ListenableFuture future) + { + try { + return FutureUtils.getUnchecked(future, true); + } + catch (RuntimeException e) { + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build("Unable to contact overlord " + e.getMessage()); + } + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlTaskResource.java similarity index 98% rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java rename to extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlTaskResource.java index dd4ec4d2b3c..ecabbd0fa5c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlTaskResource.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.msq.sql; +package org.apache.druid.msq.sql.resources; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; @@ -32,6 +32,8 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.msq.sql.SqlTaskStatus; import org.apache.druid.query.QueryException; import org.apache.druid.server.QueryResponse; import org.apache.druid.server.initialization.ServerConfig; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java new file mode 100644 index 00000000000..4c0474e1f93 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.msq.util; + +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.client.indexing.TaskStatusResponse; +import org.apache.druid.error.DruidException; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.msq.indexing.MSQControllerTask; +import org.apache.druid.msq.indexing.TaskReportMSQDestination; +import org.apache.druid.msq.sql.SqlStatementState; +import org.apache.druid.msq.sql.entity.ColumnNameAndTypes; +import org.apache.druid.msq.sql.entity.SqlStatementResult; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.sql.calcite.planner.ColumnMappings; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class SqlStatementResourceHelper +{ + public static Optional> getSignature( + MSQControllerTask msqControllerTask + ) + { + // only populate signature for select q's + if (msqControllerTask.getQuerySpec().getDestination().getClass() == TaskReportMSQDestination.class) { + ColumnMappings columnMappings = msqControllerTask.getQuerySpec().getColumnMappings(); + List sqlTypeNames = msqControllerTask.getSqlTypeNames(); + if (sqlTypeNames == null || sqlTypeNames.size() != columnMappings.size()) { + return Optional.empty(); + } + List nativeTypeNames = msqControllerTask.getNativeTypeNames(); + if (nativeTypeNames == null || nativeTypeNames.size() != columnMappings.size()) { + return Optional.empty(); + } + List signature = new ArrayList<>(columnMappings.size()); + int index = 0; + for (String colName : columnMappings.getOutputColumnNames()) { + signature.add(new ColumnNameAndTypes( + colName, + sqlTypeNames.get(index).getName(), + nativeTypeNames.get(index).asTypeString() + )); + index++; + } + return Optional.of(signature); + } + return Optional.empty(); + } + + + public static void isMSQPayload(TaskPayloadResponse taskPayloadResponse, String queryId) throws DruidException + { + if (taskPayloadResponse == null || taskPayloadResponse.getPayload() == null) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Query[%s] not found", queryId); + } + + if (MSQControllerTask.class != taskPayloadResponse.getPayload().getClass()) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Query[%s] not found", queryId); + } + } + + public static SqlStatementState getSqlStatementState(TaskStatusPlus taskStatusPlus) + { + TaskState state = taskStatusPlus.getStatusCode(); + if (state == null) { + return SqlStatementState.ACCEPTED; + } + + switch (state) { + case FAILED: + return SqlStatementState.FAILED; + case RUNNING: + if (TaskLocation.unknown().equals(taskStatusPlus.getLocation())) { + return SqlStatementState.ACCEPTED; + } else { + return SqlStatementState.RUNNING; + } + case SUCCESS: + return SqlStatementState.SUCCESS; + default: + throw new ISE("Unrecognized state[%s] found.", state); + } + } + + @SuppressWarnings("unchecked") + + + public static long getLastIndex(Long numberOfRows, long start) + { + final long last; + if (numberOfRows == null) { + last = Long.MAX_VALUE; + } else { + long finalIndex; + try { + finalIndex = Math.addExact(start, numberOfRows); + } + catch (ArithmeticException e) { + finalIndex = Long.MAX_VALUE; + } + last = finalIndex; + } + return last; + } + + public static Optional> getRowsAndSizeFromPayload(Map payload, boolean isSelectQuery) + { + List stages = getList(payload, "stages"); + if (stages == null || stages.isEmpty()) { + return Optional.empty(); + } else { + int maxStage = stages.size() - 1; // Last stage output is the total number of rows returned to the end user. + Map counterMap = getMap(getMap(payload, "counters"), String.valueOf(maxStage)); + long rows = -1L; + long sizeInBytes = -1L; + if (counterMap == null) { + return Optional.empty(); + } + for (Map.Entry worker : counterMap.entrySet()) { + Object workerChannels = worker.getValue(); + if (workerChannels == null || !(workerChannels instanceof Map)) { + return Optional.empty(); + } + if (isSelectQuery) { + Object output = ((Map) workerChannels).get("output"); + if (output != null && output instanceof Map) { + List rowsPerChannel = (List) ((Map) output).get("rows"); + List bytesPerChannel = (List) ((Map) output).get("bytes"); + for (Integer row : rowsPerChannel) { + rows = rows + row; + } + for (Integer bytes : bytesPerChannel) { + sizeInBytes = sizeInBytes + bytes; + } + } + } else { + Object output = ((Map) workerChannels).get("segmentGenerationProgress"); + if (output != null && output instanceof Map) { + rows += (Integer) ((Map) output).get("rowsPushed"); + } + } + } + + return Optional.of(new Pair<>(rows == -1L ? null : rows + 1, sizeInBytes == -1L ? null : sizeInBytes + 1)); + } + } + + + public static Optional getExceptionPayload( + String queryId, + TaskStatusResponse taskResponse, + TaskStatusPlus statusPlus, + SqlStatementState sqlStatementState, + Map msqPayload + ) + { + Map exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload)); + Map exception = getMap(exceptionDetails, "error"); + if (exceptionDetails == null || exception == null) { + return Optional.of(new SqlStatementResult( + queryId, + sqlStatementState, + taskResponse.getStatus().getCreatedTime(), + null, + taskResponse.getStatus().getDuration(), + null, + DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(taskResponse.getStatus().getErrorMsg()).toErrorResponse() + )); + } + + final String errorMessage = String.valueOf(exception.getOrDefault("errorMessage", statusPlus.getErrorMsg())); + exception.remove("errorMessage"); + String errorCode = String.valueOf(exception.getOrDefault("errorCode", "unknown")); + exception.remove("errorCode"); + Map stringException = new HashMap<>(); + for (Map.Entry exceptionKeys : exception.entrySet()) { + stringException.put(exceptionKeys.getKey(), String.valueOf(exceptionKeys.getValue())); + } + return Optional.of(new SqlStatementResult( + queryId, + sqlStatementState, + taskResponse.getStatus().getCreatedTime(), + null, + taskResponse.getStatus().getDuration(), + null, + DruidException.fromFailure(new DruidException.Failure(errorCode) + { + @Override + protected DruidException makeException(DruidException.DruidExceptionBuilder bob) + { + DruidException ex = bob.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(errorMessage); + ex.withContext(stringException); + return ex; + } + }).toErrorResponse() + )); + } + + public static Map getQueryExceptionDetails(Map payload) + { + return getMap(getMap(payload, "status"), "errorReport"); + } + + public static Map getMap(Map map, String key) + { + if (map == null) { + return null; + } + return (Map) map.get(key); + } + + @SuppressWarnings("rawtypes") + public static List getList(Map map, String key) + { + if (map == null) { + return null; + } + return (List) map.get(key); + } + + /** + * Get results from report + */ + @SuppressWarnings("unchecked") + public static Optional> getResults(Map payload) + { + Map resultsHolder = getMap(payload, "results"); + + if (resultsHolder == null) { + return Optional.empty(); + } + + List data = (List) resultsHolder.get("results"); + List rows = new ArrayList<>(); + if (data != null) { + rows.addAll(data); + } + return Optional.of(rows); + } + + public static Map getPayload(Map results) + { + Map msqReport = getMap(results, "multiStageQuery"); + Map payload = getMap(msqReport, "payload"); + return payload; + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index e1c5c8bf53d..33e0d8c2486 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -65,6 +65,7 @@ public class MSQControllerTaskTest null, null, null, + null, null); Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty()); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java index 6c111c7fd80..b226583922d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java @@ -58,7 +58,7 @@ public class MSQTaskReportTest { private static final String TASK_ID = "mytask"; private static final String HOST = "example.com:1234"; - private static final QueryDefinition QUERY_DEFINITION = + public static final QueryDefinition QUERY_DEFINITION = QueryDefinition .builder() .add( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMsqStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMsqStatementResourcePostTest.java new file mode 100644 index 00000000000..ceae64dcf70 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMsqStatementResourcePostTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.sql; + + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.indexing.MSQControllerTask; +import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.sql.entity.ColumnNameAndTypes; +import org.apache.druid.msq.sql.entity.ResultSetInformation; +import org.apache.druid.msq.sql.entity.SqlStatementResult; +import org.apache.druid.msq.sql.resources.SqlStatementResource; +import org.apache.druid.msq.test.MSQTestBase; +import org.apache.druid.msq.test.MSQTestOverlordServiceClient; +import org.apache.druid.query.ExecutionMode; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.http.SqlQuery; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SqlMsqStatementResourcePostTest extends MSQTestBase +{ + private SqlStatementResource resource; + + @Before + public void init() + { + resource = new SqlStatementResource( + sqlStatementFactory, + CalciteTests.TEST_AUTHORIZER_MAPPER, + objectMapper, + indexingServiceClient + ); + } + + @Test + public void testMSQSelectQueryTest() throws IOException + { + List results = ImmutableList.of( + new Object[]{1L, ""}, + new Object[]{ + 1L, + "10.1" + }, + new Object[]{1L, "2"}, + new Object[]{1L, "1"}, + new Object[]{1L, "def"}, + new Object[]{1L, "abc"} + ); + + Response response = resource.doPost(new SqlQuery( + "select cnt,dim1 from foo", + null, + false, + false, + false, + defaultAsyncContext(), + null + ), SqlStatementResourceTest.makeOkRequest()); + + + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + String taskId = ((SqlStatementResult) response.getEntity()).getQueryId(); + + SqlStatementResult expected = + new SqlStatementResult(taskId, SqlStatementState.SUCCESS, + MSQTestOverlordServiceClient.CREATED_TIME, + ImmutableList.of( + new ColumnNameAndTypes( + "cnt", + SqlTypeName.BIGINT.getName(), + ValueType.LONG.name() + ), + new ColumnNameAndTypes( + "dim1", + SqlTypeName.VARCHAR.getName(), + ValueType.STRING.name() + ) + ), + MSQTestOverlordServiceClient.DURATION, + new ResultSetInformation( + null, + 6L, + 316L, + MSQControllerTask.DUMMY_DATASOURCE_FOR_SELECT, + objectMapper.readValue( + objectMapper.writeValueAsString( + results), + new TypeReference>() + { + } + ) + ), + null + ); + + Assert.assertEquals(expected, response.getEntity()); + } + + + @Test + public void nonSupportedModes() + { + for (ImmutableMap context : ImmutableList.of(ImmutableMap.of( + QueryContexts.CTX_EXECUTION_MODE, + ExecutionMode.SYNC.name() + ), ImmutableMap.of())) { + SqlStatementResourceTest.assertExceptionMessage( + resource.doPost(new SqlQuery( + "select * from foo", + null, + false, + false, + false, + (Map) context, + null + ), SqlStatementResourceTest.makeOkRequest()), + "The statement sql api only supports sync mode[ASYNC]. Please set context parameter [executionMode=ASYNC] in the context payload", + Response.Status.BAD_REQUEST + ); + } + } + + + @Test + public void insertCannotBeEmptyFaultTest() + { + Response response = resource.doPost(new SqlQuery( + "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1", + null, + false, + false, + false, + defaultAsyncContext(), + null + ), SqlStatementResourceTest.makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + SqlStatementResult actual = (SqlStatementResult) response.getEntity(); + + InsertCannotBeEmptyFault insertCannotBeEmptyFault = new InsertCannotBeEmptyFault("foo1"); + + MSQException insertCannotBeEmpty = new MSQException(insertCannotBeEmptyFault); + + SqlStatementResult expected = new SqlStatementResult( + actual.getQueryId(), + SqlStatementState.FAILED, + MSQTestOverlordServiceClient.CREATED_TIME, + null, + MSQTestOverlordServiceClient.DURATION, + null, + DruidException.fromFailure(new DruidException.Failure(InsertCannotBeEmptyFault.CODE) + { + @Override + protected DruidException makeException(DruidException.DruidExceptionBuilder bob) + { + DruidException e = bob.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(insertCannotBeEmpty.getFault().getErrorMessage()); + e.withContext("dataSource", insertCannotBeEmptyFault.getDataSource()); + return e; + } + }).toErrorResponse() + ); + Assert.assertEquals(expected, actual); + } + + @Test + public void testExplain() throws IOException + { + Map context = defaultAsyncContext(); + context.put("sqlQueryId", "queryId"); + Response response = resource.doPost(new SqlQuery( + "explain plan for select * from foo", + null, + false, + false, + false, + context, + null + ), SqlStatementResourceTest.makeOkRequest()); + + Assert.assertEquals( + "{PLAN=[{\"query\":" + + "{\"queryType\":\"scan\"," + + "\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"}," + + "\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]}," + + "\"resultFormat\":\"compactedList\"," + + "\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"]," + + "\"legacy\":false," + + "\"context\":{\"executionMode\":\"ASYNC\",\"scanSignature\":\"[{\\\"name\\\":\\\"__time\\\",\\\"type\\\":\\\"LONG\\\"},{\\\"name\\\":\\\"cnt\\\",\\\"type\\\":\\\"LONG\\\"},{\\\"name\\\":\\\"dim1\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"dim2\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"dim3\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"m1\\\",\\\"type\\\":\\\"FLOAT\\\"},{\\\"name\\\":\\\"m2\\\",\\\"type\\\":\\\"DOUBLE\\\"},{\\\"name\\\":\\\"unique_dim1\\\",\\\"type\\\":\\\"COMPLEX\\\"}]\",\"sqlQueryId\":\"queryId\"},\"granularity\":{\"type\":\"all\"}},\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"dim3\",\"type\":\"STRING\"},{\"name\":\"cnt\",\"type\":\"LONG\"},{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"unique_dim1\",\"type\":\"COMPLEX\"}],\"columnMappings\":[{\"queryColumn\":\"__time\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"dim1\",\"outputColumn\":\"dim1\"},{\"queryColumn\":\"dim2\",\"outputColumn\":\"dim2\"},{\"queryColumn\":\"dim3\",\"outputColumn\":\"dim3\"},{\"queryColumn\":\"cnt\",\"outputColumn\":\"cnt\"},{\"queryColumn\":\"m1\",\"outputColumn\":\"m1\"},{\"queryColumn\":\"m2\",\"outputColumn\":\"m2\"},{\"queryColumn\":\"unique_dim1\",\"outputColumn\":\"unique_dim1\"}]}]," + + " RESOURCES=[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]," + + " ATTRIBUTES={\"statementType\":\"SELECT\"}}", + String.valueOf(SqlStatementResourceTest.getResultRowsFromResponse(response).get(0)) + ); + } + + @Test + public void forbiddenTest() + { + Assert.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), resource.doPost( + new SqlQuery( + StringUtils.format("select * from %s", CalciteTests.FORBIDDEN_DATASOURCE), + null, + false, + false, + false, + defaultAsyncContext(), + null + ), + SqlStatementResourceTest.makeOkRequest() + ).getStatus()); + } + + + private static Map defaultAsyncContext() + { + Map context = new HashMap(); + context.put(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.ASYNC.name()); + return context; + } + +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java new file mode 100644 index 00000000000..3f2e7288b62 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java @@ -0,0 +1,947 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.sql; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.client.indexing.TaskStatusResponse; +import org.apache.druid.error.ErrorResponse; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexing.common.task.IndexTask; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.indexing.DataSourceMSQDestination; +import org.apache.druid.msq.indexing.MSQControllerTask; +import org.apache.druid.msq.indexing.MSQSpec; +import org.apache.druid.msq.indexing.MSQTuningConfig; +import org.apache.druid.msq.indexing.TaskReportMSQDestination; +import org.apache.druid.msq.indexing.report.MSQResultsReport; +import org.apache.druid.msq.indexing.report.MSQStagesReport; +import org.apache.druid.msq.indexing.report.MSQStatusReport; +import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.indexing.report.MSQTaskReportTest; +import org.apache.druid.msq.sql.entity.ColumnNameAndTypes; +import org.apache.druid.msq.sql.entity.ResultSetInformation; +import org.apache.druid.msq.sql.entity.SqlStatementResult; +import org.apache.druid.msq.sql.resources.SqlStatementResource; +import org.apache.druid.msq.test.MSQTestBase; +import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.server.mocks.MockHttpServletRequest; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.http.SqlResourceTest; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; + +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class SqlStatementResourceTest extends MSQTestBase +{ + + public static final DateTime CREATED_TIME = DateTimes.of("2023-05-31T12:00Z"); + private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); + private static final String ACCEPTED_SELECT_MSQ_QUERY = "QUERY_ID_1"; + private static final String RUNNING_SELECT_MSQ_QUERY = "QUERY_ID_2"; + private static final String FINISHED_SELECT_MSQ_QUERY = "QUERY_ID_3"; + + private static final String ERRORED_SELECT_MSQ_QUERY = "QUERY_ID_4"; + + + private static final String RUNNING_NON_MSQ_TASK = "QUERY_ID_5"; + + private static final String FAILED_NON_MSQ_TASK = "QUERY_ID_6"; + + private static final String FINISHED_NON_MSQ_TASK = "QUERY_ID_7"; + + + private static final String ACCEPTED_INSERT_MSQ_TASK = "QUERY_ID_8"; + + private static final String RUNNING_INSERT_MSQ_QUERY = "QUERY_ID_9"; + private static final String FINISHED_INSERT_MSQ_QUERY = "QUERY_ID_10"; + private static final String ERRORED_INSERT_MSQ_QUERY = "QUERY_ID_11"; + + + private static final Query QUERY = new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .intervals(new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.of( + "2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")))) + .dataSource("target") + .context(ImmutableMap.of( + MSQTaskQueryMaker.USER_KEY, + AuthConfig.ALLOW_ALL_NAME + )) + .build(); + + + private static final MSQControllerTask MSQ_CONTROLLER_SELECT_PAYLOAD = new MSQControllerTask( + ACCEPTED_SELECT_MSQ_QUERY, + MSQSpec.builder() + .query(QUERY) + .columnMappings( + ColumnMappings.identity( + RowSignature.builder() + .add( + "_time", + ColumnType.LONG + ) + .add( + "alias", + ColumnType.STRING + ) + .add( + "market", + ColumnType.STRING + ) + .build())) + .destination( + TaskReportMSQDestination.INSTANCE) + .tuningConfig( + MSQTuningConfig.defaultConfig()) + .build(), + "select _time,alias,market from test", + new HashMap<>(), + null, + ImmutableList.of( + SqlTypeName.TIMESTAMP, + SqlTypeName.VARCHAR, + SqlTypeName.VARCHAR + ), + ImmutableList.of( + ColumnType.LONG, + ColumnType.STRING, + ColumnType.STRING + ), + null + ); + + private static final MSQControllerTask MSQ_CONTROLLER_INSERT_PAYLOAD = new MSQControllerTask( + ACCEPTED_SELECT_MSQ_QUERY, + MSQSpec.builder() + .query(QUERY) + .columnMappings( + ColumnMappings.identity( + RowSignature.builder() + .add( + "_time", + ColumnType.LONG + ) + .add( + "alias", + ColumnType.STRING + ) + .add( + "market", + ColumnType.STRING + ) + .build())) + .destination(new DataSourceMSQDestination( + "test", + Granularities.DAY, + null, + null + )) + .tuningConfig( + MSQTuningConfig.defaultConfig()) + .build(), + "insert into test select _time,alias,market from test", + new HashMap<>(), + null, + ImmutableList.of( + SqlTypeName.TIMESTAMP, + SqlTypeName.VARCHAR, + SqlTypeName.VARCHAR + ), + ImmutableList.of( + ColumnType.LONG, + ColumnType.STRING, + ColumnType.STRING + ), + null + ); + + private static final List RESULT_ROWS = ImmutableList.of( + new Object[]{123, "foo", "bar"}, + new Object[]{234, "foo1", "bar1"} + ); + + private static final MSQTaskReport MSQ_SELECT_TASK_REPORT = new MSQTaskReport( + FINISHED_SELECT_MSQ_QUERY, + new MSQTaskReportPayload( + new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + 1, + 2 + ), + MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of() + ), + new CounterSnapshotsTree(), + new MSQResultsReport( + ImmutableList.of( + new MSQResultsReport.ColumnAndType( + "_time", + ColumnType.LONG + ), + new MSQResultsReport.ColumnAndType( + "alias", + ColumnType.STRING + ), + new MSQResultsReport.ColumnAndType( + "market", + ColumnType.STRING + ) + ), + ImmutableList.of( + SqlTypeName.TIMESTAMP, + SqlTypeName.VARCHAR, + SqlTypeName.VARCHAR + ), + Yielders.each( + Sequences.simple( + RESULT_ROWS)), + null + ) + ) + ); + + private static final MSQTaskReport MSQ_INSERT_TASK_REPORT = new MSQTaskReport( + FINISHED_INSERT_MSQ_QUERY, + new MSQTaskReportPayload( + new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + 1, + 2 + ), + MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of() + ), + new CounterSnapshotsTree(), + null + ) + ); + private static final DateTime QUEUE_INSERTION_TIME = DateTimes.of("2023-05-31T12:01Z"); + private static final Map ROW1 = ImmutableMap.of("_time", 123, "alias", "foo", "market", "bar"); + private static final Map ROW2 = ImmutableMap.of("_time", 234, "alias", "foo1", "market", "bar1"); + public static final ImmutableList COL_NAME_AND_TYPES = ImmutableList.of( + new ColumnNameAndTypes( + "_time", + SqlTypeName.TIMESTAMP.getName(), + ValueType.LONG.name() + ), + new ColumnNameAndTypes( + "alias", + SqlTypeName.VARCHAR.getName(), + ValueType.STRING.name() + ), + new ColumnNameAndTypes( + "market", + SqlTypeName.VARCHAR.getName(), + ValueType.STRING.name() + ) + ); + private static final String FAILURE_MSG = "failure msg"; + private static SqlStatementResource resource; + @Mock + private static OverlordClient overlordClient; + + private static void setupMocks(OverlordClient indexingServiceClient) throws JsonProcessingException + { + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ACCEPTED_SELECT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ACCEPTED_SELECT_MSQ_QUERY, new TaskStatusPlus( + ACCEPTED_SELECT_MSQ_QUERY, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + null, + null, + null, + TaskLocation.unknown(), + null, + null + )))); + + Mockito.when(indexingServiceClient.taskPayload(ArgumentMatchers.eq(ACCEPTED_SELECT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( + ACCEPTED_SELECT_MSQ_QUERY, + MSQ_CONTROLLER_SELECT_PAYLOAD + ))); + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(RUNNING_SELECT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(RUNNING_SELECT_MSQ_QUERY, new TaskStatusPlus( + RUNNING_SELECT_MSQ_QUERY, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.RUNNING, + null, + null, + new TaskLocation("test", 0, 0), + null, + null + )))); + + Mockito.when(indexingServiceClient.taskPayload(RUNNING_SELECT_MSQ_QUERY)) + .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( + RUNNING_SELECT_MSQ_QUERY, + MSQ_CONTROLLER_SELECT_PAYLOAD + ))); + + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(FINISHED_SELECT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(FINISHED_SELECT_MSQ_QUERY, new TaskStatusPlus( + FINISHED_SELECT_MSQ_QUERY, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.SUCCESS, + null, + 100L, + new TaskLocation("test", 0, 0), + null, + null + )))); + + Mockito.when(indexingServiceClient.taskPayload(FINISHED_SELECT_MSQ_QUERY)) + .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( + FINISHED_SELECT_MSQ_QUERY, + MSQ_CONTROLLER_SELECT_PAYLOAD + ))); + + + final ObjectMapper mapper = TestHelper.makeJsonMapper() + .registerModules(new MSQIndexingModule().getJacksonModules()); + + + Mockito.when(indexingServiceClient.taskReportAsMap(FINISHED_SELECT_MSQ_QUERY)) + .thenReturn(Futures.immediateFuture(mapper.readValue( + mapper.writeValueAsString(TaskReport.buildTaskReports( + MSQ_SELECT_TASK_REPORT)), + new TypeReference>() + { + } + ))); + + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ERRORED_SELECT_MSQ_QUERY, new TaskStatusPlus( + ERRORED_SELECT_MSQ_QUERY, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.FAILED, + null, + -1L, + TaskLocation.unknown(), + null, + FAILURE_MSG + )))); + + Mockito.when(indexingServiceClient.taskPayload(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( + FINISHED_INSERT_MSQ_QUERY, + MSQ_CONTROLLER_SELECT_PAYLOAD + ))); + + Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(null)); + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(RUNNING_NON_MSQ_TASK))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(RUNNING_NON_MSQ_TASK, new TaskStatusPlus( + RUNNING_NON_MSQ_TASK, + null, + null, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.RUNNING, + null, + -1L, + TaskLocation.unknown(), + null, + null + )))); + + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(FAILED_NON_MSQ_TASK))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(FAILED_NON_MSQ_TASK, new TaskStatusPlus( + FAILED_NON_MSQ_TASK, + null, + null, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.FAILED, + null, + -1L, + TaskLocation.unknown(), + null, + FAILURE_MSG + )))); + + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(FINISHED_NON_MSQ_TASK))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(FINISHED_NON_MSQ_TASK, new TaskStatusPlus( + FINISHED_NON_MSQ_TASK, + null, + IndexTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.SUCCESS, + null, + -1L, + TaskLocation.unknown(), + null, + null + )))); + + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ACCEPTED_INSERT_MSQ_TASK))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ACCEPTED_SELECT_MSQ_QUERY, new TaskStatusPlus( + ACCEPTED_SELECT_MSQ_QUERY, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + null, + null, + null, + TaskLocation.unknown(), + null, + null + )))); + + Mockito.when(indexingServiceClient.taskPayload(ArgumentMatchers.eq(ACCEPTED_INSERT_MSQ_TASK))) + .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( + ACCEPTED_INSERT_MSQ_TASK, + MSQ_CONTROLLER_INSERT_PAYLOAD + ))); + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(RUNNING_INSERT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(RUNNING_INSERT_MSQ_QUERY, new TaskStatusPlus( + RUNNING_INSERT_MSQ_QUERY, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.RUNNING, + null, + null, + new TaskLocation("test", 0, 0), + null, + null + )))); + + Mockito.when(indexingServiceClient.taskPayload(RUNNING_INSERT_MSQ_QUERY)) + .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( + RUNNING_INSERT_MSQ_QUERY, + MSQ_CONTROLLER_INSERT_PAYLOAD + ))); + + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(FINISHED_INSERT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(FINISHED_INSERT_MSQ_QUERY, new TaskStatusPlus( + FINISHED_INSERT_MSQ_QUERY, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.SUCCESS, + null, + 100L, + new TaskLocation("test", 0, 0), + null, + null + )))); + + Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(FINISHED_INSERT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(mapper.readValue( + mapper.writeValueAsString(TaskReport.buildTaskReports( + MSQ_INSERT_TASK_REPORT)), + new TypeReference>() + { + } + ))); + + Mockito.when(indexingServiceClient.taskPayload(FINISHED_INSERT_MSQ_QUERY)) + .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( + FINISHED_INSERT_MSQ_QUERY, + MSQ_CONTROLLER_INSERT_PAYLOAD + ))); + + Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ERRORED_INSERT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ERRORED_INSERT_MSQ_QUERY, new TaskStatusPlus( + ERRORED_INSERT_MSQ_QUERY, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + TaskState.FAILED, + null, + -1L, + TaskLocation.unknown(), + null, + FAILURE_MSG + )))); + + Mockito.when(indexingServiceClient.taskPayload(ArgumentMatchers.eq(ERRORED_INSERT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( + ERRORED_INSERT_MSQ_QUERY, + MSQ_CONTROLLER_INSERT_PAYLOAD + ))); + + Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(ERRORED_INSERT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(null)); + + } + + public static void assertNullResponse(Response response, Response.Status expectectedStatus) + { + Assert.assertEquals(expectectedStatus.getStatusCode(), response.getStatus()); + Assert.assertNull(response.getEntity()); + } + + public static void assertExceptionMessage( + Response response, + String exceptionMessage, + Response.Status expectectedStatus + ) + { + Assert.assertEquals(expectectedStatus.getStatusCode(), response.getStatus()); + Assert.assertEquals(exceptionMessage, getQueryExceptionFromResponse(response)); + } + + public static List getResultRowsFromResponse(Response resultsResponse) throws IOException + { + byte[] bytes = SqlResourceTest.responseToByteArray(resultsResponse); + if (bytes == null) { + return null; + } + return JSON_MAPPER.readValue(bytes, List.class); + } + + private static String getQueryExceptionFromResponse(Response response) + { + if (response.getEntity() instanceof SqlStatementResult) { + return ((SqlStatementResult) response.getEntity()).getErrorResponse().getUnderlyingException().getMessage(); + } else { + return ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage(); + } + } + + public static MockHttpServletRequest makeOkRequest() + { + return makeExpectedReq(CalciteTests.REGULAR_USER_AUTH_RESULT); + } + + public static MockHttpServletRequest makeExpectedReq(AuthenticationResult authenticationResult) + { + MockHttpServletRequest req = new MockHttpServletRequest(); + req.attributes.put(AuthConfig.DRUID_AUTHENTICATION_RESULT, authenticationResult); + req.remoteAddr = "1.2.3.4"; + return req; + } + + @Before + public void init() throws Exception + { + overlordClient = Mockito.mock(OverlordClient.class); + setupMocks(overlordClient); + resource = new SqlStatementResource( + sqlStatementFactory, + CalciteTests.TEST_AUTHORIZER_MAPPER, + JSON_MAPPER, + overlordClient + ); + } + + @Test + public void testMSQSelectAcceptedQuery() + { + Response response = resource.doGetStatus(ACCEPTED_SELECT_MSQ_QUERY, makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + Assert.assertEquals( + new SqlStatementResult( + ACCEPTED_SELECT_MSQ_QUERY, + SqlStatementState.ACCEPTED, + CREATED_TIME, + COL_NAME_AND_TYPES, + null, + null, + null + ), + response.getEntity() + ); + + assertExceptionMessage( + resource.doGetResults(ACCEPTED_SELECT_MSQ_QUERY, null, null, makeOkRequest()), + StringUtils.format( + "Query[%s] is currently in [%s] state. Please wait for it to complete.", + ACCEPTED_SELECT_MSQ_QUERY, + SqlStatementState.ACCEPTED + ), + Response.Status.BAD_REQUEST + ); + Assert.assertEquals( + Response.Status.ACCEPTED.getStatusCode(), + resource.deleteQuery(ACCEPTED_SELECT_MSQ_QUERY, makeOkRequest()).getStatus() + ); + } + + @Test + + public void testMSQSelectRunningQuery() + { + + Response response = resource.doGetStatus(RUNNING_SELECT_MSQ_QUERY, makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + Assert.assertEquals( + new SqlStatementResult( + RUNNING_SELECT_MSQ_QUERY, + SqlStatementState.RUNNING, + CREATED_TIME, + COL_NAME_AND_TYPES, + null, + null, + null + ), + response.getEntity() + ); + + assertExceptionMessage( + resource.doGetResults(RUNNING_SELECT_MSQ_QUERY, null, null, makeOkRequest()), + StringUtils.format( + "Query[%s] is currently in [%s] state. Please wait for it to complete.", + RUNNING_SELECT_MSQ_QUERY, + SqlStatementState.RUNNING + ), + Response.Status.BAD_REQUEST + ); + Assert.assertEquals( + Response.Status.ACCEPTED.getStatusCode(), + resource.deleteQuery(RUNNING_SELECT_MSQ_QUERY, makeOkRequest()).getStatus() + ); + } + + @Test + public void testFinishedSelectMSQQuery() throws Exception + { + Response response = resource.doGetStatus(FINISHED_SELECT_MSQ_QUERY, makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + Assert.assertEquals(new SqlStatementResult( + FINISHED_SELECT_MSQ_QUERY, + SqlStatementState.SUCCESS, + CREATED_TIME, + COL_NAME_AND_TYPES, + 100L, + new ResultSetInformation( + null, + null, + null, + MSQControllerTask.DUMMY_DATASOURCE_FOR_SELECT, + RESULT_ROWS.stream() + .map(Arrays::asList) + .collect(Collectors.toList()) + ), + null + ), response.getEntity()); + + Response resultsResponse = resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, null, null, makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), resultsResponse.getStatus()); + + List> rows = new ArrayList<>(); + rows.add(ROW1); + rows.add(ROW2); + + Assert.assertEquals(rows, getResultRowsFromResponse(resultsResponse)); + + Assert.assertEquals( + Response.Status.OK.getStatusCode(), + resource.deleteQuery(FINISHED_SELECT_MSQ_QUERY, makeOkRequest()).getStatus() + ); + + Assert.assertEquals( + rows.subList(1, 2), + getResultRowsFromResponse(resource.doGetResults( + FINISHED_SELECT_MSQ_QUERY, + 1L, + null, + makeOkRequest() + )) + ); + Assert.assertEquals( + rows.subList(0, 1), + getResultRowsFromResponse(resource.doGetResults( + FINISHED_SELECT_MSQ_QUERY, + 0L, + 1L, + makeOkRequest() + )) + ); + Assert.assertEquals( + rows, + getResultRowsFromResponse(resource.doGetResults( + FINISHED_SELECT_MSQ_QUERY, + 0L, + 3L, + makeOkRequest() + )) + ); + + Assert.assertEquals( + Response.Status.BAD_REQUEST.getStatusCode(), + resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, -1L, 3L, makeOkRequest()).getStatus() + ); + Assert.assertEquals( + Response.Status.BAD_REQUEST.getStatusCode(), + resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, null, -1L, makeOkRequest()).getStatus() + ); + + } + + @Test + public void testFailedMSQQuery() + { + for (String queryID : ImmutableList.of(ERRORED_SELECT_MSQ_QUERY, ERRORED_INSERT_MSQ_QUERY)) { + assertExceptionMessage(resource.doGetStatus(queryID, makeOkRequest()), FAILURE_MSG, Response.Status.OK); + assertExceptionMessage( + resource.doGetResults(queryID, null, null, makeOkRequest()), + StringUtils.format( + "Query[%s] failed. Hit status api for more details.", + queryID + ), + Response.Status.BAD_REQUEST + ); + + Assert.assertEquals( + Response.Status.OK.getStatusCode(), + resource.deleteQuery(queryID, makeOkRequest()).getStatus() + ); + } + } + + @Test + public void testFinishedInsertMSQQuery() throws Exception + { + Response response = resource.doGetStatus(FINISHED_INSERT_MSQ_QUERY, makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + Assert.assertEquals(new SqlStatementResult( + FINISHED_INSERT_MSQ_QUERY, + SqlStatementState.SUCCESS, + CREATED_TIME, + null, + 100L, + new ResultSetInformation(null, null, null, "test", null), + null + ), response.getEntity()); + + Response resultsResponse = resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, null, null, makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), resultsResponse.getStatus()); + + + Assert.assertNull(getResultRowsFromResponse(resource.doGetResults( + FINISHED_INSERT_MSQ_QUERY, + 1L, + null, + makeOkRequest() + ))); + Assert.assertNull(getResultRowsFromResponse(resource.doGetResults( + FINISHED_INSERT_MSQ_QUERY, + 0L, + 1L, + makeOkRequest() + ))); + Assert.assertNull(getResultRowsFromResponse(resource.doGetResults( + FINISHED_INSERT_MSQ_QUERY, + 0L, + 3L, + makeOkRequest() + ))); + + Assert.assertEquals( + Response.Status.BAD_REQUEST.getStatusCode(), + resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, -1L, 3L, makeOkRequest()).getStatus() + ); + Assert.assertEquals( + Response.Status.BAD_REQUEST.getStatusCode(), + resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, null, -1L, makeOkRequest()).getStatus() + ); + + } + + @Test + public void testNonMSQTasks() + { + for (String queryID : ImmutableList.of(RUNNING_NON_MSQ_TASK, FAILED_NON_MSQ_TASK, FINISHED_NON_MSQ_TASK)) { + assertNullResponse(resource.doGetStatus(queryID, makeOkRequest()), Response.Status.NOT_FOUND); + assertNullResponse(resource.doGetResults(queryID, null, null, makeOkRequest()), Response.Status.NOT_FOUND); + assertNullResponse(resource.deleteQuery(queryID, makeOkRequest()), Response.Status.NOT_FOUND); + } + } + + @Test + public void testMSQInsertAcceptedQuery() + { + Response response = resource.doGetStatus(ACCEPTED_INSERT_MSQ_TASK, makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + Assert.assertEquals( + new SqlStatementResult( + ACCEPTED_INSERT_MSQ_TASK, + SqlStatementState.ACCEPTED, + CREATED_TIME, + null, + null, + null, + null + ), + response.getEntity() + ); + + assertExceptionMessage( + resource.doGetResults(ACCEPTED_INSERT_MSQ_TASK, null, null, makeOkRequest()), + StringUtils.format( + "Query[%s] is currently in [%s] state. Please wait for it to complete.", + ACCEPTED_INSERT_MSQ_TASK, + SqlStatementState.ACCEPTED + ), + Response.Status.BAD_REQUEST + ); + Assert.assertEquals( + Response.Status.ACCEPTED.getStatusCode(), + resource.deleteQuery(ACCEPTED_INSERT_MSQ_TASK, makeOkRequest()).getStatus() + ); + } + + @Test + public void testMSQInsertRunningQuery() + { + Response response = resource.doGetStatus(RUNNING_INSERT_MSQ_QUERY, makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + Assert.assertEquals( + new SqlStatementResult( + RUNNING_INSERT_MSQ_QUERY, + SqlStatementState.RUNNING, + CREATED_TIME, + null, + null, + null, + null + ), + response.getEntity() + ); + + assertExceptionMessage( + resource.doGetResults(RUNNING_INSERT_MSQ_QUERY, null, null, makeOkRequest()), + StringUtils.format( + "Query[%s] is currently in [%s] state. Please wait for it to complete.", + RUNNING_INSERT_MSQ_QUERY, + SqlStatementState.RUNNING + ), + Response.Status.BAD_REQUEST + ); + Assert.assertEquals( + Response.Status.ACCEPTED.getStatusCode(), + resource.deleteQuery(RUNNING_INSERT_MSQ_QUERY, makeOkRequest()).getStatus() + ); + } + + @Test + public void forbiddenTests() + { + + Assert.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), + resource.doGetStatus(RUNNING_SELECT_MSQ_QUERY, + makeExpectedReq(CalciteTests.SUPER_USER_AUTH_RESULT)).getStatus()); + + Assert.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), + resource.doGetResults(RUNNING_SELECT_MSQ_QUERY, + null, + null, + makeExpectedReq(CalciteTests.SUPER_USER_AUTH_RESULT)).getStatus()); + Assert.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), + resource.deleteQuery(RUNNING_SELECT_MSQ_QUERY, + makeExpectedReq(CalciteTests.SUPER_USER_AUTH_RESULT)).getStatus()); + } + + @Test + public void testIsEnabled() + { + Assert.assertEquals(Response.Status.OK.getStatusCode(), resource.isEnabled(makeOkRequest()).getStatus()); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ColumnNameAndTypesTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ColumnNameAndTypesTest.java new file mode 100644 index 00000000000..f0524849995 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ColumnNameAndTypesTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.sql.entity; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class ColumnNameAndTypesTest +{ + public static final ObjectMapper MAPPER = new ObjectMapper(); + + public static final ColumnNameAndTypes COLUMN_NAME_AND_TYPES = new ColumnNameAndTypes("test", "test1", "test2"); + public static final String JSON_STRING = "{\"name\":\"test\",\"type\":\"test1\",\"nativeType\":\"test2\"}"; + + @Test + public void sanityTest() throws JsonProcessingException + { + Assert.assertEquals(JSON_STRING, MAPPER.writeValueAsString(COLUMN_NAME_AND_TYPES)); + Assert.assertEquals( + COLUMN_NAME_AND_TYPES, + MAPPER.readValue(MAPPER.writeValueAsString(COLUMN_NAME_AND_TYPES), ColumnNameAndTypes.class) + ); + + Assert.assertEquals( + COLUMN_NAME_AND_TYPES.hashCode(), + MAPPER.readValue(MAPPER.writeValueAsString(COLUMN_NAME_AND_TYPES), ColumnNameAndTypes.class) + .hashCode() + ); + Assert.assertEquals("ColumnNameAndTypes{colName='test', sqlTypeName='test1', nativeTypeName='test2'}", + COLUMN_NAME_AND_TYPES.toString()); + + } + +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ResultSetInformationTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ResultSetInformationTest.java new file mode 100644 index 00000000000..14d04b1b76d --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/ResultSetInformationTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.msq.sql.entity; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import org.apache.druid.sql.http.ResultFormat; +import org.junit.Assert; +import org.junit.Test; + +public class ResultSetInformationTest +{ + public static final ObjectMapper MAPPER = new ObjectMapper(); + + public static final ResultSetInformation RESULTS = new ResultSetInformation(ResultFormat.OBJECT, 1L, 1L, "ds", + ImmutableList.of( + ImmutableList.of("1"), + ImmutableList.of("2"), + ImmutableList.of("3") + ) + ); + public static final String JSON_STRING = "{\"resultFormat\":\"object\",\"numRows\":1,\"sizeInBytes\":1,\"dataSource\":\"ds\",\"sampleRecords\":[[\"1\"],[\"2\"],[\"3\"]]}"; + + + @Test + public void sanityTest() throws JsonProcessingException + { + Assert.assertEquals(JSON_STRING, MAPPER.writeValueAsString(RESULTS)); + Assert.assertEquals(RESULTS, MAPPER.readValue(MAPPER.writeValueAsString(RESULTS), ResultSetInformation.class)); + Assert.assertEquals( + RESULTS.hashCode(), + MAPPER.readValue(MAPPER.writeValueAsString(RESULTS), ResultSetInformation.class).hashCode() + ); + Assert.assertEquals( + "ResultSetInformation{totalRows=1, totalSize=1, resultFormat=object, records=[[1], [2], [3]], dataSource='ds'}", + RESULTS.toString() + ); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java new file mode 100644 index 00000000000..a0be3afcf74 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.msq.sql.entity; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.QueryNotSupportedFault; +import org.apache.druid.msq.sql.SqlStatementResourceTest; +import org.apache.druid.msq.sql.SqlStatementState; +import org.junit.Assert; +import org.junit.Test; + +public class SqlStatementResultTest +{ + public static final MSQException MSQ_EXCEPTION = new MSQException( + QueryNotSupportedFault.instance()); + + public static final ObjectMapper MAPPER = DefaultObjectMapper.INSTANCE; + + public static final String JSON_STRING = "{\"queryId\":\"q1\"," + + "\"state\":\"RUNNING\"," + + "\"createdAt\":\"2023-05-31T12:00:00.000Z\"," + + "\"schema\":[{\"name\":\"_time\",\"type\":\"TIMESTAMP\",\"nativeType\":\"LONG\"},{\"name\":\"alias\",\"type\":\"VARCHAR\",\"nativeType\":\"STRING\"},{\"name\":\"market\",\"type\":\"VARCHAR\",\"nativeType\":\"STRING\"}]," + + "\"durationMs\":100," + + "\"result\":{\"resultFormat\":\"object\",\"numRows\":1,\"sizeInBytes\":1,\"dataSource\":\"ds\",\"sampleRecords\":[[\"1\"],[\"2\"],[\"3\"]]}," + + "\"errorDetails\":{\"error\":\"druidException\",\"errorCode\":\"QueryNotSupported\",\"persona\":\"USER\",\"category\":\"UNCATEGORIZED\",\"errorMessage\":\"QueryNotSupported\",\"context\":{}}}"; + + public static final SqlStatementResult SQL_STATEMENT_RESULT = new SqlStatementResult( + "q1", + SqlStatementState.RUNNING, + SqlStatementResourceTest.CREATED_TIME, + SqlStatementResourceTest.COL_NAME_AND_TYPES, + 100L, + ResultSetInformationTest.RESULTS, + DruidException.fromFailure(new DruidException.Failure(MSQ_EXCEPTION.getFault().getErrorCode()) + { + @Override + protected DruidException makeException(DruidException.DruidExceptionBuilder bob) + { + DruidException ex = bob.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(MSQ_EXCEPTION.getMessage()); + return ex; + } + }).toErrorResponse() + ); + + + @Test + public void sanityTest() throws JsonProcessingException + { + + Assert.assertEquals(JSON_STRING, MAPPER.writeValueAsString(SQL_STATEMENT_RESULT)); + Assert.assertEquals( + SQL_STATEMENT_RESULT, + MAPPER.readValue(MAPPER.writeValueAsString(SQL_STATEMENT_RESULT), SqlStatementResult.class) + ); + Assert.assertEquals( + SQL_STATEMENT_RESULT.hashCode(), + MAPPER.readValue(MAPPER.writeValueAsString(SQL_STATEMENT_RESULT), SqlStatementResult.class).hashCode() + ); + Assert.assertEquals( + "SqlStatementResult{" + + "queryId='q1'," + + " state=RUNNING," + + " createdAt=2023-05-31T12:00:00.000Z," + + " sqlRowSignature=[ColumnNameAndTypes{colName='_time', sqlTypeName='TIMESTAMP', nativeTypeName='LONG'}, ColumnNameAndTypes{colName='alias', sqlTypeName='VARCHAR', nativeTypeName='STRING'}, ColumnNameAndTypes{colName='market', sqlTypeName='VARCHAR', nativeTypeName='STRING'}]," + + " durationInMs=100," + + " resultSetInformation=ResultSetInformation{totalRows=1, totalSize=1, resultFormat=object, records=[[1], [2], [3]], dataSource='ds'}," + + " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}}}", + SQL_STATEMENT_RESULT.toString() + ); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 4301a68b4f3..0138b957b32 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -134,7 +134,6 @@ import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; import org.apache.druid.server.security.AuthConfig; -import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.sql.DirectStatement; import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.SqlStatementFactory; @@ -226,6 +225,7 @@ public class MSQTestBase extends BaseCalciteQueryTest .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false) .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2) .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0) + .put(MSQTaskQueryMaker.USER_KEY, "allowAll") .build(); public static final Map DURABLE_STORAGE_MSQ_CONTEXT = @@ -266,10 +266,10 @@ public class MSQTestBase extends BaseCalciteQueryTest protected File localFileStorageDir; protected LocalFileStorageConnector localFileStorageConnector; private static final Logger log = new Logger(MSQTestBase.class); - private ObjectMapper objectMapper; - private MSQTestOverlordServiceClient indexingServiceClient; + protected ObjectMapper objectMapper; + protected MSQTestOverlordServiceClient indexingServiceClient; protected MSQTestTaskActionClient testTaskActionClient; - private SqlStatementFactory sqlStatementFactory; + protected SqlStatementFactory sqlStatementFactory; private IndexIO indexIO; private MSQTestSegmentManager segmentManager; @@ -493,7 +493,7 @@ public class MSQTestBase extends BaseCalciteQueryTest qf.operatorTable(), qf.macroTable(), PLANNER_CONFIG_DEFAULT, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, + CalciteTests.TEST_EXTERNAL_AUTHORIZER_MAPPER, objectMapper, CalciteTests.DRUID_SCHEMA_NAME, new CalciteRulesManager(ImmutableSet.of()), @@ -952,10 +952,12 @@ public class MSQTestBase extends BaseCalciteQueryTest worker, channel ); - Assert.assertTrue(StringUtils.format("Counters not found for stage [%d], worker [%d], channel [%s]", - stage, - worker, - channel), channelToCounters.containsKey(channel)); + Assert.assertTrue(StringUtils.format( + "Counters not found for stage [%d], worker [%d], channel [%s]", + stage, + worker, + channel + ), channelToCounters.containsKey(channel)); counter.matchQuerySnapshot(errorMessageFormat, channelToCounters.get(channel)); } ); @@ -1066,7 +1068,7 @@ public class MSQTestBase extends BaseCalciteQueryTest verifyWorkerCount(reportPayload.getCounters()); verifyCounters(reportPayload.getCounters()); - MSQSpec foundSpec = indexingServiceClient.getQuerySpecForTask(controllerId); + MSQSpec foundSpec = indexingServiceClient.getMSQControllerTask(controllerId).getQuerySpec(); log.info( "found generated segments: %s", segmentManager.getAllDataSegments().stream().map(s -> s.toString()).collect( @@ -1285,7 +1287,7 @@ public class MSQTestBase extends BaseCalciteQueryTest log.info("found row signature %s", payload.getResults().getSignature()); log.info(rows.stream().map(Arrays::toString).collect(Collectors.joining("\n"))); - final MSQSpec spec = indexingServiceClient.getQuerySpecForTask(controllerId); + final MSQSpec spec = indexingServiceClient.getMSQControllerTask(controllerId).getQuerySpec(); log.info("Found spec: %s", objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(spec)); return new Pair<>(spec, Pair.of(payload.getResults().getSignature(), rows)); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java index a4aff4d8756..1b49982cad4 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java @@ -19,24 +19,29 @@ package org.apache.druid.msq.test; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Injector; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.client.indexing.TaskStatusResponse; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.ControllerImpl; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.indexing.MSQControllerTask; -import org.apache.druid.msq.indexing.MSQSpec; -import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.joda.time.DateTime; import javax.annotation.Nullable; +import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -48,7 +53,13 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient private final WorkerMemoryParameters workerMemoryParameters; private Map inMemoryControllers = new HashMap<>(); private Map> reports = new HashMap<>(); - private Map msqSpec = new HashMap<>(); + private Map inMemoryControllerTask = new HashMap<>(); + private Map inMemoryTaskStatus = new HashMap<>(); + + public static final DateTime CREATED_TIME = DateTimes.of("2023-05-31T12:00Z"); + public static final DateTime QUEUE_INSERTION_TIME = DateTimes.of("2023-05-31T12:01Z"); + + public static final long DURATION = 100L; public MSQTestOverlordServiceClient( ObjectMapper objectMapper, @@ -77,16 +88,13 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient ); MSQControllerTask cTask = objectMapper.convertValue(taskObject, MSQControllerTask.class); - msqSpec.put(cTask.getId(), cTask.getQuerySpec()); + inMemoryControllerTask.put(cTask.getId(), cTask); - controller = new ControllerImpl( - cTask, - msqTestControllerContext - ); + controller = new ControllerImpl(cTask, msqTestControllerContext); - inMemoryControllers.put(cTask.getId(), controller); + inMemoryControllers.put(controller.id(), controller); - controller.run(); + inMemoryTaskStatus.put(taskId, controller.run()); return Futures.immediateFuture(null); } catch (Exception e) { @@ -110,24 +118,61 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient public ListenableFuture> taskReportAsMap(String taskId) { SettableFuture> future = SettableFuture.create(); - future.set( - ImmutableMap.of( - MSQTaskReport.REPORT_KEY, - getReportForTask(taskId).get(MSQTaskReport.REPORT_KEY) - )); + try { + future.set( + objectMapper.readValue( + objectMapper.writeValueAsBytes(getReportForTask(taskId)), + new TypeReference>() + { + } + )); + } + catch (IOException e) { + throw new RuntimeException(e); + } + return future; + } + + @Override + public ListenableFuture taskPayload(String taskId) + { + SettableFuture future = SettableFuture.create(); + future.set(new TaskPayloadResponse(taskId, getMSQControllerTask(taskId))); + return future; + } + + @Override + public ListenableFuture taskStatus(String taskId) + { + SettableFuture future = SettableFuture.create(); + TaskStatus taskStatus = inMemoryTaskStatus.get(taskId); + future.set(new TaskStatusResponse(taskId, new TaskStatusPlus( + taskId, + null, + MSQControllerTask.TYPE, + CREATED_TIME, + QUEUE_INSERTION_TIME, + taskStatus.getStatusCode(), + null, + DURATION, + taskStatus.getLocation(), + null, + taskStatus.getErrorMsg() + ))); + return future; } // hooks to pull stuff out for testing @Nullable - Map getReportForTask(String id) + public Map getReportForTask(String id) { return reports.get(id); } @Nullable - MSQSpec getQuerySpecForTask(String id) + MSQControllerTask getMSQControllerTask(String id) { - return msqSpec.get(id); + return inMemoryControllerTask.get(id); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 5e3d7b65fbf..9fbf8a6b50b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -144,8 +144,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128(); + public static final String TYPE = "index"; private static final Logger log = new Logger(IndexTask.class); - private static final String TYPE = "index"; private static String makeGroupId(IndexIngestionSpec ingestionSchema, IngestionMode ingestionMode) { diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index 81a7939ec4a..85aa73ad1d5 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -47,34 +47,33 @@ import java.util.Map; * no change should occur. * * Notes about exception messages - * + *

* Firstly, exception messages should always be written with the notions from the style conventions covered in * {@code dev/style-conventions.md}. Whenever possible, we should also try to provide an action to take to resolve * the issue. - * + *

* Secondly, given that the DruidException requires defining a target persona, exception messages should always be * written with that target persona in mind. Reviewers should use the targetPersona as added input to help validate * that an exception message in meaningful. - * + *

* For example, at the time that this exception was introduced, there is an exception that the router throws which is * an {@link org.apache.druid.java.util.common.ISE} with the message {@code "No default server found!"}. This * exception is thrown when the router is unable to find a broker to forward a request to. It is completely * meaningless to an end-user trying to run a query (what's a default server? why does it need to be found?). If we - * were to convert the exception to a DruidException and keep the same message, we should mark it as targetting the + * were to convert the exception to a DruidException and keep the same message, we should mark it as targeting the * DEVELOPER persona as that is the only persona who should actually be able to figure out what a default server is - * and why it is important. That said, does it makes sense for an exception that means "router cannot find a broker - * to forward the query to" to only be targetting the DEVELOPER? The answer to that is no, it's something that should - * really be made meaningful to a wider group. Some options could be - * - * USER persona: Cannot find a queryable server, contact your cluster administrator to validate that all services are - * operational - * - * OPERATOR persona: Router unable to find a broker, check that brokers are up and active - * + * and why it is important. That said, does it make sense for an exception that means "router cannot find a broker + * to forward the query to" to only be targeting the DEVELOPER? The answer to that is no, it's something that should + * really be made meaningful to a wider group. Some options could be + *

    + *
  • USER persona: Cannot find a queryable server, contact your cluster administrator to validate that all services are + * operational
  • + *
  • OPERATOR persona: Router unable to find a broker, check that brokers are up and active
  • + *
* The user-facing message doesn't talk about any Druid-specific concepts and just tries to relay a high-level * understanding of what happened. The admin-facing message includes Druid notions in it as it expects that an Admin * will understand the various node types of Druid. - * + *

* If we think about this error more, we will realize that it's fundamentally something wrong with the cluster setup, * which is something that we would expect an operator to be in charge of. So, we would pick the OPERATOR persona * message, which also allows us to include more specific information about what server was not found and provide a @@ -95,8 +94,8 @@ import java.util.Map; *

    *
  1. It identifies why the developer is creating the exception and who they believe can take action on it. * This context allows for code reviewers and other developers to evaluate the message with the persona in mind
  2. - *
  3. It can be used as a way to control which error messages should be routed where. For example, a user-targetted - * error message should be able to be exposed directly to the user, while an operator-targetted error message should + *
  4. It can be used as a way to control which error messages should be routed where. For example, a user-targeted + * error message should be able to be exposed directly to the user, while an operator-targeted error message should * perhaps be routed to the operators of the system instead of the end user firing a query.
  5. *
*

@@ -105,11 +104,11 @@ import java.util.Map; *

* The error code is a code that indicates a grouping of error messages. There is no forced structure around whether * a specific error code can be reused for different problems or not. That is, an error code like "general" will get - * reused in many different places as it's the basic error code used whenever a DruidException is created in-line. But, + * reused in many places as it's the basic error code used whenever a DruidException is created in-line. But, * we might decide that a specific type of error should be identified explicitly by its error code and should only mean * one thing, in which case that error code might only exist on a single error. *

- * The error message is a message written targetting the target persona. It should have values interpolated into it + * The error message is a message written targeting the target persona. It should have values interpolated into it * in order to be as meaningful as possible for the target persona without leaking potentially sensitive information. *

* The context is a place to add extra information about the error that is not necessarily interpolated into the @@ -132,7 +131,7 @@ import java.util.Map; public class DruidException extends RuntimeException { /** - * Starts building an "general" DruidException targetting the specific persona. + * Starts building a "general" DruidException targeting the specific persona. * * @param persona the target persona of the exception message * @return a builder that can be used to complete the creation of the DruidException @@ -266,7 +265,7 @@ public class DruidException extends RuntimeException } /** - * The persona that the message on a DruidException is targetting + * The persona that the message on a DruidException is targeting */ public enum Persona { @@ -307,7 +306,7 @@ public class DruidException extends RuntimeException DEFENSIVE(500), /** * Means that the input provided was malformed in some way. Generally speaking, it is hoped that errors of this - * category have messages written either targetting the USER or ADMIN personas as those are the general users + * category have messages written either targeting the USER or ADMIN personas as those are the general users * of the APIs who could generate invalid inputs. */ INVALID_INPUT(400), diff --git a/processing/src/main/java/org/apache/druid/query/ExecutionMode.java b/processing/src/main/java/org/apache/druid/query/ExecutionMode.java new file mode 100644 index 00000000000..9809d66051b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/ExecutionMode.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +public enum ExecutionMode +{ + + /** + * This mode executes the query in a blocking way. The results are returned as part of the original post query call. Current sql/native endpoints are sync execution. + */ + SYNC, + + /** + * This mode executes the query in a non-blocking way. The results are returned as part of subsequent get results call. Currently, the msq engine uses this mode of execution. + */ + ASYNC + +} diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index ea21987bd18..203e63f23f6 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -88,6 +88,9 @@ public class QueryContexts public static final String CTX_SQL_QUERY_ID = BaseQuery.SQL_QUERY_ID; public static final String CTX_SQL_STRINGIFY_ARRAYS = "sqlStringifyArrays"; + // SQL statement resource specific keys + public static final String CTX_EXECUTION_MODE = "executionMode"; + // Defaults public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; @@ -425,8 +428,20 @@ public class QueryContexts public static > E getAsEnum(String key, Object value, Class clazz, E defaultValue) { - if (value == null) { + E result = getAsEnum(key, value, clazz); + if (result == null) { return defaultValue; + } else { + return result; + } + } + + + @Nullable + public static > E getAsEnum(String key, Object value, Class clazz) + { + if (value == null) { + return null; } try { diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index d47bb558fe9..38b5384ded9 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -295,4 +295,26 @@ public class QueryContextsTest () -> query.context().getEnum("e2", QueryContexts.Vectorize.class, QueryContexts.Vectorize.FALSE) ); } + + @Test + public void testExecutionModeEnum() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + ImmutableMap.of(QueryContexts.CTX_EXECUTION_MODE, "SYNC", QueryContexts.CTX_EXECUTION_MODE + "_1", "ASYNC") + ); + + Assert.assertEquals( + ExecutionMode.SYNC, + query.context().getEnum(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.class, ExecutionMode.ASYNC) + ); + + Assert.assertEquals( + ExecutionMode.ASYNC, + query.context().getEnum(QueryContexts.CTX_EXECUTION_MODE + "_1", ExecutionMode.class, ExecutionMode.SYNC) + ); + } + } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 85344e8cdbd..dac5dfc2263 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -20,6 +20,7 @@ package org.apache.druid.rpc.indexing; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.rpc.ServiceRetryPolicy; @@ -52,5 +53,7 @@ public interface OverlordClient ListenableFuture> taskReportAsMap(String taskId); + ListenableFuture taskPayload(String taskId); + OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy); } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index 4b9cfd7433f..5c28d87a8d1 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskStatus; @@ -131,6 +132,22 @@ public class OverlordClientImpl implements OverlordClient ); } + @Override + public ListenableFuture taskPayload(String taskId) + { + final String path = StringUtils.format("/druid/indexer/v1/task/%s", StringUtils.urlEncode(taskId)); + + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, path), + new BytesFullResponseHandler() + ), + holder -> deserialize(holder, new TypeReference() + { + }) + ); + } + @Override public OverlordClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy) { diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java index 907973edcf7..dbfa1ec1047 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java @@ -59,6 +59,12 @@ public class NoopOverlordClient implements OverlordClient throw new UnsupportedOperationException(); } + @Override + public ListenableFuture taskPayload(String taskId) + { + throw new UnsupportedOperationException(); + } + @Override public OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy) { diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java new file mode 100644 index 00000000000..26fb972ac5c --- /dev/null +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.rpc.indexing; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; +import org.apache.druid.client.indexing.ClientTaskQuery; +import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.rpc.MockServiceClient; +import org.apache.druid.rpc.RequestBuilder; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.ExecutionException; + +public class OverlordClientImplTest +{ + + + @Test + public void testTaskPayload() throws ExecutionException, InterruptedException, JsonProcessingException + { + final String taskID = "taskId_1"; + MockServiceClient client = new MockServiceClient(); + final OverlordClientImpl overlordClient = new OverlordClientImpl(client, DefaultObjectMapper.INSTANCE); + + ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null); + + client.expect(new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID), + HttpResponseStatus.OK, + Collections.emptyMap(), + DefaultObjectMapper.INSTANCE.writeValueAsBytes(new TaskPayloadResponse(taskID, clientTaskQuery)) + ); + + Assert.assertEquals(clientTaskQuery, overlordClient.taskPayload(taskID).get().getPayload()); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java index 0bcf0f684ca..21ebe9e17ba 100644 --- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java @@ -230,8 +230,8 @@ public class DirectStatement extends AbstractStatement implements Cancelable catch (RelOptPlanner.CannotPlanException e) { // Not sure if this is even thrown here. throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.UNCATEGORIZED) - .build(e, "Problem planning SQL query"); + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(e, "Problem planning SQL query"); } catch (RuntimeException e) { state = State.FAILED; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 8bffc909efa..0dfec3759e0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -135,6 +135,40 @@ public class CalciteTests }; } }; + + public static final AuthorizerMapper TEST_EXTERNAL_AUTHORIZER_MAPPER = new AuthorizerMapper(null) + { + @Override + public Authorizer getAuthorizer(String name) + { + return (authenticationResult, resource, action) -> { + if (TEST_SUPERUSER_NAME.equals(authenticationResult.getIdentity())) { + return Access.OK; + } + + switch (resource.getType()) { + case ResourceType.DATASOURCE: + if (FORBIDDEN_DATASOURCE.equals(resource.getName())) { + return new Access(false); + } else { + return Access.OK; + } + case ResourceType.VIEW: + if ("forbiddenView".equals(resource.getName())) { + return new Access(false); + } else { + return Access.OK; + } + case ResourceType.QUERY_CONTEXT: + case ResourceType.EXTERNAL: + return Access.OK; + default: + return new Access(false); + } + }; + } + }; + public static final AuthenticatorMapper TEST_AUTHENTICATOR_MAPPER; static { diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index 7dbc5ce6931..d92b1cbcd44 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -2056,7 +2056,7 @@ public class SqlResourceTest extends CalciteTestBase return JSON_MAPPER.readValue(responseToByteArray(resp), clazz); } - private byte[] responseToByteArray(Response resp) throws IOException + public static byte[] responseToByteArray(Response resp) throws IOException { if (resp.getEntity() instanceof StreamingOutput) { ByteArrayOutputStream baos = new ByteArrayOutputStream();