Internal: Introduce TimedPrioritizedRunnable base class to all commands that go into InternalClusterService.updateTasksExecutor

At the moment we sometime submit generic runnables, which make life slightly harder when generated pending task list which have to account for them. This commit adds an abstract TimedPrioritizedRunnable class which should always be used. This class also automatically measures time in queue, which is needed for the pending task reporting.

Relates to #8077

Closes #9354
Closes #9671
This commit is contained in:
Boaz Leskes 2015-02-12 00:38:06 +01:00
parent 41befaf6b5
commit d6e9101f42
2 changed files with 36 additions and 16 deletions

View File

@ -32,8 +32,8 @@ import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -235,7 +235,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
// call the post added notification on the same event thread
try {
updateTasksExecutor.execute(new PrioritizedRunnable(Priority.HIGH) {
updateTasksExecutor.execute(new TimedPrioritizedRunnable(Priority.HIGH, "_add_listener_") {
@Override
public void run() {
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
@ -272,7 +272,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
timeoutUpdateTask.onFailure(task.source, new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source));
timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
}
});
}
@ -291,19 +291,19 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
public List<PendingClusterTask> pendingTasks() {
long now = System.currentTimeMillis();
PrioritizedEsThreadPoolExecutor.Pending[] pendings = updateTasksExecutor.getPending();
List<PendingClusterTask> pendingClusterTasks = new ArrayList<>(pendings.length);
for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) {
final String source;
final long timeInQueue;
if (pending.task instanceof UpdateTask) {
UpdateTask updateTask = (UpdateTask) pending.task;
source = updateTask.source;
timeInQueue = now - updateTask.addedAt;
if (pending.task instanceof TimedPrioritizedRunnable) {
TimedPrioritizedRunnable runnable = (TimedPrioritizedRunnable) pending.task;
source = runnable.source();
timeInQueue = runnable.timeSinceCreatedInMillis();
} else {
assert false : "expected TimedPrioritizedRunnable got " + pending.task.getClass();
source = "unknown";
timeInQueue = -1;
timeInQueue = 0;
}
pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue, pending.executing));
@ -311,15 +311,34 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return pendingClusterTasks;
}
class UpdateTask extends PrioritizedRunnable {
static abstract class TimedPrioritizedRunnable extends PrioritizedRunnable {
private final long creationTime;
protected final String source;
public final String source;
public final ClusterStateUpdateTask updateTask;
public final long addedAt = System.currentTimeMillis();
UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
protected TimedPrioritizedRunnable(Priority priority, String source) {
super(priority);
this.source = source;
this.creationTime = System.currentTimeMillis();
}
public long timeSinceCreatedInMillis() {
// max with 0 to make sure we always return a non negative number
// even if time shifts.
return Math.max(0, System.currentTimeMillis() - creationTime);
}
public String source() {
return source;
}
}
class UpdateTask extends TimedPrioritizedRunnable {
public final ClusterStateUpdateTask updateTask;
UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
super(priority, source);
this.updateTask = updateTask;
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.service;
import org.elasticsearch.Version;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -43,6 +42,8 @@ public class PendingClusterTask implements Streamable {
}
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing) {
assert timeInQueue >= 0 : "got a negative timeInQueue [" + timeInQueue + "]";
assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]";
this.insertOrder = insertOrder;
this.priority = priority;
this.source = source;