diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 82f3116b32a..8e1ce7a0e2c 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -32,8 +32,8 @@ import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -235,7 +235,7 @@ public class InternalClusterService extends AbstractLifecycleComponent pendingTasks() { - long now = System.currentTimeMillis(); PrioritizedEsThreadPoolExecutor.Pending[] pendings = updateTasksExecutor.getPending(); List pendingClusterTasks = new ArrayList<>(pendings.length); for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) { final String source; final long timeInQueue; - if (pending.task instanceof UpdateTask) { - UpdateTask updateTask = (UpdateTask) pending.task; - source = updateTask.source; - timeInQueue = now - updateTask.addedAt; + if (pending.task instanceof TimedPrioritizedRunnable) { + TimedPrioritizedRunnable runnable = (TimedPrioritizedRunnable) pending.task; + source = runnable.source(); + timeInQueue = runnable.timeSinceCreatedInMillis(); } else { + assert false : "expected TimedPrioritizedRunnable got " + pending.task.getClass(); source = "unknown"; - timeInQueue = -1; + timeInQueue = 0; } pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue, pending.executing)); @@ -311,15 +311,34 @@ public class InternalClusterService extends AbstractLifecycleComponent= 0 : "got a negative timeInQueue [" + timeInQueue + "]"; + assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]"; this.insertOrder = insertOrder; this.priority = priority; this.source = source;