Explicitly correspond cluster state tasks and execution results

This commit is contained in:
Jason Tedor 2015-11-25 12:54:48 -05:00
parent c3f97e7642
commit 7caee2fa4d
6 changed files with 95 additions and 40 deletions

View File

@ -18,15 +18,17 @@
*/
package org.elasticsearch.cluster;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public interface ClusterStateTaskExecutor<T> {
/**
* Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state
* should be changed.
*/
Result execute(ClusterState currentState, List<T> tasks) throws Exception;
Result<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
/**
* indicates whether this task should only run if current node is master
@ -35,19 +37,66 @@ public interface ClusterStateTaskExecutor<T> {
return true;
}
class Result {
/**
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task
*/
class Result<T> {
final public ClusterState resultingState;
final public List<Throwable> failures;
final public Map<T, ClusterStateTaskExecutionResult> executionResults;
public Result(ClusterState resultingState, int numberOfTasks) {
/**
* Construct an execution result instance for which every cluster state update task succeeded
* @param resultingState the resulting cluster state
* @param tasks the cluster state update tasks
*/
public Result(ClusterState resultingState, List<T> tasks) {
this(resultingState, tasks.stream().collect(Collectors.toMap(task -> task, task -> ClusterStateTaskExecutionResult.success())));
}
/**
* Construct an execution result instance with a correspondence between the tasks and their execution result
* @param resultingState the resulting cluster state
* @param executionResults the correspondence between tasks and their outcome
*/
public Result(ClusterState resultingState, Map<T, ClusterStateTaskExecutionResult> executionResults) {
this.resultingState = resultingState;
failures = Arrays.asList(new Throwable[numberOfTasks]);
}
public Result(ClusterState resultingState, List<Throwable> failures) {
this.resultingState = resultingState;
this.failures = failures;
this.executionResults = executionResults;
}
}
final class ClusterStateTaskExecutionResult {
private final Throwable failure;
private static final ClusterStateTaskExecutionResult SUCCESS = new ClusterStateTaskExecutionResult(null);
public static ClusterStateTaskExecutionResult success() {
return SUCCESS;
}
public static ClusterStateTaskExecutionResult failure(Throwable failure) {
return new ClusterStateTaskExecutionResult(failure);
}
private ClusterStateTaskExecutionResult(Throwable failure) {
this.failure = failure;
}
public boolean isSuccess() {
return failure != null;
}
/**
* Handle the execution result with the provided consumers
* @param onSuccess handler to invoke on success
* @param onFailure handler to invoke on failure; the throwable passed through will not be null
*/
public void handle(Runnable onSuccess, Consumer<Throwable> onFailure) {
if (failure == null) {
onSuccess.run();
} else {
onFailure.accept(failure);
}
}
}
}

View File

@ -28,7 +28,7 @@ import java.util.List;
/**
* A task that can update the cluster state.
*/
abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<Void>, ClusterStateTaskListener {
abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {
final private Priority priority;
@ -41,9 +41,9 @@ abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig,
}
@Override
final public Result execute(ClusterState currentState, List<Void> tasks) throws Exception {
final public Result<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks) throws Exception {
ClusterState result = execute(currentState);
return new Result(result, tasks.size());
return new Result<>(result, tasks);
}
/**

View File

@ -81,9 +81,9 @@ public class MetaDataMappingService extends AbstractComponent {
class RefreshTaskExecutor implements ClusterStateTaskExecutor<RefreshTask> {
@Override
public Result execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
public Result<RefreshTask> execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
ClusterState newClusterState = executeRefresh(currentState, tasks);
return new Result(newClusterState, tasks.size());
return new Result<>(newClusterState, tasks);
}
}
@ -221,9 +221,9 @@ public class MetaDataMappingService extends AbstractComponent {
class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
@Override
public Result execute(ClusterState currentState, List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
public Result<PutMappingClusterStateUpdateRequest> execute(ClusterState currentState, List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
List<String> indicesToClose = new ArrayList<>();
ArrayList<Throwable> failures = new ArrayList<>(tasks.size());
Map<PutMappingClusterStateUpdateRequest, ClusterStateTaskExecutionResult> executionResults = new HashMap<>();
try {
// precreate incoming indices;
for (PutMappingClusterStateUpdateRequest request : tasks) {
@ -250,13 +250,13 @@ public class MetaDataMappingService extends AbstractComponent {
for (PutMappingClusterStateUpdateRequest request : tasks) {
try {
currentState = applyRequest(currentState, request);
failures.add(null);
executionResults.put(request, ClusterStateTaskExecutionResult.success());
} catch (Throwable t) {
failures.add(t);
executionResults.put(request, ClusterStateTaskExecutionResult.failure(t));
}
}
return new Result(currentState, failures);
return new Result<>(currentState, executionResults);
} finally {
for (String index : indicesToClose) {
indicesService.removeIndex(index, "created for mapping processing");

View File

@ -269,7 +269,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) {
submitStateUpdateTask(source, null, updateTask, updateTask, updateTask);
submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
}
@ -395,7 +395,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
return;
}
ClusterStateTaskExecutor.Result result;
ClusterStateTaskExecutor.Result<T> result;
long startTimeNS = System.nanoTime();
try {
List<T> inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
@ -410,21 +410,26 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
logger.trace(sb.toString(), e);
}
warnAboutSlowTaskIfNeeded(executionTime, source);
result = new ClusterStateTaskExecutor.Result(previousClusterState, Collections.nCopies(toExecute.size(), e));
Map<T, ClusterStateTaskExecutor.ClusterStateTaskExecutionResult> executionResults =
toExecute
.stream()
.collect(Collectors.toMap(
updateTask -> updateTask.task,
updateTask -> ClusterStateTaskExecutor.ClusterStateTaskExecutionResult.failure(e)
));
result = new ClusterStateTaskExecutor.Result<>(previousClusterState, executionResults);
}
assert result.failures.size() == toExecute.size();
assert result.executionResults != null;
ClusterState newClusterState = result.resultingState;
final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
// fail all tasks that have failed and extract those that are waiting for results
for (int i = 0; i < toExecute.size(); i++) {
final UpdateTask<T> task = toExecute.get(i);
final Throwable failure = result.failures.get(i);
if (failure == null) {
proccessedListeners.add(task);
} else {
task.listener.onFailure(task.source, failure);
}
for (UpdateTask<T> updateTask : toExecute) {
assert result.executionResults.containsKey(updateTask.task) : "missing " + updateTask.task.toString();
final ClusterStateTaskExecutor.ClusterStateTaskExecutionResult executionResult =
result.executionResults.get(updateTask.task);
executionResult.handle(() -> proccessedListeners.add(updateTask), ex -> updateTask.listener.onFailure(updateTask.source, ex));
}
if (previousClusterState == newClusterState) {

View File

@ -755,10 +755,10 @@ public class ClusterServiceIT extends ESIntegTestCase {
private AtomicInteger counter = new AtomicInteger();
@Override
public Result execute(ClusterState currentState, List<Task> tasks) throws Exception {
public Result<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
tasks.forEach(task -> task.execute());
counter.addAndGet(tasks.size());
return new Result(currentState, tasks.size());
return new Result<>(currentState, tasks);
}
@Override

View File

@ -191,18 +191,19 @@ public class TestClusterService implements ClusterService {
logger.debug("failed [{}], no longer master", source);
return;
}
ClusterStateTaskExecutor.Result result;
ClusterStateTaskExecutor.Result<T> result;
ClusterState previousClusterState = state;
try {
result = executor.execute(previousClusterState, Arrays.asList(task));
} catch (Exception e) {
result = new ClusterStateTaskExecutor.Result(previousClusterState, Arrays.asList(e));
}
if (result.failures.get(0) != null) {
listener.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]",
result.failures.get(0)));
return;
result = new ClusterStateTaskExecutor.Result<>(previousClusterState, Collections.singletonMap(task, ClusterStateTaskExecutor.ClusterStateTaskExecutionResult.failure(e)));
}
result.executionResults.get(task).handle(
() -> {},
ex -> listener.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", ex))
);
setStateAndNotifyListeners(result.resultingState);
listener.clusterStateProcessed(source, previousClusterState, result.resultingState);
logger.debug("finished [{}]", source);