mirror of https://github.com/apache/druid.git
Add timeout to TaskStartTimeoutFault. (#13970)
* Add timeout to TaskStartTimeoutFault. Makes the error message a bit more useful. * Update docs.
This commit is contained in:
parent
daff7fe73b
commit
062d72b67e
|
@ -751,7 +751,7 @@ The following table describes error codes you may encounter in the `multiStageQu
|
|||
| <a name="error_InvalidNullByte">`InvalidNullByte`</a> | A string column included a null byte. Null bytes in strings are not permitted. | `column`: The column that included the null byte |
|
||||
| <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could not translate the provided native query to a multi-stage query.<br /> <br />This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
|
||||
| <a name="error_RowTooLarge">`RowTooLarge`</a> | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |
|
||||
| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch all the worker tasks in time. <br /> <br />There might be insufficient available slots to start all the worker tasks simultaneously.<br /> <br /> Try splitting up the query into smaller chunks with lesser `maxNumTasks` number. Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch. |
|
||||
| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch `numTasks` tasks within `timeout` milliseconds.<br /><br />There may be insufficient available slots to start all the worker tasks simultaneously. Try splitting up your query into smaller chunks using a smaller value of [`maxNumTasks`](#context-parameters). Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch.<br /><br />`timeout`: Timeout, in milliseconds, that was exceeded. |
|
||||
| <a name="error_TooManyAttemptsForJob">`TooManyAttemptsForJob`</a> | Total relaunch attempt count across all workers exceeded max relaunch attempt limit. See the [Limits](#limits) table for the specific limit. | `maxRelaunchCount`: Max number of relaunches across all the workers defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: current relaunch counter for the job across all workers. <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|
|
||||
| <a name="error_TooManyAttemptsForWorker">`TooManyAttemptsForWorker`</a> | Worker exceeded maximum relaunch attempt count as defined in the [Limits](#limits) section. |`maxPerWorkerRelaunchCount`: Max number of relaunches allowed per worker as defined in the [Limits](#limits) section. <br /><br /> `workerNumber`: the worker number for which the task failed <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|
|
||||
| <a name="error_TooManyBuckets">`TooManyBuckets`</a> | Exceeded the maximum number of partition buckets for a stage (5,000 partition buckets).<br />< br />Partition buckets are created for each [`PARTITIONED BY`](#partitioned-by) time chunk for INSERT and REPLACE queries. The most common reason for this error is that your `PARTITIONED BY` is too narrow relative to your data. | `maxBuckets`: The limit on partition buckets. |
|
||||
|
|
|
@ -494,7 +494,7 @@ public class MSQWorkerTaskLauncher
|
|||
|
||||
} else if (tracker.didRunTimeOut(maxTaskStartDelayMillis) && !canceledWorkerTasks.contains(taskId)) {
|
||||
removeWorkerFromFullyStartedWorkers(tracker);
|
||||
throw new MSQException(new TaskStartTimeoutFault(numTasks + 1));
|
||||
throw new MSQException(new TaskStartTimeoutFault(numTasks + 1, maxTaskStartDelayMillis));
|
||||
} else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
|
||||
removeWorkerFromFullyStartedWorkers(tracker);
|
||||
log.info("Task[%s] failed because %s. Trying to relaunch the worker", taskId, tracker.status.getErrorMsg());
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
|
|||
import org.apache.druid.msq.util.MultiStageQueryContext;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@JsonTypeName(TaskStartTimeoutFault.CODE)
|
||||
public class TaskStartTimeoutFault extends BaseMSQFault
|
||||
|
@ -32,18 +33,26 @@ public class TaskStartTimeoutFault extends BaseMSQFault
|
|||
static final String CODE = "TaskStartTimeout";
|
||||
|
||||
private final int numTasks;
|
||||
private final long timeout;
|
||||
|
||||
@JsonCreator
|
||||
public TaskStartTimeoutFault(@JsonProperty("numTasks") int numTasks)
|
||||
public TaskStartTimeoutFault(
|
||||
@JsonProperty("numTasks") int numTasks,
|
||||
@JsonProperty("timeout") long timeout
|
||||
)
|
||||
{
|
||||
super(
|
||||
CODE,
|
||||
"Unable to launch all the worker tasks in time. There might be insufficient available slots to start all the worker tasks simultaneously."
|
||||
+ " Try lowering '%s' in your query context to lower than [%d] tasks, or increasing capacity.",
|
||||
MultiStageQueryContext.CTX_MAX_NUM_TASKS,
|
||||
numTasks
|
||||
"Unable to launch [%d] worker tasks within [%,d] seconds. "
|
||||
+ "There might be insufficient available slots to start all worker tasks simultaneously. "
|
||||
+ "Try lowering '%s' in your query context to a number that fits within your available task capacity, "
|
||||
+ "or try increasing capacity.",
|
||||
numTasks,
|
||||
TimeUnit.MILLISECONDS.toSeconds(timeout),
|
||||
MultiStageQueryContext.CTX_MAX_NUM_TASKS
|
||||
);
|
||||
this.numTasks = numTasks;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -52,6 +61,12 @@ public class TaskStartTimeoutFault extends BaseMSQFault
|
|||
return numTasks;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getTimeout()
|
||||
{
|
||||
return timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
@ -65,12 +80,21 @@ public class TaskStartTimeoutFault extends BaseMSQFault
|
|||
return false;
|
||||
}
|
||||
TaskStartTimeoutFault that = (TaskStartTimeoutFault) o;
|
||||
return numTasks == that.numTasks;
|
||||
return numTasks == that.numTasks && timeout == that.timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(super.hashCode(), numTasks);
|
||||
return Objects.hash(super.hashCode(), numTasks, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "TaskStartTimeoutFault{" +
|
||||
"numTasks=" + numTasks +
|
||||
", timeout=" + timeout +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ public class MSQTasksTest
|
|||
}
|
||||
catch (Exception e) {
|
||||
Assert.assertEquals(
|
||||
MSQFaultUtils.generateMessageWithErrorCode(new TaskStartTimeoutFault(numTasks + 1)),
|
||||
MSQFaultUtils.generateMessageWithErrorCode(new TaskStartTimeoutFault(numTasks + 1, 5000)),
|
||||
MSQFaultUtils.generateMessageWithErrorCode(((MSQException) e.getCause()).getFault())
|
||||
);
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ public class MSQFaultSerdeTest
|
|||
assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
|
||||
assertFaultSerde(QueryNotSupportedFault.INSTANCE);
|
||||
assertFaultSerde(new RowTooLargeFault(1000));
|
||||
assertFaultSerde(new TaskStartTimeoutFault(10));
|
||||
assertFaultSerde(new TaskStartTimeoutFault(10, 11));
|
||||
assertFaultSerde(new TooManyBucketsFault(10));
|
||||
assertFaultSerde(new TooManyColumnsFault(10, 8));
|
||||
assertFaultSerde(new TooManyClusteredByColumnsFault(10, 8, 1));
|
||||
|
|
Loading…
Reference in New Issue