diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 6d69d57ad1f..78b18f1d666 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -451,7 +451,7 @@ public class ClusterService extends AbstractLifecycleComponent { // convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal final IdentityHashMap tasksIdentity = new IdentityHashMap<>(tasks); final List> updateTasks = tasksIdentity.entrySet().stream().map( - entry -> new UpdateTask<>(source, entry.getKey(), config, executor, safe(entry.getValue(), logger)) + entry -> new UpdateTask<>(source, entry.getKey(), config.priority(), executor, safe(entry.getValue(), logger)) ).collect(Collectors.toList()); synchronized (updateTasksPerExecutor) { @@ -590,11 +590,11 @@ public class ClusterService extends AbstractLifecycleComponent { if (pending != null) { for (UpdateTask task : pending) { if (task.processed.getAndSet(true) == false) { - logger.trace("will process {}", task.toString(executor)); + logger.trace("will process {}", task); toExecute.add(task); processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task.task); } else { - logger.trace("skipping {}, already processed", task.toString(executor)); + logger.trace("skipping {}, already processed", task); } } } @@ -652,7 +652,7 @@ public class ClusterService extends AbstractLifecycleComponent { if (assertsEnabled) { for (UpdateTask updateTask : toExecute) { assert batchResult.executionResults.containsKey(updateTask.task) : - "missing task result for " + updateTask.toString(executor); + "missing task result for " + updateTask; } } @@ -660,7 +660,7 @@ public class ClusterService extends AbstractLifecycleComponent { final ArrayList> proccessedListeners = new ArrayList<>(); // fail all tasks that have failed and extract those that are waiting for results for (UpdateTask updateTask : toExecute) { - assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask.toString(executor); + assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask; final ClusterStateTaskExecutor.TaskResult executionResult = batchResult.executionResults.get(updateTask.task); executionResult.handle( @@ -668,7 +668,7 @@ public class ClusterService extends AbstractLifecycleComponent { ex -> { logger.debug( (Supplier) - () -> new ParameterizedMessage("cluster state update task {} failed", updateTask.toString(executor)), ex); + () -> new ParameterizedMessage("cluster state update task {} failed", updateTask), ex); updateTask.listener.onFailure(updateTask.source, ex); } ); @@ -944,16 +944,13 @@ public class ClusterService extends AbstractLifecycleComponent { class UpdateTask extends SourcePrioritizedRunnable { public final T task; - public final ClusterStateTaskConfig config; - public final ClusterStateTaskExecutor executor; public final ClusterStateTaskListener listener; + private final ClusterStateTaskExecutor executor; public final AtomicBoolean processed = new AtomicBoolean(); - UpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor executor, - ClusterStateTaskListener listener) { - super(config.priority(), source); + UpdateTask(String source, T task, Priority priority, ClusterStateTaskExecutor executor, ClusterStateTaskListener listener) { + super(priority, source); this.task = task; - this.config = config; this.executor = executor; this.listener = listener; } @@ -967,7 +964,8 @@ public class ClusterService extends AbstractLifecycleComponent { } } - public String toString(ClusterStateTaskExecutor executor) { + @Override + public String toString() { String taskDescription = executor.describeTasks(Collections.singletonList(task)); if (taskDescription.isEmpty()) { return "[" + source + "]";