Explicitly log cluster state update failures

This commit adds explicit logging at the DEBUG level for cluster state
update failures. Currently this responsibility is left to the cluster
state task listener, but we should expliclty log these with a generic
message to address cases where the listener might not.

Relates #14899, relates #15016, relates #15023
This commit is contained in:
Jason Tedor 2015-12-14 17:20:22 -05:00
parent 1f0c23a21d
commit d64baf0f30
1 changed files with 43 additions and 5 deletions

View File

@ -20,8 +20,19 @@
package org.elasticsearch.cluster.service;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
@ -42,7 +53,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
@ -50,8 +67,19 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@ -292,6 +320,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (config.timeout() != null) {
updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> {
if (updateTask.processed.getAndSet(true) == false) {
logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout());
listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
}}));
} else {
@ -413,6 +442,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
assert batchResult.executionResults != null;
assert batchResult.executionResults.size() == toExecute.size();
final ClusterStateTaskExecutor.BatchResult<T> finalBatchResult = batchResult;
assert toExecute.stream().map(updateTask -> updateTask.task).allMatch(finalBatchResult.executionResults::containsKey);
ClusterState newClusterState = batchResult.resultingState;
final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
@ -421,7 +453,13 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask.task.toString();
final ClusterStateTaskExecutor.TaskResult executionResult =
batchResult.executionResults.get(updateTask.task);
executionResult.handle(() -> proccessedListeners.add(updateTask), ex -> updateTask.listener.onFailure(updateTask.source, ex));
executionResult.handle(
() -> proccessedListeners.add(updateTask),
ex -> {
logger.debug("cluster state update task [{}] failed", ex, updateTask.source);
updateTask.listener.onFailure(updateTask.source, ex);
}
);
}
if (previousClusterState == newClusterState) {