mirror of https://github.com/apache/druid.git
Sql statement api error messaging fixes. (#14629)
* Error messaging fixes. * Static check fix * Review comments
This commit is contained in:
parent
1ddbaa8744
commit
77e0c16bce
|
@ -243,6 +243,7 @@ The following table lists the context parameters for the MSQ task engine:
|
|||
| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
|
||||
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
|
||||
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
|
||||
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport`(the default) to write select results to the task report. <b> This is not scalable since task reports size explodes for large results </b> <br/>Use `durableStorage` to write results to durable storage location. <b>For large results sets, its recommended to use `durableStorage` </b>. To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
|
||||
|
||||
## Joins
|
||||
|
||||
|
@ -376,6 +377,12 @@ When you run a query, include the context parameter `durableShuffleStorage` and
|
|||
|
||||
For queries where you want to use fault tolerance for workers, set `faultTolerance` to `true`, which automatically sets `durableShuffleStorage` to `true`.
|
||||
|
||||
Set `selectDestination`:`durableStorage` for select queries that want to write the final results to durable storage instead of the task reports. Saving the results in the durable
|
||||
storage allows users to fetch large result sets. The location where the workers write the intermediate results is different than the location where final results get stored. Therefore, `durableShuffleStorage`:`false` and
|
||||
`selectDestination`:`durableStorage` is a valid configuration to use in the query context, that instructs the controller to persist only the final result in the durable storage, and not the
|
||||
intermediate results.
|
||||
|
||||
|
||||
## Durable storage configurations
|
||||
|
||||
The following common service properties control how durable storage behaves:
|
||||
|
|
|
@ -605,14 +605,14 @@ public class ControllerImpl implements Controller
|
|||
if (MSQControllerTask.writeResultsToDurableStorage(task.getQuerySpec())) {
|
||||
taskContextOverridesBuilder.put(
|
||||
MultiStageQueryContext.CTX_SELECT_DESTINATION,
|
||||
MSQSelectDestination.DURABLE_STORAGE.name()
|
||||
MSQSelectDestination.DURABLESTORAGE.getName()
|
||||
);
|
||||
} else {
|
||||
// we need not pass the value 'TaskReport' to the worker since the worker impl does not do anything in such a case.
|
||||
// but we are passing it anyway for completeness
|
||||
taskContextOverridesBuilder.put(
|
||||
MultiStageQueryContext.CTX_SELECT_DESTINATION,
|
||||
MSQSelectDestination.TASK_REPORT.name()
|
||||
MSQSelectDestination.TASKREPORT.getName()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -732,7 +732,7 @@ public class WorkerImpl implements Worker
|
|||
final int frameSize = frameContext.memoryParameters().getStandardFrameSize();
|
||||
|
||||
if (durableStageStorageEnabled || (isFinalStage
|
||||
&& MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))) {
|
||||
&& MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))) {
|
||||
return DurableStorageOutputChannelFactory.createStandardImplementation(
|
||||
task.getControllerTaskId(),
|
||||
task().getWorkerNumber(),
|
||||
|
@ -741,7 +741,7 @@ public class WorkerImpl implements Worker
|
|||
frameSize,
|
||||
MSQTasks.makeStorageConnector(context.injector()),
|
||||
context.tempDir(),
|
||||
(isFinalStage && MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))
|
||||
(isFinalStage && MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))
|
||||
);
|
||||
} else {
|
||||
final File fileChannelDirectory =
|
||||
|
@ -1320,7 +1320,7 @@ public class WorkerImpl implements Worker
|
|||
{
|
||||
final DurableStorageOutputChannelFactory durableStorageOutputChannelFactory;
|
||||
if (durableStageStorageEnabled || (isFinalStage
|
||||
&& MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))) {
|
||||
&& MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))) {
|
||||
durableStorageOutputChannelFactory = DurableStorageOutputChannelFactory.createStandardImplementation(
|
||||
task.getControllerTaskId(),
|
||||
task().getWorkerNumber(),
|
||||
|
@ -1329,7 +1329,7 @@ public class WorkerImpl implements Worker
|
|||
frameContext.memoryParameters().getStandardFrameSize(),
|
||||
MSQTasks.makeStorageConnector(context.injector()),
|
||||
context.tempDir(),
|
||||
(isFinalStage && MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))
|
||||
(isFinalStage && MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))
|
||||
);
|
||||
} else {
|
||||
return;
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.apache.druid.msq.indexing.destination;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
|
||||
/**
|
||||
* Determines the destination for results of select queries.
|
||||
*/
|
||||
|
@ -27,12 +29,13 @@ public enum MSQSelectDestination
|
|||
/**
|
||||
* Writes all the results directly to the report.
|
||||
*/
|
||||
TASK_REPORT(false),
|
||||
TASKREPORT("taskReport", false),
|
||||
/**
|
||||
* Writes the results as frame files to durable storage. Task report can be truncated to a preview.
|
||||
*/
|
||||
DURABLE_STORAGE(true);
|
||||
DURABLESTORAGE("durableStorage", true);
|
||||
|
||||
private final String name;
|
||||
private final boolean shouldTruncateResultsInTaskReport;
|
||||
|
||||
public boolean shouldTruncateResultsInTaskReport()
|
||||
|
@ -40,8 +43,24 @@ public enum MSQSelectDestination
|
|||
return shouldTruncateResultsInTaskReport;
|
||||
}
|
||||
|
||||
MSQSelectDestination(boolean shouldTruncateResultsInTaskReport)
|
||||
MSQSelectDestination(String name, boolean shouldTruncateResultsInTaskReport)
|
||||
{
|
||||
this.name = name;
|
||||
this.shouldTruncateResultsInTaskReport = shouldTruncateResultsInTaskReport;
|
||||
}
|
||||
|
||||
@JsonValue
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "MSQSelectDestination{" +
|
||||
"name='" + name + '\'' +
|
||||
", shouldTruncateResultsInTaskReport=" + shouldTruncateResultsInTaskReport +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.druid.storage.StorageConnector;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
* Used for reading results when select destination is {@link org.apache.druid.msq.indexing.destination.MSQSelectDestination#DURABLE_STORAGE}
|
||||
* Used for reading results when select destination is {@link org.apache.druid.msq.indexing.destination.MSQSelectDestination#DURABLESTORAGE}
|
||||
*/
|
||||
public class DurableStorageQueryResultsInputChannelFactory extends DurableStorageInputChannelFactory
|
||||
{
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.joda.time.Interval;
|
|||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -233,17 +234,18 @@ public class MSQTaskQueryMaker implements QueryMaker
|
|||
);
|
||||
} else {
|
||||
final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext);
|
||||
if (msqSelectDestination.equals(MSQSelectDestination.TASK_REPORT)) {
|
||||
if (msqSelectDestination.equals(MSQSelectDestination.TASKREPORT)) {
|
||||
destination = TaskReportMSQDestination.instance();
|
||||
} else if (msqSelectDestination.equals(MSQSelectDestination.DURABLE_STORAGE)) {
|
||||
} else if (msqSelectDestination.equals(MSQSelectDestination.DURABLESTORAGE)) {
|
||||
destination = DurableStorageMSQDestination.instance();
|
||||
} else {
|
||||
throw InvalidInput.exception(
|
||||
"Unsupported select destination [%s] provided in the query context. MSQ can currently write the select results to "
|
||||
+ "[%s] and [%s]",
|
||||
msqSelectDestination.name(),
|
||||
MSQSelectDestination.TASK_REPORT.toString(),
|
||||
MSQSelectDestination.DURABLE_STORAGE.toString()
|
||||
+ "[%s]",
|
||||
msqSelectDestination.getName(),
|
||||
Arrays.stream(MSQSelectDestination.values())
|
||||
.map(MSQSelectDestination::getName)
|
||||
.collect(Collectors.joining(","))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.druid.msq.sql.entity;
|
|||
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -32,36 +33,22 @@ import java.util.Objects;
|
|||
*/
|
||||
public class PageInformation
|
||||
{
|
||||
private final long id;
|
||||
@Nullable
|
||||
private final Long numRows;
|
||||
@Nullable
|
||||
private final Long sizeInBytes;
|
||||
private final long id;
|
||||
|
||||
@JsonCreator
|
||||
public PageInformation(
|
||||
@JsonProperty("id") long id,
|
||||
@JsonProperty("numRows") @Nullable Long numRows,
|
||||
@JsonProperty("sizeInBytes") @Nullable Long sizeInBytes,
|
||||
@JsonProperty("id") long id
|
||||
@JsonProperty("sizeInBytes") @Nullable Long sizeInBytes
|
||||
)
|
||||
{
|
||||
this.id = id;
|
||||
this.numRows = numRows;
|
||||
this.sizeInBytes = sizeInBytes;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
public Long getNumRows()
|
||||
{
|
||||
return numRows;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
public Long getSizeInBytes()
|
||||
{
|
||||
return sizeInBytes;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -70,6 +57,23 @@ public class PageInformation
|
|||
return id;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
@Nullable
|
||||
public Long getNumRows()
|
||||
{
|
||||
return numRows;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
@Nullable
|
||||
public Long getSizeInBytes()
|
||||
{
|
||||
return sizeInBytes;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
@ -89,16 +93,16 @@ public class PageInformation
|
|||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(numRows, sizeInBytes, id);
|
||||
return Objects.hash(id, numRows, sizeInBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "PageInformation{" +
|
||||
"numRows=" + numRows +
|
||||
"id=" + id +
|
||||
", numRows=" + numRows +
|
||||
", sizeInBytes=" + sizeInBytes +
|
||||
", id=" + id +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import com.google.inject.Inject;
|
|||
import org.apache.druid.client.indexing.TaskPayloadResponse;
|
||||
import org.apache.druid.client.indexing.TaskStatusResponse;
|
||||
import org.apache.druid.common.guava.FutureUtils;
|
||||
import org.apache.druid.discovery.NodeRole;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.error.ErrorResponse;
|
||||
import org.apache.druid.error.Forbidden;
|
||||
|
@ -830,16 +831,16 @@ public class SqlStatementResource
|
|||
|
||||
if (executionMode == null) {
|
||||
throw InvalidInput.exception(
|
||||
"Execution mode is not provided to the SQL statement API. "
|
||||
"Execution mode is not provided to the sql statement api. "
|
||||
+ "Please set [%s] to [%s] in the query context",
|
||||
QueryContexts.CTX_EXECUTION_MODE,
|
||||
ExecutionMode.ASYNC
|
||||
);
|
||||
}
|
||||
|
||||
if (ExecutionMode.ASYNC != executionMode) {
|
||||
if (!ExecutionMode.ASYNC.equals(executionMode)) {
|
||||
throw InvalidInput.exception(
|
||||
"The SQL statement API currently does not support the provided execution mode [%s]. "
|
||||
"The sql statement api currently does not support the provided execution mode [%s]. "
|
||||
+ "Please set [%s] to [%s] in the query context",
|
||||
executionMode,
|
||||
QueryContexts.CTX_EXECUTION_MODE,
|
||||
|
@ -848,7 +849,7 @@ public class SqlStatementResource
|
|||
}
|
||||
|
||||
MSQSelectDestination selectDestination = MultiStageQueryContext.getSelectDestination(queryContext);
|
||||
if (selectDestination == MSQSelectDestination.DURABLE_STORAGE) {
|
||||
if (MSQSelectDestination.DURABLESTORAGE.equals(selectDestination)) {
|
||||
checkForDurableStorageConnectorImpl();
|
||||
}
|
||||
}
|
||||
|
@ -860,12 +861,13 @@ public class SqlStatementResource
|
|||
.ofCategory(DruidException.Category.INVALID_INPUT)
|
||||
.build(
|
||||
StringUtils.format(
|
||||
"The SQL Statement API cannot read from the select destination [%s] provided "
|
||||
+ "in the query context [%s] since it is not configured. It is recommended to configure the durable storage "
|
||||
"The sql statement api cannot read from the select destination [%s] provided "
|
||||
+ "in the query context [%s] since it is not configured on the %s. It is recommended to configure durable storage "
|
||||
+ "as it allows the user to fetch large result sets. Please contact your cluster admin to "
|
||||
+ "configure durable storage.",
|
||||
MSQSelectDestination.DURABLE_STORAGE.name(),
|
||||
MultiStageQueryContext.CTX_SELECT_DESTINATION
|
||||
MSQSelectDestination.DURABLESTORAGE.getName(),
|
||||
MultiStageQueryContext.CTX_SELECT_DESTINATION,
|
||||
NodeRole.BROKER.getJsonName()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ import java.util.stream.Collectors;
|
|||
*
|
||||
* <li><b>selectDestination</b>: If the query is a Select, determines the location to write results to, once the query
|
||||
* is finished. Depending on the location, the results might also be truncated to {@link Limits#MAX_SELECT_RESULT_ROWS}.
|
||||
* Default value is {@link MSQSelectDestination#TASK_REPORT}, which writes all the results to the report.
|
||||
* Default value is {@link MSQSelectDestination#TASKREPORT}, which writes all the results to the report.
|
||||
*
|
||||
* <li><b>useAutoColumnSchemas</b>: Temporary flag to allow experimentation using
|
||||
* {@link org.apache.druid.segment.AutoTypeColumnSchema} for all 'standard' type columns during segment generation,
|
||||
|
@ -93,7 +93,7 @@ public class MultiStageQueryContext
|
|||
public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage";
|
||||
private static final boolean DEFAULT_DURABLE_SHUFFLE_STORAGE = false;
|
||||
public static final String CTX_SELECT_DESTINATION = "selectDestination";
|
||||
private static final String DEFAULT_SELECT_DESTINATION = MSQSelectDestination.TASK_REPORT.toString();
|
||||
private static final String DEFAULT_SELECT_DESTINATION = MSQSelectDestination.TASKREPORT.getName();
|
||||
|
||||
public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
|
||||
public static final boolean DEFAULT_FAULT_TOLERANCE = false;
|
||||
|
|
|
@ -61,7 +61,6 @@ import org.apache.druid.sql.calcite.run.SqlResults;
|
|||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -142,27 +141,6 @@ public class SqlStatementResourceHelper
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
|
||||
public static long getLastIndex(Long numberOfRows, long start)
|
||||
{
|
||||
final long last;
|
||||
if (numberOfRows == null) {
|
||||
last = Long.MAX_VALUE;
|
||||
} else {
|
||||
long finalIndex;
|
||||
try {
|
||||
finalIndex = Math.addExact(start, numberOfRows);
|
||||
}
|
||||
catch (ArithmeticException e) {
|
||||
finalIndex = Long.MAX_VALUE;
|
||||
}
|
||||
last = finalIndex;
|
||||
}
|
||||
return last;
|
||||
}
|
||||
|
||||
/**
|
||||
* Populates pages list from the {@link CounterSnapshotsTree}.
|
||||
* <br>
|
||||
|
@ -170,7 +148,7 @@ public class SqlStatementResourceHelper
|
|||
* <ol>
|
||||
* <li>{@link DataSourceMSQDestination} a single page is returned which adds all the counters of {@link SegmentGenerationProgressCounter.Snapshot}</li>
|
||||
* <li>{@link TaskReportMSQDestination} a single page is returned which adds all the counters of {@link ChannelCounters}</li>
|
||||
* <li>{@link DurableStorageMSQDestination} a page is returned for each worker which has generated output rows.
|
||||
* <li>{@link DurableStorageMSQDestination} a page is returned for each worker which has generated output rows. The list is sorted on page Id.
|
||||
* If the worker generated 0 rows, we do no populated a page for it. {@link PageInformation#id} is equal to the worker number</li>
|
||||
* </ol>
|
||||
*/
|
||||
|
@ -183,8 +161,9 @@ public class SqlStatementResourceHelper
|
|||
return Optional.empty();
|
||||
}
|
||||
int finalStage = msqTaskReportPayload.getStages().getStages().size() - 1;
|
||||
Map<Integer, CounterSnapshots> workerCounters = msqTaskReportPayload.getCounters().snapshotForStage(finalStage);
|
||||
if (workerCounters == null) {
|
||||
CounterSnapshotsTree counterSnapshotsTree = msqTaskReportPayload.getCounters();
|
||||
Map<Integer, CounterSnapshots> workerCounters = counterSnapshotsTree.snapshotForStage(finalStage);
|
||||
if (workerCounters == null || workerCounters.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
|
@ -198,7 +177,7 @@ public class SqlStatementResourceHelper
|
|||
}
|
||||
}
|
||||
if (rows != 0L) {
|
||||
return Optional.of(ImmutableList.of(new PageInformation(rows, null, 0)));
|
||||
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
@ -213,7 +192,7 @@ public class SqlStatementResourceHelper
|
|||
}
|
||||
}
|
||||
if (rows != 0L) {
|
||||
return Optional.of(ImmutableList.of(new PageInformation(rows, size, 0)));
|
||||
return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
@ -230,10 +209,10 @@ public class SqlStatementResourceHelper
|
|||
}
|
||||
// do not populate a page if the worker generated 0 rows.
|
||||
if (rows != 0L) {
|
||||
pageList.add(new PageInformation(rows, size, counterSnapshots.getKey()));
|
||||
pageList.add(new PageInformation(counterSnapshots.getKey(), rows, size));
|
||||
}
|
||||
}
|
||||
Collections.sort(pageList, PageInformation.getIDComparator());
|
||||
pageList.sort(PageInformation.getIDComparator());
|
||||
return Optional.of(pageList);
|
||||
} else {
|
||||
return Optional.empty();
|
||||
|
@ -378,7 +357,6 @@ public class SqlStatementResourceHelper
|
|||
public static Map<String, Object> getPayload(Map<String, Object> results)
|
||||
{
|
||||
Map<String, Object> msqReport = getMap(results, "multiStageQuery");
|
||||
Map<String, Object> payload = getMap(msqReport, "payload");
|
||||
return payload;
|
||||
return getMap(msqReport, "payload");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,7 +111,7 @@ public class MSQSelectTest extends MSQTestBase
|
|||
.putAll(DURABLE_STORAGE_MSQ_CONTEXT)
|
||||
.put(
|
||||
MultiStageQueryContext.CTX_SELECT_DESTINATION,
|
||||
MSQSelectDestination.DURABLE_STORAGE.name().toLowerCase(Locale.ENGLISH)
|
||||
MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH)
|
||||
)
|
||||
.build();
|
||||
|
||||
|
@ -121,7 +121,7 @@ public class MSQSelectTest extends MSQTestBase
|
|||
.putAll(DEFAULT_MSQ_CONTEXT)
|
||||
.put(
|
||||
MultiStageQueryContext.CTX_SELECT_DESTINATION,
|
||||
MSQSelectDestination.DURABLE_STORAGE.name().toLowerCase(Locale.ENGLISH)
|
||||
MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH)
|
||||
)
|
||||
.build();
|
||||
|
||||
|
|
|
@ -124,7 +124,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
|
|||
null,
|
||||
MSQControllerTask.DUMMY_DATASOURCE_FOR_SELECT,
|
||||
results,
|
||||
ImmutableList.of(new PageInformation(6L, 316L, 0))
|
||||
ImmutableList.of(new PageInformation(0, 6L, 316L))
|
||||
),
|
||||
null
|
||||
);
|
||||
|
@ -150,7 +150,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
|
|||
ImmutableMap.of(),
|
||||
null
|
||||
), SqlStatementResourceTest.makeOkRequest()),
|
||||
"Execution mode is not provided to the SQL statement API. "
|
||||
"Execution mode is not provided to the sql statement api. "
|
||||
+ "Please set [executionMode] to [ASYNC] in the query context",
|
||||
Response.Status.BAD_REQUEST
|
||||
);
|
||||
|
@ -165,7 +165,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
|
|||
ImmutableMap.of(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.SYNC.name()),
|
||||
null
|
||||
), SqlStatementResourceTest.makeOkRequest()),
|
||||
"The SQL statement API currently does not support the provided execution mode [SYNC]. "
|
||||
"The sql statement api currently does not support the provided execution mode [SYNC]. "
|
||||
+ "Please set [executionMode] to [ASYNC] in the query context",
|
||||
Response.Status.BAD_REQUEST
|
||||
);
|
||||
|
@ -273,12 +273,12 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
|
|||
NilStorageConnector.getInstance()
|
||||
);
|
||||
|
||||
String errorMessage = "The SQL Statement API cannot read from the select destination [DURABLE_STORAGE] provided in "
|
||||
+ "the query context [selectDestination] since it is not configured. It is recommended to "
|
||||
+ "configure the durable storage as it allows the user to fetch large result sets. "
|
||||
String errorMessage = "The sql statement api cannot read from the select destination [durableStorage] provided in "
|
||||
+ "the query context [selectDestination] since it is not configured on the broker. It is recommended to "
|
||||
+ "configure durable storage as it allows the user to fetch large result sets. "
|
||||
+ "Please contact your cluster admin to configure durable storage.";
|
||||
Map<String, Object> context = defaultAsyncContext();
|
||||
context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLE_STORAGE.name());
|
||||
context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLESTORAGE.getName());
|
||||
|
||||
SqlStatementResourceTest.assertExceptionMessage(resourceWithDurableStorage.doPost(
|
||||
new SqlQuery(
|
||||
|
@ -300,7 +300,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
|
|||
public void testWithDurableStorage() throws IOException
|
||||
{
|
||||
Map<String, Object> context = defaultAsyncContext();
|
||||
context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLE_STORAGE.name());
|
||||
context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLESTORAGE.getName());
|
||||
|
||||
SqlStatementResult sqlStatementResult = (SqlStatementResult) resource.doPost(
|
||||
new SqlQuery(
|
||||
|
|
|
@ -732,7 +732,7 @@ public class SqlStatementResourceTest extends MSQTestBase
|
|||
null,
|
||||
MSQControllerTask.DUMMY_DATASOURCE_FOR_SELECT,
|
||||
RESULT_ROWS,
|
||||
ImmutableList.of(new PageInformation(3L, 8L, 0L))
|
||||
ImmutableList.of(new PageInformation(0, 3L, 8L))
|
||||
),
|
||||
null
|
||||
)), objectMapper.writeValueAsString(response.getEntity()));
|
||||
|
|
|
@ -37,7 +37,7 @@ public class ResultSetInformationTest
|
|||
ResultFormat.OBJECT,
|
||||
"ds",
|
||||
null,
|
||||
ImmutableList.of(new PageInformation(1L, 1L, 0))
|
||||
ImmutableList.of(new PageInformation(0, null, 1L))
|
||||
);
|
||||
|
||||
|
||||
|
@ -51,10 +51,10 @@ public class ResultSetInformationTest
|
|||
new String[]{"2"},
|
||||
new String[]{"3"}
|
||||
),
|
||||
ImmutableList.of(new PageInformation(1L, 1L, 0))
|
||||
ImmutableList.of(new PageInformation(0, 1L, 1L))
|
||||
);
|
||||
public static final String JSON_STRING = "{\"numTotalRows\":1,\"totalSizeInBytes\":1,\"resultFormat\":\"object\",\"dataSource\":\"ds\",\"pages\":[{\"numRows\":1,\"sizeInBytes\":1,\"id\":0}]}";
|
||||
public static final String JSON_STRING_1 = "{\"numTotalRows\":1,\"totalSizeInBytes\":1,\"resultFormat\":\"object\",\"dataSource\":\"ds\",\"sampleRecords\":[[\"1\"],[\"2\"],[\"3\"]],\"pages\":[{\"numRows\":1,\"sizeInBytes\":1,\"id\":0}]}";
|
||||
public static final String JSON_STRING = "{\"numTotalRows\":1,\"totalSizeInBytes\":1,\"resultFormat\":\"object\",\"dataSource\":\"ds\",\"pages\":[{\"id\":0,\"sizeInBytes\":1}]}";
|
||||
public static final String JSON_STRING_1 = "{\"numTotalRows\":1,\"totalSizeInBytes\":1,\"resultFormat\":\"object\",\"dataSource\":\"ds\",\"sampleRecords\":[[\"1\"],[\"2\"],[\"3\"]],\"pages\":[{\"id\":0,\"numRows\":1,\"sizeInBytes\":1}]}";
|
||||
|
||||
@Test
|
||||
public void sanityTest() throws JsonProcessingException
|
||||
|
@ -66,7 +66,7 @@ public class ResultSetInformationTest
|
|||
MAPPER.readValue(MAPPER.writeValueAsString(RESULTS), ResultSetInformation.class).hashCode()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
"ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{numRows=1, sizeInBytes=1, id=0}]}",
|
||||
"ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1}]}",
|
||||
RESULTS.toString()
|
||||
);
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ public class SqlStatementResultTest
|
|||
+ "\"createdAt\":\"2023-05-31T12:00:00.000Z\","
|
||||
+ "\"schema\":[{\"name\":\"_time\",\"type\":\"TIMESTAMP\",\"nativeType\":\"LONG\"},{\"name\":\"alias\",\"type\":\"VARCHAR\",\"nativeType\":\"STRING\"},{\"name\":\"market\",\"type\":\"VARCHAR\",\"nativeType\":\"STRING\"}],"
|
||||
+ "\"durationMs\":100,"
|
||||
+ "\"result\":{\"numTotalRows\":1,\"totalSizeInBytes\":1,\"resultFormat\":\"object\",\"dataSource\":\"ds\",\"pages\":[{\"numRows\":1,\"sizeInBytes\":1,\"id\":0}]},"
|
||||
+ "\"result\":{\"numTotalRows\":1,\"totalSizeInBytes\":1,\"resultFormat\":\"object\",\"dataSource\":\"ds\",\"pages\":[{\"id\":0,\"sizeInBytes\":1}]},"
|
||||
+ "\"errorDetails\":{\"error\":\"druidException\",\"errorCode\":\"QueryNotSupported\",\"persona\":\"USER\",\"category\":\"UNCATEGORIZED\",\"errorMessage\":\"QueryNotSupported\",\"context\":{}}}";
|
||||
|
||||
public static final SqlStatementResult SQL_STATEMENT_RESULT = new SqlStatementResult(
|
||||
|
@ -58,10 +58,9 @@ public class SqlStatementResultTest
|
|||
@Override
|
||||
protected DruidException makeException(DruidException.DruidExceptionBuilder bob)
|
||||
{
|
||||
DruidException ex = bob.forPersona(DruidException.Persona.USER)
|
||||
return bob.forPersona(DruidException.Persona.USER)
|
||||
.ofCategory(DruidException.Category.UNCATEGORIZED)
|
||||
.build(MSQ_EXCEPTION.getMessage());
|
||||
return ex;
|
||||
}
|
||||
}).toErrorResponse()
|
||||
);
|
||||
|
@ -87,7 +86,7 @@ public class SqlStatementResultTest
|
|||
+ " createdAt=2023-05-31T12:00:00.000Z,"
|
||||
+ " sqlRowSignature=[ColumnNameAndTypes{colName='_time', sqlTypeName='TIMESTAMP', nativeTypeName='LONG'}, ColumnNameAndTypes{colName='alias', sqlTypeName='VARCHAR', nativeTypeName='STRING'}, ColumnNameAndTypes{colName='market', sqlTypeName='VARCHAR', nativeTypeName='STRING'}],"
|
||||
+ " durationInMs=100,"
|
||||
+ " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{numRows=1, sizeInBytes=1, id=0}]},"
|
||||
+ " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1}]},"
|
||||
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}}}",
|
||||
SQL_STATEMENT_RESULT.toString()
|
||||
);
|
||||
|
|
|
@ -253,7 +253,7 @@ public class MultiStageQueryContextTest
|
|||
@Test
|
||||
public void limitSelectResultReturnsDefaultValue()
|
||||
{
|
||||
Assert.assertEquals(MSQSelectDestination.TASK_REPORT, MultiStageQueryContext.getSelectDestination(QueryContext.empty()));
|
||||
Assert.assertEquals(MSQSelectDestination.TASKREPORT, MultiStageQueryContext.getSelectDestination(QueryContext.empty()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -31,10 +31,12 @@ import org.apache.druid.java.util.common.StringUtils;
|
|||
import javax.annotation.Nullable;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@PublicApi
|
||||
public class QueryContexts
|
||||
|
@ -454,7 +456,12 @@ public class QueryContexts
|
|||
catch (IllegalArgumentException e) {
|
||||
throw badValueException(
|
||||
key,
|
||||
StringUtils.format("a value of enum [%s]", clazz.getSimpleName()),
|
||||
StringUtils.format(
|
||||
"referring to one of the values [%s] of enum [%s]",
|
||||
Arrays.stream(clazz.getEnumConstants()).map(E::name).collect(
|
||||
Collectors.joining(",")),
|
||||
clazz.getSimpleName()
|
||||
),
|
||||
value
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue