From 7caee2fa4d918d4c6978bcf986a158d03bf7db2a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 25 Nov 2015 12:54:48 -0500 Subject: [PATCH] Explicitly correspond cluster state tasks and execution results --- .../cluster/ClusterStateTaskExecutor.java | 67 ++++++++++++++++--- .../cluster/ClusterStateUpdateTask.java | 6 +- .../metadata/MetaDataMappingService.java | 14 ++-- .../service/InternalClusterService.java | 29 ++++---- .../cluster/ClusterServiceIT.java | 4 +- .../test/cluster/TestClusterService.java | 15 +++-- 6 files changed, 95 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index 861b924c52e..ebb8e397b99 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -18,15 +18,17 @@ */ package org.elasticsearch.cluster; -import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; public interface ClusterStateTaskExecutor { /** * Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state * should be changed. */ - Result execute(ClusterState currentState, List tasks) throws Exception; + Result execute(ClusterState currentState, List tasks) throws Exception; /** * indicates whether this task should only run if current node is master @@ -35,19 +37,66 @@ public interface ClusterStateTaskExecutor { return true; } - class Result { + /** + * Represents the result of a batched execution of cluster state update tasks + * @param the type of the cluster state update task + */ + class Result { final public ClusterState resultingState; - final public List failures; + final public Map executionResults; - public Result(ClusterState resultingState, int numberOfTasks) { - this.resultingState = resultingState; - failures = Arrays.asList(new Throwable[numberOfTasks]); + /** + * Construct an execution result instance for which every cluster state update task succeeded + * @param resultingState the resulting cluster state + * @param tasks the cluster state update tasks + */ + public Result(ClusterState resultingState, List tasks) { + this(resultingState, tasks.stream().collect(Collectors.toMap(task -> task, task -> ClusterStateTaskExecutionResult.success()))); } - public Result(ClusterState resultingState, List failures) { + /** + * Construct an execution result instance with a correspondence between the tasks and their execution result + * @param resultingState the resulting cluster state + * @param executionResults the correspondence between tasks and their outcome + */ + public Result(ClusterState resultingState, Map executionResults) { this.resultingState = resultingState; - this.failures = failures; + this.executionResults = executionResults; } } + final class ClusterStateTaskExecutionResult { + private final Throwable failure; + + private static final ClusterStateTaskExecutionResult SUCCESS = new ClusterStateTaskExecutionResult(null); + + public static ClusterStateTaskExecutionResult success() { + return SUCCESS; + } + + public static ClusterStateTaskExecutionResult failure(Throwable failure) { + return new ClusterStateTaskExecutionResult(failure); + } + + private ClusterStateTaskExecutionResult(Throwable failure) { + this.failure = failure; + } + + public boolean isSuccess() { + return failure != null; + } + + /** + * Handle the execution result with the provided consumers + * @param onSuccess handler to invoke on success + * @param onFailure handler to invoke on failure; the throwable passed through will not be null + */ + public void handle(Runnable onSuccess, Consumer onFailure) { + if (failure == null) { + onSuccess.run(); + } else { + onFailure.accept(failure); + } + } + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index 17c4635c7de..ffcb9c0e75b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -28,7 +28,7 @@ import java.util.List; /** * A task that can update the cluster state. */ -abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor, ClusterStateTaskListener { +abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor, ClusterStateTaskListener { final private Priority priority; @@ -41,9 +41,9 @@ abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, } @Override - final public Result execute(ClusterState currentState, List tasks) throws Exception { + final public Result execute(ClusterState currentState, List tasks) throws Exception { ClusterState result = execute(currentState); - return new Result(result, tasks.size()); + return new Result<>(result, tasks); } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 44e38533257..be401269917 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -81,9 +81,9 @@ public class MetaDataMappingService extends AbstractComponent { class RefreshTaskExecutor implements ClusterStateTaskExecutor { @Override - public Result execute(ClusterState currentState, List tasks) throws Exception { + public Result execute(ClusterState currentState, List tasks) throws Exception { ClusterState newClusterState = executeRefresh(currentState, tasks); - return new Result(newClusterState, tasks.size()); + return new Result<>(newClusterState, tasks); } } @@ -221,9 +221,9 @@ public class MetaDataMappingService extends AbstractComponent { class PutMappingExecutor implements ClusterStateTaskExecutor { @Override - public Result execute(ClusterState currentState, List tasks) throws Exception { + public Result execute(ClusterState currentState, List tasks) throws Exception { List indicesToClose = new ArrayList<>(); - ArrayList failures = new ArrayList<>(tasks.size()); + Map executionResults = new HashMap<>(); try { // precreate incoming indices; for (PutMappingClusterStateUpdateRequest request : tasks) { @@ -250,13 +250,13 @@ public class MetaDataMappingService extends AbstractComponent { for (PutMappingClusterStateUpdateRequest request : tasks) { try { currentState = applyRequest(currentState, request); - failures.add(null); + executionResults.put(request, ClusterStateTaskExecutionResult.success()); } catch (Throwable t) { - failures.add(t); + executionResults.put(request, ClusterStateTaskExecutionResult.failure(t)); } } - return new Result(currentState, failures); + return new Result<>(currentState, executionResults); } finally { for (String index : indicesToClose) { indicesService.removeIndex(index, "created for mapping processing"); diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index c97c55d5587..ad4139fec21 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -269,7 +269,7 @@ public class InternalClusterService extends AbstractLifecycleComponent task.listener.onNoLongerMaster(task.source)); return; } - ClusterStateTaskExecutor.Result result; + ClusterStateTaskExecutor.Result result; long startTimeNS = System.nanoTime(); try { List inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); @@ -410,21 +410,26 @@ public class InternalClusterService extends AbstractLifecycleComponent executionResults = + toExecute + .stream() + .collect(Collectors.toMap( + updateTask -> updateTask.task, + updateTask -> ClusterStateTaskExecutor.ClusterStateTaskExecutionResult.failure(e) + )); + result = new ClusterStateTaskExecutor.Result<>(previousClusterState, executionResults); } - assert result.failures.size() == toExecute.size(); + + assert result.executionResults != null; ClusterState newClusterState = result.resultingState; final ArrayList> proccessedListeners = new ArrayList<>(); // fail all tasks that have failed and extract those that are waiting for results - for (int i = 0; i < toExecute.size(); i++) { - final UpdateTask task = toExecute.get(i); - final Throwable failure = result.failures.get(i); - if (failure == null) { - proccessedListeners.add(task); - } else { - task.listener.onFailure(task.source, failure); - } + for (UpdateTask updateTask : toExecute) { + assert result.executionResults.containsKey(updateTask.task) : "missing " + updateTask.task.toString(); + final ClusterStateTaskExecutor.ClusterStateTaskExecutionResult executionResult = + result.executionResults.get(updateTask.task); + executionResult.handle(() -> proccessedListeners.add(updateTask), ex -> updateTask.listener.onFailure(updateTask.source, ex)); } if (previousClusterState == newClusterState) { diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java index 60e7fb29041..820e468a093 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java @@ -755,10 +755,10 @@ public class ClusterServiceIT extends ESIntegTestCase { private AtomicInteger counter = new AtomicInteger(); @Override - public Result execute(ClusterState currentState, List tasks) throws Exception { + public Result execute(ClusterState currentState, List tasks) throws Exception { tasks.forEach(task -> task.execute()); counter.addAndGet(tasks.size()); - return new Result(currentState, tasks.size()); + return new Result<>(currentState, tasks); } @Override diff --git a/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java b/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java index 3845a71c45e..e5d45cd0e58 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java +++ b/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java @@ -191,18 +191,19 @@ public class TestClusterService implements ClusterService { logger.debug("failed [{}], no longer master", source); return; } - ClusterStateTaskExecutor.Result result; + ClusterStateTaskExecutor.Result result; ClusterState previousClusterState = state; try { result = executor.execute(previousClusterState, Arrays.asList(task)); } catch (Exception e) { - result = new ClusterStateTaskExecutor.Result(previousClusterState, Arrays.asList(e)); - } - if (result.failures.get(0) != null) { - listener.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", - result.failures.get(0))); - return; + result = new ClusterStateTaskExecutor.Result<>(previousClusterState, Collections.singletonMap(task, ClusterStateTaskExecutor.ClusterStateTaskExecutionResult.failure(e))); } + + result.executionResults.get(task).handle( + () -> {}, + ex -> listener.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", ex)) + ); + setStateAndNotifyListeners(result.resultingState); listener.clusterStateProcessed(source, previousClusterState, result.resultingState); logger.debug("finished [{}]", source);