Using DruidExceptions in MSQ (changes related to the Broker) (#14534)

MSQ engine returns correct error codes for invalid user inputs in the query context. Also, using DruidExceptions for MSQ related errors happening in the Broker with improved error messages.
This commit is contained in:
Laksh Singla 2023-07-13 19:08:49 +00:00 committed by GitHub
parent 589aac8b31
commit c1c7dff2ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 486 additions and 252 deletions

View File

@ -26,8 +26,8 @@ import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -48,7 +48,6 @@ import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
@ -76,10 +75,6 @@ import java.util.stream.Collectors;
public class MSQTaskQueryMaker implements QueryMaker
{
private static final String DESTINATION_DATASOURCE = "dataSource";
private static final String DESTINATION_REPORT = "taskReport";
public static final String USER_KEY = "__user";
private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL;
@ -128,9 +123,6 @@ public class MSQTaskQueryMaker implements QueryMaker
MSQMode.populateDefaultQueryContext(msqMode, nativeQueryContext);
}
final String ctxDestination =
DimensionHandlerUtils.convertObjectToString(MultiStageQueryContext.getDestination(sqlQueryContext));
Object segmentGranularity;
try {
segmentGranularity = Optional.ofNullable(plannerContext.queryContext()
@ -138,15 +130,24 @@ public class MSQTaskQueryMaker implements QueryMaker
.orElse(jsonMapper.writeValueAsString(DEFAULT_SEGMENT_GRANULARITY));
}
catch (JsonProcessingException e) {
throw new IAE("Unable to deserialize the insert granularity. Please retry the query with a valid "
+ "segment graularity");
// This would only be thrown if we are unable to serialize the DEFAULT_SEGMENT_GRANULARITY, which we don't expect
// to happen
throw DruidException.defensive()
.build(
e,
"Unable to deserialize the DEFAULT_SEGMENT_GRANULARITY in MSQTaskQueryMaker. "
+ "This shouldn't have happened since the DEFAULT_SEGMENT_GRANULARITY object is guaranteed to be "
+ "serializable. Please raise an issue in case you are seeing this message while executing a query."
);
}
final int maxNumTasks = MultiStageQueryContext.getMaxNumTasks(sqlQueryContext);
if (maxNumTasks < 2) {
throw new IAE(MultiStageQueryContext.CTX_MAX_NUM_TASKS
+ " cannot be less than 2 since at least 1 controller and 1 worker is necessary.");
throw InvalidInput.exception(
"MSQ context maxNumTasks [%,d] cannot be less than 2, since at least 1 controller and 1 worker is necessary",
maxNumTasks
);
}
// This parameter is used internally for the number of worker tasks only, so we subtract 1
@ -202,16 +203,19 @@ public class MSQTaskQueryMaker implements QueryMaker
final MSQDestination destination;
if (targetDataSource != null) {
if (ctxDestination != null && !DESTINATION_DATASOURCE.equals(ctxDestination)) {
throw new IAE("Cannot INSERT with destination [%s]", ctxDestination);
}
Granularity segmentGranularityObject;
try {
segmentGranularityObject = jsonMapper.readValue((String) segmentGranularity, Granularity.class);
}
catch (Exception e) {
throw new ISE("Unable to convert %s to a segment granularity", segmentGranularity);
throw DruidException.defensive()
.build(
e,
"Unable to deserialize the provided segmentGranularity [%s]. "
+ "This is populated internally by Druid and therefore should not occur. "
+ "Please contact the developers if you are seeing this error message.",
segmentGranularity
);
}
final List<String> segmentSortOrder = MultiStageQueryContext.getSortOrder(sqlQueryContext);
@ -228,16 +232,19 @@ public class MSQTaskQueryMaker implements QueryMaker
replaceTimeChunks
);
} else {
if (ctxDestination != null && !DESTINATION_REPORT.equals(ctxDestination)) {
throw new IAE("Cannot SELECT with destination [%s]", ctxDestination);
}
final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext);
if (msqSelectDestination.equals(MSQSelectDestination.TASK_REPORT)) {
destination = TaskReportMSQDestination.instance();
} else if (msqSelectDestination.equals(MSQSelectDestination.DURABLE_STORAGE)) {
destination = DurableStorageMSQDestination.instance();
} else {
throw new IAE("Cannot SELECT with destination [%s]", msqSelectDestination.name());
throw InvalidInput.exception(
"Unsupported select destination [%s] provided in the query context. MSQ can currently write the select results to "
+ "[%s] and [%s]",
msqSelectDestination.name(),
MSQSelectDestination.TASK_REPORT.toString(),
MSQSelectDestination.DURABLE_STORAGE.toString()
);
}
}

View File

@ -30,15 +30,13 @@ import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
@ -60,7 +58,6 @@ public class MSQTaskSqlEngine implements SqlEngine
public static final Set<String> SYSTEM_CONTEXT_PARAMETERS =
ImmutableSet.<String>builder()
.addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS)
.add(MultiStageQueryContext.CTX_DESTINATION)
.add(QueryKitUtils.CTX_TIME_COLUMN_NAME)
.build();
@ -125,7 +122,7 @@ public class MSQTaskSqlEngine implements SqlEngine
case SCAN_NEEDS_SIGNATURE:
return true;
default:
throw new IAE("Unrecognized feature: %s", feature);
throw SqlEngines.generateUnrecognizedFeatureException(MSQTaskSqlEngine.class.getSimpleName(), feature);
}
}
@ -133,9 +130,9 @@ public class MSQTaskSqlEngine implements SqlEngine
public QueryMaker buildQueryMakerForSelect(
final RelRoot relRoot,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateSelect(relRoot.fields, plannerContext);
validateSelect(plannerContext);
return new MSQTaskQueryMaker(
null,
@ -156,7 +153,7 @@ public class MSQTaskSqlEngine implements SqlEngine
final String targetDataSource,
final RelRoot relRoot,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateInsert(relRoot.rel, relRoot.fields, plannerContext);
@ -169,15 +166,23 @@ public class MSQTaskSqlEngine implements SqlEngine
);
}
private static void validateSelect(
final List<Pair<Integer, String>> fieldMappings,
final PlannerContext plannerContext
) throws ValidationException
/**
* Checks if the SELECT contains {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} in the context. This is a
* defensive cheeck because {@link org.apache.druid.sql.calcite.planner.DruidPlanner} should have called the
* {@link #validateContext}
*/
private static void validateSelect(final PlannerContext plannerContext)
{
if (plannerContext.queryContext().containsKey(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)) {
throw new ValidationException(
StringUtils.format("Cannot use \"%s\" without INSERT", DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
);
throw DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
"The SELECT query's context contains invalid parameter [%s] which is supposed to be populated "
+ "by Druid for INSERT queries. If the user is seeing this exception, that means there's a bug in Druid "
+ "that is populating the query context with the segment's granularity.",
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
);
}
}
@ -185,7 +190,7 @@ public class MSQTaskSqlEngine implements SqlEngine
final RelNode rootRel,
final List<Pair<Integer, String>> fieldMappings,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateNoDuplicateAliases(fieldMappings);
@ -199,12 +204,10 @@ public class MSQTaskSqlEngine implements SqlEngine
// Validate the __time field has the proper type.
final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName();
if (timeType != SqlTypeName.TIMESTAMP) {
throw new ValidationException(
StringUtils.format(
"Field \"%s\" must be of type TIMESTAMP (was %s)",
ColumnHolder.TIME_COLUMN_NAME,
timeType
)
throw InvalidSqlInput.exception(
"Field [%s] was the wrong type [%s], expected TIMESTAMP",
ColumnHolder.TIME_COLUMN_NAME,
timeType
);
}
}
@ -220,13 +223,18 @@ public class MSQTaskSqlEngine implements SqlEngine
);
}
catch (Exception e) {
throw new ValidationException(
StringUtils.format(
"Invalid segmentGranularity: %s",
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
),
e
);
// This is a defensive check as the DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is
// populated by Druid. If the user entered an incorrect granularity, that should have been flagged before reaching
// here
throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
e,
"[%s] is not a valid value for [%s]",
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY),
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
);
}
final boolean hasSegmentGranularity = !Granularities.ALL.equals(segmentGranularity);
@ -237,11 +245,10 @@ public class MSQTaskSqlEngine implements SqlEngine
validateLimitAndOffset(rootRel, !hasSegmentGranularity);
if (hasSegmentGranularity && timeFieldIndex < 0) {
throw new ValidationException(
StringUtils.format(
"INSERT queries with segment granularity other than \"all\" must have a \"%s\" field.",
ColumnHolder.TIME_COLUMN_NAME
)
throw InvalidInput.exception(
"The granularity [%s] specified in the PARTITIONED BY clause of the INSERT query is different from ALL. "
+ "Therefore, the query must specify a time column (named __time).",
segmentGranularity
);
}
}

View File

@ -30,6 +30,8 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.error.Forbidden;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.QueryExceptionCompat;
import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.guice.annotations.MSQ;
@ -198,19 +200,15 @@ public class SqlStatementResource
}
catch (ForbiddenException e) {
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
return buildNonOkResponse(
DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.FORBIDDEN)
.build(Access.DEFAULT_ERROR_MESSAGE)
);
return buildNonOkResponse(Forbidden.exception());
}
// Calcite throws java.lang.AssertionError at various points in planning/validation.
catch (AssertionError | Exception e) {
stmt.reporter().failed(e);
if (isDebug) {
log.warn(e, "Failed to handle query: %s", sqlQueryId);
log.warn(e, "Failed to handle query [%s]", sqlQueryId);
} else {
log.noStackTrace().warn(e, "Failed to handle query: %s", sqlQueryId);
log.noStackTrace().warn(e, "Failed to handle query [%s]", sqlQueryId);
}
return buildNonOkResponse(
DruidException.forPersona(DruidException.Persona.DEVELOPER)
@ -260,17 +258,13 @@ public class SqlStatementResource
}
catch (ForbiddenException e) {
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
return buildNonOkResponse(
DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.FORBIDDEN)
.build(Access.DEFAULT_ERROR_MESSAGE)
);
return buildNonOkResponse(Forbidden.exception());
}
catch (Exception e) {
log.warn(e, "Failed to handle query: %s", queryId);
log.warn(e, "Failed to handle query [%s]", queryId);
return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build(e, "Failed to handle query: [%s]", queryId));
.build(e, "Failed to handle query [%s]", queryId));
}
}
@ -345,17 +339,13 @@ public class SqlStatementResource
}
catch (ForbiddenException e) {
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
return buildNonOkResponse(
DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.FORBIDDEN)
.build(Access.DEFAULT_ERROR_MESSAGE)
);
return buildNonOkResponse(Forbidden.exception());
}
catch (Exception e) {
log.warn(e, "Failed to handle query: %s", queryId);
log.warn(e, "Failed to handle query [%s]", queryId);
return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build(e, "Failed to handle query: [%s]", queryId));
.build(e, "Failed to handle query [%s]", queryId));
}
}
@ -412,17 +402,13 @@ public class SqlStatementResource
}
catch (ForbiddenException e) {
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
return buildNonOkResponse(
DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.FORBIDDEN)
.build(Access.DEFAULT_ERROR_MESSAGE)
);
return buildNonOkResponse(Forbidden.exception());
}
catch (Exception e) {
log.warn(e, "Failed to handle query: %s", queryId);
log.warn(e, "Failed to handle query [%s]", queryId);
return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build(e, "Failed to handle query: [%s]", queryId));
.build(e, "Failed to handle query [%s]", queryId));
}
}
@ -679,9 +665,9 @@ public class SqlStatementResource
if (msqControllerTask.getQuerySpec().getDestination() instanceof TaskReportMSQDestination) {
// Results from task report are only present as one page.
if (page != null && page > 0) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build("Page number is out of range of the results.");
throw InvalidInput.exception(
"Page number [%d] is out of the range of results", page
);
}
MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload(
@ -769,9 +755,7 @@ public class SqlStatementResource
return pageInfo;
}
}
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build("Invalid page id [%d] passed.", pageId);
throw InvalidInput.exception("Invalid page id [%d] passed.", pageId);
}
private void resultPusher(
@ -832,7 +816,7 @@ public class SqlStatementResource
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
"Query[%s] failed. Hit status api for more details.",
"Query[%s] failed. Check the status api for more details.",
queryId
);
} else {
@ -842,22 +826,25 @@ public class SqlStatementResource
private void contextChecks(QueryContext queryContext)
{
ExecutionMode executionMode = queryContext.getEnum(
QueryContexts.CTX_EXECUTION_MODE,
ExecutionMode.class,
null
);
ExecutionMode executionMode = queryContext.getEnum(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.class, null);
if (executionMode == null) {
throw InvalidInput.exception(
"Execution mode is not provided to the SQL statement API. "
+ "Please set [%s] to [%s] in the query context",
QueryContexts.CTX_EXECUTION_MODE,
ExecutionMode.ASYNC
);
}
if (ExecutionMode.ASYNC != executionMode) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
StringUtils.format(
"The statement sql api only supports sync mode[%s]. Please set context parameter [%s=%s] in the context payload",
ExecutionMode.ASYNC,
QueryContexts.CTX_EXECUTION_MODE,
ExecutionMode.ASYNC
)
);
throw InvalidInput.exception(
"The SQL statement API currently does not support the provided execution mode [%s]. "
+ "Please set [%s] to [%s] in the query context",
executionMode,
QueryContexts.CTX_EXECUTION_MODE,
ExecutionMode.ASYNC
);
}
MSQSelectDestination selectDestination = MultiStageQueryContext.getSelectDestination(queryContext);
@ -873,11 +860,12 @@ public class SqlStatementResource
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
StringUtils.format(
"The statement sql api cannot read from select destination [%s=%s] since its not configured. "
+ "Its recommended to configure durable storage as it allows the user to fetch big results. "
+ "Please contact your cluster admin to configure durable storage.",
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.DURABLE_STORAGE.name()
"The SQL Statement API cannot read from the select destination [%s] provided "
+ "in the query context [%s] since it is not configured. It is recommended to configure the durable storage "
+ "as it allows the user to fetch large result sets. Please contact your cluster admin to "
+ "configure durable storage.",
MSQSelectDestination.DURABLE_STORAGE.name(),
MultiStageQueryContext.CTX_SELECT_DESTINATION
)
);
}

View File

@ -39,7 +39,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -103,9 +102,6 @@ public class MultiStageQueryContext
public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode";
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString();
public static final String CTX_DESTINATION = "destination";
private static final String DEFAULT_DESTINATION = null;
public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment";
static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
@ -160,13 +156,10 @@ public class MultiStageQueryContext
public static ClusterStatisticsMergeMode getClusterStatisticsMergeMode(QueryContext queryContext)
{
return ClusterStatisticsMergeMode.valueOf(
String.valueOf(
queryContext.getString(
CTX_CLUSTER_STATISTICS_MERGE_MODE,
DEFAULT_CLUSTER_STATISTICS_MERGE_MODE
)
)
return QueryContexts.getAsEnum(
CTX_CLUSTER_STATISTICS_MERGE_MODE,
queryContext.getString(CTX_CLUSTER_STATISTICS_MERGE_MODE, DEFAULT_CLUSTER_STATISTICS_MERGE_MODE),
ClusterStatisticsMergeMode.class
);
}
@ -180,12 +173,11 @@ public class MultiStageQueryContext
public static WorkerAssignmentStrategy getAssignmentStrategy(final QueryContext queryContext)
{
String assignmentStrategyString = queryContext.getString(
return QueryContexts.getAsEnum(
CTX_TASK_ASSIGNMENT_STRATEGY,
DEFAULT_TASK_ASSIGNMENT_STRATEGY
queryContext.getString(CTX_TASK_ASSIGNMENT_STRATEGY, DEFAULT_TASK_ASSIGNMENT_STRATEGY),
WorkerAssignmentStrategy.class
);
return WorkerAssignmentStrategy.fromString(assignmentStrategyString);
}
public static int getMaxNumTasks(final QueryContext queryContext)
@ -196,14 +188,6 @@ public class MultiStageQueryContext
);
}
public static Object getDestination(final QueryContext queryContext)
{
return queryContext.get(
CTX_DESTINATION,
DEFAULT_DESTINATION
);
}
public static int getRowsPerSegment(final QueryContext queryContext)
{
return queryContext.getInt(
@ -214,22 +198,21 @@ public class MultiStageQueryContext
public static MSQSelectDestination getSelectDestination(final QueryContext queryContext)
{
return MSQSelectDestination.valueOf(
queryContext.getString(
CTX_SELECT_DESTINATION,
DEFAULT_SELECT_DESTINATION
).toUpperCase(Locale.ENGLISH)
return QueryContexts.getAsEnum(
CTX_SELECT_DESTINATION,
queryContext.getString(CTX_SELECT_DESTINATION, DEFAULT_SELECT_DESTINATION),
MSQSelectDestination.class
);
}
@Nullable
public static MSQSelectDestination getSelectDestinationOrNull(final QueryContext queryContext)
{
String selectDestination = queryContext.getString(CTX_SELECT_DESTINATION);
if (selectDestination == null) {
return null;
}
return MSQSelectDestination.valueOf(selectDestination.toUpperCase(Locale.ENGLISH));
return QueryContexts.getAsEnum(
CTX_SELECT_DESTINATION,
queryContext.getString(CTX_SELECT_DESTINATION),
MSQSelectDestination.class
);
}
public static int getRowsInMemory(final QueryContext queryContext)

View File

@ -26,6 +26,7 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@ -1024,11 +1025,13 @@ public class MSQInsertTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(DruidException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Field \"__time\" must be of type TIMESTAMP"))
))
.setExpectedValidationErrorMatcher(
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageIs("Field [__time] was the wrong type [VARCHAR], expected TIMESTAMP")
)
.verifyPlanningErrors();
}
@ -1106,11 +1109,13 @@ public class MSQInsertTest extends MSQTestBase
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1")
.setQueryContext(localContext)
.setExpectedExecutionErrorMatcher(
ThrowableMessageMatcher.hasMessage(
CoreMatchers.startsWith(
MultiStageQueryContext.CTX_MAX_NUM_TASKS
+ " cannot be less than 2 since at least 1 controller and 1 worker is necessary."
)
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageIs(
"MSQ context maxNumTasks [1] cannot be less than 2, since at least 1 controller "
+ "and 1 worker is necessary"
)
)
.verifyExecutionError();

View File

@ -138,24 +138,36 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
@Test
public void nonSupportedModes()
{
for (ImmutableMap<?, ?> context : ImmutableList.of(ImmutableMap.of(
QueryContexts.CTX_EXECUTION_MODE,
ExecutionMode.SYNC.name()
), ImmutableMap.of())) {
SqlStatementResourceTest.assertExceptionMessage(
resource.doPost(new SqlQuery(
"select * from foo",
null,
false,
false,
false,
(Map<String, Object>) context,
null
), SqlStatementResourceTest.makeOkRequest()),
"The statement sql api only supports sync mode[ASYNC]. Please set context parameter [executionMode=ASYNC] in the context payload",
Response.Status.BAD_REQUEST
);
}
SqlStatementResourceTest.assertExceptionMessage(
resource.doPost(new SqlQuery(
"select * from foo",
null,
false,
false,
false,
ImmutableMap.of(),
null
), SqlStatementResourceTest.makeOkRequest()),
"Execution mode is not provided to the SQL statement API. "
+ "Please set [executionMode] to [ASYNC] in the query context",
Response.Status.BAD_REQUEST
);
SqlStatementResourceTest.assertExceptionMessage(
resource.doPost(new SqlQuery(
"select * from foo",
null,
false,
false,
false,
ImmutableMap.of(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.SYNC.name()),
null
), SqlStatementResourceTest.makeOkRequest()),
"The SQL statement API currently does not support the provided execution mode [SYNC]. "
+ "Please set [executionMode] to [ASYNC] in the query context",
Response.Status.BAD_REQUEST
);
}
@ -260,13 +272,10 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
NilStorageConnector.getInstance()
);
String errorMessage = StringUtils.format(
"The statement sql api cannot read from select destination [%s=%s] since its not configured. "
+ "Its recommended to configure durable storage as it allows the user to fetch big results. "
+ "Please contact your cluster admin to configure durable storage.",
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.DURABLE_STORAGE.name()
);
String errorMessage = "The SQL Statement API cannot read from the select destination [DURABLE_STORAGE] provided in "
+ "the query context [selectDestination] since it is not configured. It is recommended to "
+ "configure the durable storage as it allows the user to fetch large result sets. "
+ "Please contact your cluster admin to configure durable storage.";
Map<String, Object> context = defaultAsyncContext();
context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLE_STORAGE.name());

View File

@ -783,7 +783,7 @@ public class SqlStatementResourceTest extends MSQTestBase
assertExceptionMessage(
resource.doGetResults(queryID, 0L, makeOkRequest()),
StringUtils.format(
"Query[%s] failed. Hit status api for more details.",
"Query[%s] failed. Check the status api for more details.",
queryID
),
Response.Status.BAD_REQUEST

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
@ -142,7 +141,6 @@ public class CalciteSelectJoinQueryMSQTest
@Override
public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext)
throws ValidationException
{
plannerContext.queryContextMap().put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, joinAlgorithm.toString());
return super.buildQueryMakerForSelect(relRoot, plannerContext);

View File

@ -39,7 +39,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DESTINATION;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
@ -142,19 +141,6 @@ public class MultiStageQueryContextTest
Assert.assertEquals(101, MultiStageQueryContext.getMaxNumTasks(QueryContext.of(propertyMap)));
}
@Test
public void getDestination_noParameterSetReturnsDefaultValue()
{
Assert.assertNull(MultiStageQueryContext.getDestination(QueryContext.empty()));
}
@Test
public void getDestination_parameterSetReturnsCorrectValue()
{
Map<String, Object> propertyMap = ImmutableMap.of(CTX_DESTINATION, "dataSource");
Assert.assertEquals("dataSource", MultiStageQueryContext.getDestination(QueryContext.of(propertyMap)));
}
@Test
public void getRowsPerSegment_noParameterSetReturnsDefaultValue()
{

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.error;
public class Forbidden extends DruidException.Failure
{
public static DruidException exception()
{
return exception("Unauthorized");
}
public static DruidException exception(String msg, Object... args)
{
return exception(null, msg, args);
}
public static DruidException exception(Throwable t, String msg, Object... args)
{
return DruidException.fromFailure(new Forbidden(t, msg, args));
}
private final Throwable t;
private final String msg;
private final Object[] args;
private Forbidden(
Throwable t,
String msg,
Object... args
)
{
super("forbidden");
this.t = t;
this.msg = msg;
this.args = args;
}
@Override
public DruidException makeException(DruidException.DruidExceptionBuilder bob)
{
bob = bob.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.FORBIDDEN);
if (t == null) {
return bob.build(msg, args);
} else {
return bob.build(t, msg, args);
}
}
}

View File

@ -66,12 +66,11 @@ public class QueryExceptionCompat extends DruidException.Failure
return DruidException.Category.RUNTIME_FAILURE;
case CANCELED:
return DruidException.Category.CANCELED;
case UNKNOWN:
return DruidException.Category.UNCATEGORIZED;
case UNSUPPORTED:
return DruidException.Category.UNSUPPORTED;
case TIMEOUT:
return DruidException.Category.TIMEOUT;
case UNKNOWN:
default:
return DruidException.Category.UNCATEGORIZED;
}

View File

@ -106,16 +106,6 @@ public class QueryContext
return context.get(key);
}
/**
* Return a value as a generic {@code Object}, returning the default value if the
* context value is not set.
*/
public Object get(String key, Object defaultValue)
{
final Object val = get(key);
return val == null ? defaultValue : val;
}
/**
* Return a value as an {@code String}, returning {@link null} if the
* context value is not set.

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.error;
import org.apache.druid.matchers.DruidMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import java.util.Map;
public class ForbiddenTest
{
@Test
public void testAsErrorResponse()
{
ErrorResponse errorResponse = new ErrorResponse(Forbidden.exception());
final Map<String, Object> asMap = errorResponse.getAsMap();
MatcherAssert.assertThat(
asMap,
DruidMatchers.mapMatcher(
"error", "druidException",
"errorCode", "forbidden",
"persona", "USER",
"category", "FORBIDDEN",
"errorMessage", "Unauthorized"
)
);
ErrorResponse recomposed = ErrorResponse.fromMap(asMap);
MatcherAssert.assertThat(
recomposed.getUnderlyingException(),
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.FORBIDDEN,
"forbidden"
).expectMessageContains("Unauthorized")
);
}
}

View File

@ -30,6 +30,7 @@ import org.apache.druid.catalog.model.table.ExternalTableSpec;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.RowSignature;
@ -90,44 +91,73 @@ public class ExternalOperatorConversion extends DruidExternTableMacroConversion
final ObjectMapper jsonMapper
)
{
try {
final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM);
if (sigValue == null && columns == null) {
throw new IAE(
"EXTERN requires either a %s value or an EXTEND clause",
SIGNATURE_PARAM
);
}
if (sigValue != null && columns != null) {
throw new IAE(
"EXTERN requires either a %s value or an EXTEND clause, but not both",
SIGNATURE_PARAM
);
}
final RowSignature rowSignature;
if (columns != null) {
rowSignature = Columns.convertSignature(columns);
} else {
rowSignature = jsonMapper.readValue(sigValue, RowSignature.class);
}
String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
InputSource inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class);
return new ExternalTableSpec(
inputSource,
jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class),
rowSignature,
inputSource::getTypes
final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM);
if (sigValue == null && columns == null) {
throw InvalidInput.exception(
"EXTERN requires either a [%s] value or an EXTEND clause",
SIGNATURE_PARAM
);
}
catch (JsonProcessingException e) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(e, e.getMessage());
if (sigValue != null && columns != null) {
throw InvalidInput.exception(
"EXTERN requires either a [%s] value or an EXTEND clause, but not both",
SIGNATURE_PARAM
);
}
final RowSignature rowSignature;
if (columns != null) {
try {
rowSignature = Columns.convertSignature(columns);
}
catch (IAE e) {
throw badArgumentException(e, "columns");
}
} else {
try {
rowSignature = jsonMapper.readValue(sigValue, RowSignature.class);
}
catch (JsonProcessingException e) {
throw badArgumentException(e, "rowSignature");
}
}
String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
InputSource inputSource;
try {
inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class);
}
catch (JsonProcessingException e) {
throw badArgumentException(e, "inputSource");
}
InputFormat inputFormat;
try {
inputFormat = jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class);
}
catch (JsonProcessingException e) {
throw badArgumentException(e, "inputFormat");
}
return new ExternalTableSpec(
inputSource,
inputFormat,
rowSignature,
inputSource::getTypes
);
}
}
private static DruidException badArgumentException(
Throwable cause,
String fieldName
)
{
return InvalidInput.exception(
cause,
"Invalid value for the field [%s]. Reason: [%s]",
fieldName,
cause.getMessage()
);
}
@Inject
public ExternalOperatorConversion(@Json final ObjectMapper jsonMapper)
{

View File

@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.server.QueryLifecycleFactory;
@ -116,7 +115,7 @@ public class NativeSqlEngine implements SqlEngine
case SCAN_NEEDS_SIGNATURE:
return false;
default:
throw new IAE("Unrecognized feature: %s", feature);
throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.run;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import java.util.Map;
@ -46,4 +47,24 @@ public class SqlEngines
}
}
}
/**
* This is a helper function that provides a developer-friendly exception when an engine cannot recognize the feature.
*/
public static DruidException generateUnrecognizedFeatureException(
final String engineName,
final EngineFeature unrecognizedFeature
)
{
return DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
"Engine [%s] is unable to recognize the feature [%s] for availability. This might happen when a "
+ "newer feature is added without updating all the implementations of SqlEngine(s) to either allow or disallow "
+ "its availability. Please raise an issue if you encounter this exception while using Druid.",
engineName,
unrecognizedFeature
);
}
}

View File

@ -22,11 +22,11 @@ package org.apache.druid.sql.calcite.view;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngines;
import java.util.Map;
@ -78,7 +78,7 @@ public class ViewSqlEngine implements SqlEngine
return false;
default:
throw new IAE("Unrecognized feature: %s", feature);
throw SqlEngines.generateUnrecognizedFeatureException(ViewSqlEngine.class.getSimpleName(), feature);
}
}

View File

@ -1635,11 +1635,96 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"general"
"invalidInput"
).expectMessageContains(
"Cannot construct instance of `org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be provided and non-empty"
)
)
.verify();
}
@Test
public void testErrorWhenBothRowSignatureAndExtendsProvidedToExtern()
{
final String sqlString = "insert into dst \n"
+ "select time_parse(\"time\") as __time, * \n"
+ "from table( \n"
+ "extern(\n"
+ "'{\"type\": \"s3\", \"uris\": [\\\"s3://imply-eng-datasets/qa/IngestionTest/wikipedia/files/wikiticker-2015-09-12-sampled.mini.json.gz\\\"]}',\n"
+ "'{\"type\": \"json\"}',\n"
+ "'[{\"name\": \"time\", \"type\": \"string\"}, {\"name\": \"channel\", \"type\": \"string\"}]'\n"
+ ")\n"
+ ") EXTEND (\"time\" VARCHAR, \"channel\" VARCHAR)\n"
+ "partitioned by DAY\n"
+ "clustered by channel";
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
testIngestionQuery().context(context).sql(sqlString)
.expectValidationError(
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageContains(
"EXTERN requires either a [signature] value or an EXTEND clause, but not both"
)
)
.verify();
}
@Test
public void testErrorWhenNoneOfRowSignatureAndExtendsProvidedToExtern()
{
final String sqlString = "insert into dst \n"
+ "select time_parse(\"time\") as __time, * \n"
+ "from table( \n"
+ "extern(\n"
+ "'{\"type\": \"s3\", \"uris\": [\\\"s3://imply-eng-datasets/qa/IngestionTest/wikipedia/files/wikiticker-2015-09-12-sampled.mini.json.gz\\\"]}',\n"
+ "'{\"type\": \"json\"}'\n"
+ ")\n"
+ ")\n"
+ "partitioned by DAY\n"
+ "clustered by channel";
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
testIngestionQuery().context(context).sql(sqlString)
.expectValidationError(
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageContains(
"EXTERN requires either a [signature] value or an EXTEND clause"
)
)
.verify();
}
@Test
public void testErrorWhenInputSourceInvalid()
{
final String sqlString = "insert into dst \n"
+ "select time_parse(\"time\") as __time, * \n"
+ "from table( \n"
+ "extern(\n"
+ "'{\"type\": \"local\"}',\n"
+ "'{\"type\": \"json\"}',\n"
+ "'[{\"name\": \"time\", \"type\": \"string\"}, {\"name\": \"channel\", \"type\": \"string\"}]'\n"
+ ")\n"
+ ")\n"
+ "partitioned by DAY\n"
+ "clustered by channel";
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
testIngestionQuery().context(context).sql(sqlString)
.expectValidationError(
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageContains(
"Invalid value for the field [inputSource]. Reason:"
)
.expectMessageContains(
"Cannot construct instance of `org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be provided and non-empty"
)
)
.verify();
}

View File

@ -24,12 +24,12 @@ import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngines;
import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.Map;
@ -91,7 +91,7 @@ public class IngestionTestSqlEngine implements SqlEngine
case ALLOW_BROADCAST_RIGHTY_JOIN:
return true;
default:
throw new IAE("Unrecognized feature: %s", feature);
throw SqlEngines.generateUnrecognizedFeatureException(IngestionTestSqlEngine.class.getSimpleName(), feature);
}
}