diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index d4b15861846..afdfea65328 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -20,8 +20,19 @@ package org.elasticsearch.cluster.service; import org.elasticsearch.Version; -import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.AckedClusterStateTaskListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState.Builder; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; @@ -42,7 +53,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.*; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.PrioritizedRunnable; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; @@ -50,8 +67,19 @@ import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -292,6 +320,7 @@ public class InternalClusterService extends AbstractLifecycleComponent threadPool.generic().execute(() -> { if (updateTask.processed.getAndSet(true) == false) { + logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout()); listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)); }})); } else { @@ -413,6 +442,9 @@ public class InternalClusterService extends AbstractLifecycleComponent finalBatchResult = batchResult; + assert toExecute.stream().map(updateTask -> updateTask.task).allMatch(finalBatchResult.executionResults::containsKey); ClusterState newClusterState = batchResult.resultingState; final ArrayList> proccessedListeners = new ArrayList<>(); @@ -421,7 +453,13 @@ public class InternalClusterService extends AbstractLifecycleComponent proccessedListeners.add(updateTask), ex -> updateTask.listener.onFailure(updateTask.source, ex)); + executionResult.handle( + () -> proccessedListeners.add(updateTask), + ex -> { + logger.debug("cluster state update task [{}] failed", ex, updateTask.source); + updateTask.listener.onFailure(updateTask.source, ex); + } + ); } if (previousClusterState == newClusterState) {