Adding query stack fault to MSQ to capture native query errors. (#13926)

* Add a new fault, `QueryRuntimeError`, to the MSQ engine to capture native query errors.
* Fix a bug in MSQ fault tolerance where workers were retried when `UnexpectedMultiValueDimensionException` was thrown.
* An exception from the query runtime whose class lives in the `org.apache.druid.query` package (or a subpackage) is now reported as a `QueryRuntimeError`. A minimal sketch of the effect follows.
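
To make the third bullet concrete, here is a minimal, illustrative sketch (not part of this commit) of the user-visible effect, based on the `MSQErrorReportTest` additions further down. It assumes it sits in the `org.apache.druid.msq.indexing.error` package so it can call `getFaultFromException` the same way that test does.

```java
package org.apache.druid.msq.indexing.error;

import org.apache.druid.query.QueryTimeoutException;

// Illustrative sketch only. QueryTimeoutException lives under org.apache.druid.query,
// so the new catch-all branch now maps it to the generic QueryRuntimeError fault
// instead of reporting an UnknownError.
public class QueryRuntimeFaultMappingSketch
{
  public static void main(String[] args)
  {
    QueryTimeoutException timeout = new QueryTimeoutException("Query did not finish in time");
    MSQFault fault = MSQErrorReport.getFaultFromException(timeout);
    System.out.println(fault.getErrorCode());    // QueryRuntimeError
    System.out.println(fault.getErrorMessage()); // Query did not finish in time
  }
}
```
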
Karan Kumar 2023-04-05 16:29:10 +05:30 committed by GitHub
parent 1c8a184677
commit e6a11707cb
9 changed files with 187 additions and 21 deletions


@ -750,6 +750,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, verify that the you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp |
| <a name="error_InvalidNullByte">`InvalidNullByte`</a> | A string column included a null byte. Null bytes in strings are not permitted. | `column`: The column that included the null byte |
| <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could not translate the provided native query to a multi-stage query.<br /> <br />This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
| <a name="error_QueryRuntimeError">`QueryRuntimeError`</a> | MSQ uses the native query engine to run the leaf stages. This error tells MSQ that error is in native query runtime.<br /> <br /> Since this is a generic error, the user needs to look at logs for the error message and stack trace to figure out the next course of action. If the user is stuck, consider raising a `github` issue for assistance. | `baseErrorMessage` error message from the native query runtime. |
| <a name="error_RowTooLarge">`RowTooLarge`</a> | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |
| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch `numTasks` tasks within `timeout` milliseconds.<br /><br />There may be insufficient available slots to start all the worker tasks simultaneously. Try splitting up your query into smaller chunks using a smaller value of [`maxNumTasks`](#context-parameters). Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch.<br /><br />`timeout`: Timeout, in milliseconds, that was exceeded. |
| <a name="error_TooManyAttemptsForJob">`TooManyAttemptsForJob`</a> | Total relaunch attempt count across all workers exceeded max relaunch attempt limit. See the [Limits](#limits) table for the specific limit. | `maxRelaunchCount`: Max number of relaunches across all the workers defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: current relaunch counter for the job across all workers. <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|


@ -33,6 +33,7 @@ import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
@ -162,12 +163,12 @@ public class MSQTasks
/**
* Builds an error report from a possible controller error report and a possible worker error report. Both may be
* null, in which case this function will return a report with {@link UnknownFault}.
*
* <br/>
* We only include a single {@link MSQErrorReport} in the task report, because it's important that a query have
* a single {@link MSQFault} explaining why it failed. To aid debugging
* in cases where we choose the controller error over the worker error, we'll log the worker error too, even though
* it doesn't appear in the report.
*
* <br/>
* Logic: we prefer the controller exception unless it's {@link WorkerFailedFault}, {@link WorkerRpcFailedFault},
* or {@link CanceledFault}. In these cases we prefer the worker error report. This ensures we get the best, most
* useful exception even when the controller cancels worker tasks after a failure. (As tasks are canceled one by
@ -228,8 +229,8 @@ public class MSQTasks
logMessage.append(": ").append(MSQFaultUtils.generateMessageWithErrorCode(errorReport.getFault()));
if (errorReport.getExceptionStackTrace() != null) {
if (errorReport.getFault() instanceof UnknownFault) {
// Log full stack trace for unknown faults.
if (errorReport.getFault() instanceof UnknownFault || errorReport.getFault() instanceof QueryRuntimeFault) {
// Log full stack trace for unknown and QueryStack faults
logMessage.append('\n').append(errorReport.getExceptionStackTrace());
} else {
// Log first line only (error class, message) for known faults, to avoid polluting logs.
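
A condensed, self-contained sketch of the logging rule after this change; `describeForLog` is a hypothetical helper for illustration, not a method in the commit.

```java
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.UnknownFault;

// Unknown and query-runtime faults are generic, so the full stack trace is the only
// clue to what failed inside the native query runtime; known faults stay at one line.
public class FaultLoggingSketch
{
  static String describeForLog(MSQFault fault, String exceptionStackTrace)
  {
    if (fault instanceof UnknownFault || fault instanceof QueryRuntimeFault) {
      return exceptionStackTrace;
    } else {
      // Known faults are self-describing; the first line (class + message) is enough.
      return exceptionStackTrace.split("\n", 2)[0];
    }
  }

  public static void main(String[] args)
  {
    String trace = "org.apache.druid.query.QueryTimeoutException: timed out\n\tat SomeClass.run(SomeClass.java:1)";
    System.out.println(describeForLog(new QueryRuntimeFault("timed out", null), trace)); // full trace
    System.out.println(describeForLog(new RowTooLargeFault(1000), trace));               // first line only
  }
}
```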


@ -53,6 +53,7 @@ import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForJob;
@ -114,6 +115,7 @@ public class MSQIndexingModule implements DruidModule
InvalidNullByteFault.class,
NotEnoughMemoryFault.class,
QueryNotSupportedFault.class,
QueryRuntimeFault.class,
RowTooLargeFault.class,
TaskStartTimeoutFault.class,
TooManyBucketsFault.class,


@ -26,8 +26,10 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.statistics.TooManyBucketsException;
import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException;
import javax.annotation.Nullable;
import java.util.Objects;
@ -47,7 +49,7 @@ public class MSQErrorReport
MSQErrorReport(
@JsonProperty("taskId") final String taskId,
@JsonProperty("host") @Nullable final String host,
@JsonProperty("stageNumber") final Integer stageNumber,
@JsonProperty("stageNumber") @Nullable final Integer stageNumber,
@JsonProperty("error") final MSQFault fault,
@JsonProperty("exceptionStackTrace") @Nullable final String exceptionStackTrace
)
@ -190,6 +192,14 @@ public class MSQErrorReport
return new TooManyBucketsFault(((TooManyBucketsException) cause).getMaxBuckets());
} else if (cause instanceof FrameRowTooLargeException) {
return new RowTooLargeFault(((FrameRowTooLargeException) cause).getMaxFrameSize());
} else if (cause instanceof UnexpectedMultiValueDimensionException) {
return new QueryRuntimeFault(StringUtils.format(
"Column [%s] is a multi value string. Please wrap the column using MV_TO_ARRAY() to proceed further.",
((UnexpectedMultiValueDimensionException) cause).getDimensionName()
), cause.getMessage());
} else if (cause.getClass().getPackage().getName().startsWith("org.apache.druid.query")) {
// catch all for all query runtime exception faults.
return new QueryRuntimeFault(e.getMessage(), null);
} else {
cause = cause.getCause();
}
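
Worth noting: `getFaultFromException` walks the cause chain (the `cause = cause.getCause()` branch above), so a native query runtime exception that arrives wrapped inside another exception is still classified as a `QueryRuntimeError`. A minimal, illustrative sketch, again assuming the `org.apache.druid.msq.indexing.error` package for access:

```java
package org.apache.druid.msq.indexing.error;

import org.apache.druid.query.QueryTimeoutException;

// Illustrative only: the wrapped QueryTimeoutException is discovered while unwrapping
// causes, so the catch-all branch above classifies the whole chain as a QueryRuntimeError.
public class WrappedQueryExceptionSketch
{
  public static void main(String[] args)
  {
    RuntimeException wrapped = new RuntimeException(
        "worker failed",
        new QueryTimeoutException("Query did not finish in time")
    );
    MSQFault fault = MSQErrorReport.getFaultFromException(wrapped);
    System.out.println(fault.getErrorCode());    // QueryRuntimeError
    System.out.println(fault.getErrorMessage()); // "worker failed" -- per the branch above, taken from e.getMessage()
  }
}
```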


@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.error;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Fault to throw when the error comes from the Druid native query runtime while running in the MSQ engine.
*/
@JsonTypeName(QueryRuntimeFault.CODE)
public class QueryRuntimeFault extends BaseMSQFault
{
public static final String CODE = "QueryRuntimeError";
@Nullable
private final String baseErrorMessage;
@JsonCreator
public QueryRuntimeFault(
@JsonProperty("errorMessage") String errorMessage,
@Nullable @JsonProperty("baseErrorMessage") String baseErrorMessage
)
{
super(CODE, errorMessage);
this.baseErrorMessage = baseErrorMessage;
}
@JsonProperty
@Nullable
public String getBaseErrorMessage()
{
return baseErrorMessage;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
QueryRuntimeFault that = (QueryRuntimeFault) o;
return Objects.equals(baseErrorMessage, that.baseErrorMessage);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), baseErrorMessage);
}
}
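
For context, a minimal serde sketch (not from this commit) showing roughly what this fault looks like on the wire. It uses a plain `ObjectMapper` with the subtype registered by hand, standing in for the module configuration in `MSQIndexingModule`; the authoritative round-trip check is the `MSQFaultSerdeTest` addition at the end of this diff.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;

// Illustrative only: serialize the new fault and read it back as the MSQFault supertype.
public class QueryRuntimeFaultSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerSubtypes(QueryRuntimeFault.class);

    MSQFault fault = new QueryRuntimeFault("query failed in a leaf stage", "base error");
    String json = mapper.writeValueAsString(fault);
    // Expect something like:
    //   {"errorCode":"QueryRuntimeError","errorMessage":"query failed in a leaf stage","baseErrorMessage":"base error"}
    System.out.println(json);

    // Deserialization relies on the type information declared on MSQFault (keyed by the
    // error code), which is what MSQFaultSerdeTest exercises with the engine's mapper.
    MSQFault roundTripped = mapper.readValue(json, MSQFault.class);
    System.out.println(fault.equals(roundTripped)); // true, per the equals() above
  }
}
```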


@ -496,11 +496,8 @@ public class MSQInsertTest extends MSQTestBase
.setQueryContext(localContext)
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(!FAULT_TOLERANCE.equals(contextName)
? CoreMatchers.containsString(
"Encountered multi-value dimension [dim3] that cannot be processed with 'groupByEnableMultiValueUnnesting' set to false.")
:
CoreMatchers.containsString("exceeded max relaunch count")
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Column [dim3] is a multi value string. Please wrap the column using MV_TO_ARRAY() to proceed further.")
)
))
.verifyExecutionError();


@ -1275,12 +1275,8 @@ public class MSQSelectTest extends MSQTestBase
.setQueryContext(localContext)
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(
!FAULT_TOLERANCE.equals(contextName)
? CoreMatchers.containsString(
"Encountered multi-value dimension [dim3] that cannot be processed with 'groupByEnableMultiValueUnnesting' set to false.")
:
CoreMatchers.containsString("exceeded max relaunch count")
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Column [dim3] is a multi value string. Please wrap the column using MV_TO_ARRAY() to proceed further.")
)
))
.verifyExecutionError();
@ -1450,11 +1446,8 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(
!FAULT_TOLERANCE.equals(contextName)
? CoreMatchers.containsString(
CoreMatchers.containsString(
"Encountered multi-value dimension [dim3] that cannot be processed with 'groupByEnableMultiValueUnnesting' set to false.")
:
CoreMatchers.containsString("exceeded max relaunch count")
)
))
.verifyExecutionError();
@ -1581,7 +1574,11 @@ public class MSQSelectTest extends MSQTestBase
@Test
public void testMultiValueStringWithIncorrectType() throws IOException
{
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/unparseable-mv-string-array.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(
temporaryFolder,
this,
"/unparseable-mv-string-array.json"
);
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()


@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.error;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.statistics.TooManyBucketsException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException;
import org.junit.Assert;
import org.junit.Test;
public class MSQErrorReportTest
{
public static final String ERROR_MESSAGE = "test";
@Test
public void testErrorReportFault()
{
Assert.assertEquals(UnknownFault.forException(null), MSQErrorReport.getFaultFromException(null));
MSQException msqException = new MSQException(null, UnknownFault.forMessage(ERROR_MESSAGE));
Assert.assertEquals(msqException.getFault(), MSQErrorReport.getFaultFromException(msqException));
ParseException parseException = new ParseException(null, ERROR_MESSAGE);
Assert.assertEquals(
new CannotParseExternalDataFault(ERROR_MESSAGE),
MSQErrorReport.getFaultFromException(parseException)
);
UnsupportedColumnTypeException columnTypeException = new UnsupportedColumnTypeException(ERROR_MESSAGE, null);
Assert.assertEquals(
new ColumnTypeNotSupportedFault(ERROR_MESSAGE, null),
MSQErrorReport.getFaultFromException(columnTypeException)
);
TooManyBucketsException tooManyBucketsException = new TooManyBucketsException(10);
Assert.assertEquals(new TooManyBucketsFault(10), MSQErrorReport.getFaultFromException(tooManyBucketsException));
FrameRowTooLargeException tooLargeException = new FrameRowTooLargeException(10);
Assert.assertEquals(new RowTooLargeFault(10), MSQErrorReport.getFaultFromException(tooLargeException));
UnexpectedMultiValueDimensionException mvException = new UnexpectedMultiValueDimensionException(ERROR_MESSAGE);
Assert.assertEquals(QueryRuntimeFault.CODE, MSQErrorReport.getFaultFromException(mvException).getErrorCode());
QueryTimeoutException queryException = new QueryTimeoutException(ERROR_MESSAGE);
Assert.assertEquals(
new QueryRuntimeFault(ERROR_MESSAGE, null),
MSQErrorReport.getFaultFromException(queryException)
);
RuntimeException runtimeException = new RuntimeException(ERROR_MESSAGE);
Assert.assertEquals(
UnknownFault.forException(runtimeException),
MSQErrorReport.getFaultFromException(runtimeException)
);
}
}


@ -63,6 +63,8 @@ public class MSQFaultSerdeTest
assertFaultSerde(new InvalidNullByteFault("the column"));
assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
assertFaultSerde(QueryNotSupportedFault.INSTANCE);
assertFaultSerde(new QueryRuntimeFault("new error", "base error"));
assertFaultSerde(new QueryRuntimeFault("new error", null));
assertFaultSerde(new RowTooLargeFault(1000));
assertFaultSerde(new TaskStartTimeoutFault(10, 11));
assertFaultSerde(new TooManyBucketsFault(10));