From d6e9101f421235006089944d73d9d95da677aada Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 12 Feb 2015 00:38:06 +0100 Subject: [PATCH] Internal: Introduce TimedPrioritizedRunnable base class to all commands that go into InternalClusterService.updateTasksExecutor At the moment we sometime submit generic runnables, which make life slightly harder when generated pending task list which have to account for them. This commit adds an abstract TimedPrioritizedRunnable class which should always be used. This class also automatically measures time in queue, which is needed for the pending task reporting. Relates to #8077 Closes #9354 Closes #9671 --- .../service/InternalClusterService.java | 49 +++++++++++++------ .../cluster/service/PendingClusterTask.java | 3 +- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 82f3116b32a..8e1ce7a0e2c 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -32,8 +32,8 @@ import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -235,7 +235,7 @@ public class InternalClusterService extends AbstractLifecycleComponent pendingTasks() { - long now = System.currentTimeMillis(); PrioritizedEsThreadPoolExecutor.Pending[] pendings = updateTasksExecutor.getPending(); List pendingClusterTasks = new ArrayList<>(pendings.length); for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) { final String source; final long timeInQueue; - if (pending.task instanceof UpdateTask) { - UpdateTask updateTask = (UpdateTask) pending.task; - source = updateTask.source; - timeInQueue = now - updateTask.addedAt; + if (pending.task instanceof TimedPrioritizedRunnable) { + TimedPrioritizedRunnable runnable = (TimedPrioritizedRunnable) pending.task; + source = runnable.source(); + timeInQueue = runnable.timeSinceCreatedInMillis(); } else { + assert false : "expected TimedPrioritizedRunnable got " + pending.task.getClass(); source = "unknown"; - timeInQueue = -1; + timeInQueue = 0; } pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue, pending.executing)); @@ -311,15 +311,34 @@ public class InternalClusterService extends AbstractLifecycleComponent= 0 : "got a negative timeInQueue [" + timeInQueue + "]"; + assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]"; this.insertOrder = insertOrder; this.priority = priority; this.source = source;