mirror of https://github.com/apache/druid.git
Fix condition for timeout in worker task launcher (#14270)
* Fix condition for timeout in worker task launcher
This commit is contained in:
parent
66d4ea014c
commit
e8ef31fe92
|
@ -432,7 +432,7 @@ The following table describes error codes you may encounter in the `multiStageQu
|
|||
| <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could not translate the provided native query to a multi-stage query.<br /> <br />This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
|
||||
| <a name="error_QueryRuntimeError">`QueryRuntimeError`</a> | MSQ uses the native query engine to run the leaf stages. This error tells MSQ that error is in native query runtime.<br /> <br /> Since this is a generic error, the user needs to look at logs for the error message and stack trace to figure out the next course of action. If the user is stuck, consider raising a `github` issue for assistance. | `baseErrorMessage` error message from the native query runtime. |
|
||||
| <a name="error_RowTooLarge">`RowTooLarge`</a> | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |
|
||||
| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch `numTasksNotStarted` worker out of total `totalTasks` workers tasks within `timeout` seconds of the last successful worker launch.<br /><br />There may be insufficient available slots to start all the worker tasks simultaneously. Try splitting up your query into smaller chunks using a smaller value of [`maxNumTasks`](#context-parameters). Another option is to increase capacity. | `numTasksNotStarted`: Number of tasks not yet started.<br /><br />`totalTasks`: The number of tasks attempted to launch.<br /><br />`timeout`: Timeout, in milliseconds, that was exceeded. |
|
||||
| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch `pendingTasks` worker out of total `totalTasks` workers tasks within `timeout` seconds of the last successful worker launch.<br /><br />There may be insufficient available slots to start all the worker tasks simultaneously. Try splitting up your query into smaller chunks using a smaller value of [`maxNumTasks`](#context-parameters). Another option is to increase capacity. | `pendingTasks`: Number of tasks not yet started.<br /><br />`totalTasks`: The number of tasks attempted to launch.<br /><br />`timeout`: Timeout, in milliseconds, that was exceeded. |
|
||||
| <a name="error_TooManyAttemptsForJob">`TooManyAttemptsForJob`</a> | Total relaunch attempt count across all workers exceeded max relaunch attempt limit. See the [Limits](#limits) table for the specific limit. | `maxRelaunchCount`: Max number of relaunches across all the workers defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: current relaunch counter for the job across all workers. <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|
|
||||
| <a name="error_TooManyAttemptsForWorker">`TooManyAttemptsForWorker`</a> | Worker exceeded maximum relaunch attempt count as defined in the [Limits](#limits) section. |`maxPerWorkerRelaunchCount`: Max number of relaunches allowed per worker as defined in the [Limits](#limits) section. <br /><br /> `workerNumber`: the worker number for which the task failed <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|
|
||||
| <a name="error_TooManyBuckets">`TooManyBuckets`</a> | Exceeded the maximum number of partition buckets for a stage (5,000 partition buckets).<br />< br />Partition buckets are created for each [`PARTITIONED BY`](#partitioned-by) time chunk for INSERT and REPLACE queries. The most common reason for this error is that your `PARTITIONED BY` is too narrow relative to your data. | `maxBuckets`: The limit on partition buckets. |
|
||||
|
|
|
@ -71,7 +71,7 @@ public class MSQWorkerTaskLauncher
|
|||
private static final long HIGH_FREQUENCY_CHECK_MILLIS = 100;
|
||||
private static final long LOW_FREQUENCY_CHECK_MILLIS = 2000;
|
||||
private static final long SWITCH_TO_LOW_FREQUENCY_CHECK_AFTER_MILLIS = 10000;
|
||||
private static final long SHUTDOWN_TIMEOUT_MS = Duration.ofMinutes(1).toMillis();
|
||||
private static final long SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1).toMillis();
|
||||
private int currentRelaunchCount = 0;
|
||||
|
||||
// States for "state" variable.
|
||||
|
@ -133,7 +133,7 @@ public class MSQWorkerTaskLauncher
|
|||
private final ConcurrentHashMap<Integer, List<String>> workerToTaskIds = new ConcurrentHashMap<>();
|
||||
private final RetryTask retryTask;
|
||||
|
||||
private final AtomicLong recentFullyStartedWorkerTimeInMs = new AtomicLong(System.currentTimeMillis());
|
||||
private final AtomicLong recentFullyStartedWorkerTimeInMillis = new AtomicLong(System.currentTimeMillis());
|
||||
|
||||
public MSQWorkerTaskLauncher(
|
||||
final String controllerTaskId,
|
||||
|
@ -378,11 +378,11 @@ public class MSQWorkerTaskLauncher
|
|||
// Sleep for a bit, maybe.
|
||||
final long now = System.currentTimeMillis();
|
||||
|
||||
if (now > stopStartTime + SHUTDOWN_TIMEOUT_MS) {
|
||||
if (now > stopStartTime + SHUTDOWN_TIMEOUT_MILLIS) {
|
||||
if (caught != null) {
|
||||
throw caught;
|
||||
} else {
|
||||
throw new ISE("Task shutdown timed out (limit = %,dms)", SHUTDOWN_TIMEOUT_MS);
|
||||
throw new ISE("Task shutdown timed out (limit = %,dms)", SHUTDOWN_TIMEOUT_MILLIS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -501,7 +501,7 @@ public class MSQWorkerTaskLauncher
|
|||
if (tracker.status.getStatusCode() == TaskState.RUNNING && !tracker.unknownLocation()) {
|
||||
synchronized (taskIds) {
|
||||
if (fullyStartedTasks.add(tracker.workerNumber)) {
|
||||
recentFullyStartedWorkerTimeInMs.set(System.currentTimeMillis());
|
||||
recentFullyStartedWorkerTimeInMillis.set(System.currentTimeMillis());
|
||||
}
|
||||
taskIds.notifyAll();
|
||||
}
|
||||
|
@ -687,16 +687,16 @@ public class MSQWorkerTaskLauncher
|
|||
/**
|
||||
* Used by the main loop to decide how often to check task status.
|
||||
*/
|
||||
private long computeSleepTime(final long loopDurationMs)
|
||||
private long computeSleepTime(final long loopDurationMillis)
|
||||
{
|
||||
final OptionalLong maxTaskStartTime =
|
||||
taskTrackers.values().stream().mapToLong(tracker -> tracker.startTimeMs).max();
|
||||
taskTrackers.values().stream().mapToLong(tracker -> tracker.startTimeMillis).max();
|
||||
|
||||
if (maxTaskStartTime.isPresent() &&
|
||||
System.currentTimeMillis() - maxTaskStartTime.getAsLong() < SWITCH_TO_LOW_FREQUENCY_CHECK_AFTER_MILLIS) {
|
||||
return HIGH_FREQUENCY_CHECK_MILLIS - loopDurationMs;
|
||||
return HIGH_FREQUENCY_CHECK_MILLIS - loopDurationMillis;
|
||||
} else {
|
||||
return LOW_FREQUENCY_CHECK_MILLIS - loopDurationMs;
|
||||
return LOW_FREQUENCY_CHECK_MILLIS - loopDurationMillis;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -729,7 +729,7 @@ public class MSQWorkerTaskLauncher
|
|||
private class TaskTracker
|
||||
{
|
||||
private final int workerNumber;
|
||||
private final long startTimeMs = System.currentTimeMillis();
|
||||
private final long startTimeMillis = System.currentTimeMillis();
|
||||
private final MSQWorkerTask msqWorkerTask;
|
||||
private TaskStatus status;
|
||||
private TaskLocation initialLocation;
|
||||
|
@ -758,13 +758,20 @@ public class MSQWorkerTaskLauncher
|
|||
}
|
||||
|
||||
/**
|
||||
* The timeout is checked from the recentFullyStartedWorkerTimeInMs. If it's more than maxTaskStartDelayMillis return true.
|
||||
* Checks if the task has timed out if all the following conditions are true:
|
||||
* 1. The task is still running.
|
||||
* 2. The location has never been reported by the task. If this is not the case, the task has started already.
|
||||
* 3. Task has taken more than maxTaskStartDelayMillis to start.
|
||||
* 4. No task has started in maxTaskStartDelayMillis. This is in case the cluster is scaling up and other workers
|
||||
* are starting.
|
||||
*/
|
||||
public boolean didRunTimeOut(final long maxTaskStartDelayMillis)
|
||||
{
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
return (status == null || status.getStatusCode() == TaskState.RUNNING)
|
||||
&& unknownLocation()
|
||||
&& System.currentTimeMillis() - recentFullyStartedWorkerTimeInMs.get() > maxTaskStartDelayMillis;
|
||||
&& currentTimeMillis - recentFullyStartedWorkerTimeInMillis.get() > maxTaskStartDelayMillis
|
||||
&& currentTimeMillis - startTimeMillis > maxTaskStartDelayMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,13 +32,13 @@ public class TaskStartTimeoutFault extends BaseMSQFault
|
|||
{
|
||||
static final String CODE = "TaskStartTimeout";
|
||||
|
||||
private final int numTasksNotStarted;
|
||||
private final int pendingTasks;
|
||||
private final int totalTasks;
|
||||
private final long timeout;
|
||||
|
||||
@JsonCreator
|
||||
public TaskStartTimeoutFault(
|
||||
@JsonProperty("numTasksNotStarted") int numTasksNotStarted,
|
||||
@JsonProperty("pendingTasks") int pendingTasks,
|
||||
@JsonProperty("totalTasks") int totalTasks,
|
||||
@JsonProperty("timeout") long timeout
|
||||
)
|
||||
|
@ -49,20 +49,20 @@ public class TaskStartTimeoutFault extends BaseMSQFault
|
|||
+ "There might be insufficient available slots to start all worker tasks simultaneously. "
|
||||
+ "Try lowering '%s' in your query context to a number that fits within your available task capacity, "
|
||||
+ "or try increasing capacity.",
|
||||
numTasksNotStarted,
|
||||
pendingTasks,
|
||||
totalTasks,
|
||||
TimeUnit.MILLISECONDS.toSeconds(timeout),
|
||||
MultiStageQueryContext.CTX_MAX_NUM_TASKS
|
||||
);
|
||||
this.numTasksNotStarted = numTasksNotStarted;
|
||||
this.pendingTasks = pendingTasks;
|
||||
this.totalTasks = totalTasks;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getNumTasksNotStarted()
|
||||
public int getPendingTasks()
|
||||
{
|
||||
return numTasksNotStarted;
|
||||
return pendingTasks;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -90,13 +90,13 @@ public class TaskStartTimeoutFault extends BaseMSQFault
|
|||
return false;
|
||||
}
|
||||
TaskStartTimeoutFault that = (TaskStartTimeoutFault) o;
|
||||
return numTasksNotStarted == that.numTasksNotStarted && totalTasks == that.totalTasks && timeout == that.timeout;
|
||||
return pendingTasks == that.pendingTasks && totalTasks == that.totalTasks && timeout == that.timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(super.hashCode(), numTasksNotStarted, totalTasks, timeout);
|
||||
return Objects.hash(super.hashCode(), pendingTasks, totalTasks, timeout);
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue