diff --git a/docs/reference/modules/discovery/discovery-settings.asciidoc b/docs/reference/modules/discovery/discovery-settings.asciidoc
index bfd9cbd51fe..2a1d640280a 100644
--- a/docs/reference/modules/discovery/discovery-settings.asciidoc
+++ b/docs/reference/modules/discovery/discovery-settings.asciidoc
@@ -192,6 +192,12 @@ or may become unstable or intolerant of certain failures.
     time. The default value is `10`. See <>.
 
+`cluster.publish.info_timeout`::
+
+    Sets how long the master node waits for each cluster state update to be
+    completely published to all nodes before logging a message indicating that
+    some nodes are responding slowly. The default value is `10s`.
+
 `cluster.publish.timeout`::
 
     Sets how long the master node waits for each cluster state update to be
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
index 71bb2cd3de3..7ccb2abf59d 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.cluster.coordination;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
@@ -99,6 +100,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
     private static final Logger logger = LogManager.getLogger(Coordinator.class);
 
+    // the timeout before emitting an info log about a slow-running publication
+    public static final Setting<TimeValue> PUBLISH_INFO_TIMEOUT_SETTING =
+        Setting.timeSetting("cluster.publish.info_timeout",
+            TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
+
     // the timeout for the publication of each value
     public static final Setting<TimeValue> PUBLISH_TIMEOUT_SETTING =
         Setting.timeSetting("cluster.publish.timeout",
@@ -126,6 +132,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     private final ElectionSchedulerFactory electionSchedulerFactory;
     private final SeedHostsResolver configuredHostsResolver;
     private final TimeValue publishTimeout;
+    private final TimeValue publishInfoTimeout;
     private final PublicationTransportHandler publicationHandler;
     private final LeaderChecker leaderChecker;
     private final FollowersChecker followersChecker;
@@ -173,6 +180,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         this.lastJoin = Optional.empty();
         this.joinAccumulator = new InitialJoinAccumulator();
         this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
+        this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
         this.random = random;
         this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
         this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
@@ -1244,7 +1252,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         private final AckListener ackListener;
         private final ActionListener<Void> publishListener;
         private final PublicationTransportHandler.PublicationContext publicationContext;
-        private final Scheduler.ScheduledCancellable scheduledCancellable;
+        private final Scheduler.ScheduledCancellable timeoutHandler;
+        private final Scheduler.Cancellable infoTimeoutHandler;
 
         // We may not have
accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot // safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end. @@ -1285,7 +1294,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.localNodeAckEvent = localNodeAckEvent; this.ackListener = ackListener; this.publishListener = publishListener; - this.scheduledCancellable = transportService.getThreadPool().schedule(new Runnable() { + + this.timeoutHandler = transportService.getThreadPool().schedule(new Runnable() { @Override public void run() { synchronized (mutex) { @@ -1298,6 +1308,20 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery return "scheduled timeout for " + CoordinatorPublication.this; } }, publishTimeout, Names.GENERIC); + + this.infoTimeoutHandler = transportService.getThreadPool().schedule(new Runnable() { + @Override + public void run() { + synchronized (mutex) { + logIncompleteNodes(Level.INFO); + } + } + + @Override + public String toString() { + return "scheduled timeout for reporting on " + CoordinatorPublication.this; + } + }, publishInfoTimeout, Names.GENERIC); } private void removePublicationAndPossiblyBecomeCandidate(String reason) { @@ -1339,7 +1363,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery synchronized (mutex) { removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); } - scheduledCancellable.cancel(); + timeoutHandler.cancel(); + infoTimeoutHandler.cancel(); ackListener.onNodeAck(getLocalNode(), e); publishListener.onFailure(e); } @@ -1384,8 +1409,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } } lagDetector.startLagDetector(publishRequest.getAcceptedState().version()); + logIncompleteNodes(Level.WARN); } - scheduledCancellable.cancel(); + timeoutHandler.cancel(); + infoTimeoutHandler.cancel(); ackListener.onNodeAck(getLocalNode(), null); publishListener.onResponse(null); } @@ -1396,7 +1423,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery public void onFailure(Exception e) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); - scheduledCancellable.cancel(); + timeoutHandler.cancel(); + infoTimeoutHandler.cancel(); final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e); ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. 
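The CoordinatorPublication change above amounts to running two timers per publication: an early, non-fatal one that only calls logIncompleteNodes(Level.INFO) after `cluster.publish.info_timeout`, and the existing hard publish timeout, with both handlers cancelled on every completion path. A minimal, self-contained sketch of that two-timer pattern, using a plain ScheduledExecutorService instead of Elasticsearch's ThreadPool (class and method names here are illustrative, not part of the codebase):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: mirrors how CoordinatorPublication schedules an early INFO-level
// progress report alongside the hard publish timeout, and cancels both when publication ends.
public class TwoStageTimeoutSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> infoTimeoutHandler;
    private volatile ScheduledFuture<?> timeoutHandler;

    void startPublication(long infoTimeoutMillis, long publishTimeoutMillis) {
        // Early timer: the publication is still allowed to complete, we only report slow nodes.
        infoTimeoutHandler = scheduler.schedule(
            () -> System.out.println("publication is still waiting for some nodes (INFO)"),
            infoTimeoutMillis, TimeUnit.MILLISECONDS);
        // Hard timer: in the real code this gives up on the publication after cluster.publish.timeout.
        timeoutHandler = scheduler.schedule(
            () -> System.out.println("publication timed out, cancelling"),
            publishTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    void onPublicationCompleted() {
        // Mirrors timeoutHandler.cancel() / infoTimeoutHandler.cancel() on every completion path.
        timeoutHandler.cancel(false);
        infoTimeoutHandler.cancel(false);
    }

    public static void main(String[] args) throws InterruptedException {
        TwoStageTimeoutSketch sketch = new TwoStageTimeoutSketch();
        sketch.startPublication(100, 300);
        Thread.sleep(200);               // the info timer fires; the hard timer has not yet
        sketch.onPublicationCompleted(); // completing cancels the remaining hard timer
        sketch.scheduler.shutdown();
    }
}
```

The point of the split is that a slow publication becomes visible in the logs well before the hard publish timeout or the lag detector has to intervene.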
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java index 48ffb96aa74..5c3f48d3642 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java @@ -162,7 +162,9 @@ public class LagDetector { return; } - logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion); + logger.warn( + "node [{}] is lagging at cluster state version [{}], although publication of cluster state version [{}] completed [{}] ago", + discoveryNode, appliedVersion, version, clusterStateApplicationTimeout); onLagDetected.accept(discoveryNode); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 2557328233e..c494a4a06a8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -195,6 +196,16 @@ public abstract class Publication { ", version=" + publishRequest.getAcceptedState().version() + '}'; } + void logIncompleteNodes(Level level) { + final String message = publicationTargets.stream().filter(PublicationTarget::isActive).map(publicationTarget -> + publicationTarget.getDiscoveryNode() + " [" + publicationTarget.getState() + "]").collect(Collectors.joining(", ")); + if (message.isEmpty() == false) { + final TimeValue elapsedTime = TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime); + logger.log(level, "after [{}] publication of cluster state version [{}] is still waiting for {}", elapsedTime, + publishRequest.getAcceptedState().version(), message); + } + } + enum PublicationTargetState { NOT_STARTED, FAILED, @@ -213,6 +224,10 @@ public abstract class Publication { this.discoveryNode = discoveryNode; } + PublicationTargetState getState() { + return state; + } + @Override public String toString() { return "PublicationTarget{" + diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 9d5d592023c..9009d12243c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -35,7 +35,9 @@ import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -47,6 +49,7 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import 
java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -61,6 +64,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.cluster.service.ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING; @@ -389,15 +393,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements final ClusterState previousClusterState = state.get(); long startTimeMS = currentTimeInMillis(); + final StopWatch stopWatch = new StopWatch(); final ClusterState newClusterState; try { - newClusterState = task.apply(previousClusterState); + try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) { + newClusterState = task.apply(previousClusterState); + } } catch (Exception e) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); logger.trace(() -> new ParameterizedMessage( "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}", executionTime, previousClusterState.version(), task.source, previousClusterState), e); - warnAboutSlowTaskIfNeeded(executionTime, task.source); + warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch); task.listener.onFailure(task.source, e); return; } @@ -405,7 +412,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements if (previousClusterState == newClusterState) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime); - warnAboutSlowTaskIfNeeded(executionTime, task.source); + warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch); task.listener.onSuccess(task.source); } else { if (logger.isTraceEnabled()) { @@ -415,12 +422,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source); } try { - applyChanges(task, previousClusterState, newClusterState); + applyChanges(task, previousClusterState, newClusterState, stopWatch); TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source, executionTime, newClusterState.version(), newClusterState.stateUUID()); - warnAboutSlowTaskIfNeeded(executionTime, task.source); + warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch); task.listener.onSuccess(task.source); } catch (Exception e) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); @@ -438,7 +445,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements } } - private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) { + private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState, StopWatch stopWatch) { ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState); // new cluster state, notify all listeners final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); @@ 
-451,17 +458,21 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements } logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version()); - connectToNodesAndWait(newClusterState); + try (Releasable ignored = stopWatch.timing("connecting to new nodes")) { + connectToNodesAndWait(newClusterState); + } // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) { logger.debug("applying settings from cluster state with version {}", newClusterState.version()); final Settings incomingSettings = clusterChangedEvent.state().metaData().settings(); - clusterSettings.applySettings(incomingSettings); + try (Releasable ignored = stopWatch.timing("applying settings")) { + clusterSettings.applySettings(incomingSettings); + } } logger.debug("apply cluster state with version {}", newClusterState.version()); - callClusterStateAppliers(clusterChangedEvent); + callClusterStateAppliers(clusterChangedEvent, stopWatch); nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes()); @@ -474,7 +485,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements logger.debug("set locally applied cluster state to version {}", newClusterState.version()); state.set(newClusterState); - callClusterStateListeners(clusterChangedEvent); + callClusterStateListeners(clusterChangedEvent, stopWatch); } protected void connectToNodesAndWait(ClusterState newClusterState) { @@ -489,18 +500,22 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements } } - private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) { + private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) { clusterStateAppliers.forEach(applier -> { logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version()); - applier.applyClusterState(clusterChangedEvent); + try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) { + applier.applyClusterState(clusterChangedEvent); + } }); } - private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent) { + private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) { Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> { try { logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version()); - listener.clusterChanged(clusterChangedEvent); + try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) { + listener.clusterChanged(clusterChangedEvent); + } } catch (Exception ex) { logger.warn("failed to notify ClusterStateListener", ex); } @@ -538,10 +553,11 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements } } - protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) { + private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, StopWatch stopWatch) { if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) { - logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of {}", source, executionTime, - slowTaskLoggingThreshold); + logger.warn("cluster state applier task [{}] took [{}] which is above the warn 
threshold of [{}]: {}", source, executionTime, + slowTaskLoggingThreshold, Arrays.stream(stopWatch.taskInfo()) + .map(ti -> '[' + ti.getTaskName() + "] took [" + ti.getTime().millis() + "ms]").collect(Collectors.joining(", "))); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index c3555928908..c4ce272472a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -569,7 +569,7 @@ public class MasterService extends AbstractLifecycleComponent { protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) { if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) { - logger.warn("cluster state update task [{}] took [{}] which is above the warn threshold of {}", source, executionTime, + logger.warn("cluster state update task [{}] took [{}] which is above the warn threshold of [{}]", source, executionTime, slowTaskLoggingThreshold); } } diff --git a/server/src/main/java/org/elasticsearch/common/StopWatch.java b/server/src/main/java/org/elasticsearch/common/StopWatch.java index f842dde68c4..acef66340ee 100644 --- a/server/src/main/java/org/elasticsearch/common/StopWatch.java +++ b/server/src/main/java/org/elasticsearch/common/StopWatch.java @@ -19,6 +19,7 @@ package org.elasticsearch.common; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.TimeValue; import java.text.NumberFormat; @@ -51,8 +52,6 @@ public class StopWatch { */ private final String id; - private boolean keepTaskList = true; - private final List taskList = new LinkedList<>(); /** @@ -72,8 +71,6 @@ public class StopWatch { private TaskInfo lastTaskInfo; - private int taskCount; - /** * Total running time */ @@ -98,16 +95,6 @@ public class StopWatch { this.id = id; } - /** - * Determine whether the TaskInfo array is built over time. Set this to - * "false" when using a StopWatch for millions of intervals, or the task - * info structure will consume excessive memory. Default is "true". - */ - public StopWatch keepTaskList(boolean keepTaskList) { - this.keepTaskList = keepTaskList; - return this; - } - /** * Start an unnamed task. The results are undefined if {@link #stop()} * or timing methods are called without invoking this method. @@ -138,7 +125,7 @@ public class StopWatch { /** * Stop the current task. The results are undefined if timing * methods are called without invoking at least one pair - * {@link #start()} / {@link #stop()} methods. + * {@link #start()} / {@code #stop()} methods. * * @see #start() */ @@ -149,15 +136,17 @@ public class StopWatch { long lastTimeNS = System.nanoTime() - this.startTimeNS; this.totalTimeNS += lastTimeNS; this.lastTaskInfo = new TaskInfo(this.currentTaskName, TimeValue.nsecToMSec(lastTimeNS)); - if (this.keepTaskList) { - this.taskList.add(lastTaskInfo); - } - ++this.taskCount; + this.taskList.add(lastTaskInfo); this.running = false; this.currentTaskName = null; return this; } + public Releasable timing(String taskName) { + start(taskName); + return this::stop; + } + /** * Return whether the stop watch is currently running. */ @@ -175,16 +164,6 @@ public class StopWatch { return this.lastTaskInfo.getTime(); } - /** - * Return the name of the last task. 
- */ - public String lastTaskName() throws IllegalStateException { - if (this.lastTaskInfo == null) { - throw new IllegalStateException("No tests run: can't get last interval"); - } - return this.lastTaskInfo.getTaskName(); - } - /** * Return the total time for all tasks. */ @@ -192,21 +171,11 @@ public class StopWatch { return new TimeValue(totalTimeNS, TimeUnit.NANOSECONDS); } - /** - * Return the number of tasks timed. - */ - public int taskCount() { - return taskCount; - } - /** * Return an array of the data for tasks performed. */ public TaskInfo[] taskInfo() { - if (!this.keepTaskList) { - throw new UnsupportedOperationException("Task info is not being kept!"); - } - return this.taskList.toArray(new TaskInfo[this.taskList.size()]); + return this.taskList.toArray(new TaskInfo[0]); } /** @@ -223,23 +192,19 @@ public class StopWatch { public String prettyPrint() { StringBuilder sb = new StringBuilder(shortSummary()); sb.append('\n'); - if (!this.keepTaskList) { - sb.append("No task info kept"); - } else { - sb.append("-----------------------------------------\n"); - sb.append("ms % Task name\n"); - sb.append("-----------------------------------------\n"); - NumberFormat nf = NumberFormat.getNumberInstance(Locale.ROOT); - nf.setMinimumIntegerDigits(5); - nf.setGroupingUsed(false); - NumberFormat pf = NumberFormat.getPercentInstance(Locale.ROOT); - pf.setMinimumIntegerDigits(3); - pf.setGroupingUsed(false); - for (TaskInfo task : taskInfo()) { - sb.append(nf.format(task.getTime().millis())).append(" "); - sb.append(pf.format(task.getTime().secondsFrac() / totalTime().secondsFrac())).append(" "); - sb.append(task.getTaskName()).append("\n"); - } + sb.append("-----------------------------------------\n"); + sb.append("ms % Task name\n"); + sb.append("-----------------------------------------\n"); + NumberFormat nf = NumberFormat.getNumberInstance(Locale.ROOT); + nf.setMinimumIntegerDigits(5); + nf.setGroupingUsed(false); + NumberFormat pf = NumberFormat.getPercentInstance(Locale.ROOT); + pf.setMinimumIntegerDigits(3); + pf.setGroupingUsed(false); + for (TaskInfo task : taskInfo()) { + sb.append(nf.format(task.getTime().millis())).append(" "); + sb.append(pf.format(task.getTime().secondsFrac() / totalTime().secondsFrac())).append(" "); + sb.append(task.getTaskName()).append("\n"); } return sb.toString(); } @@ -251,14 +216,10 @@ public class StopWatch { @Override public String toString() { StringBuilder sb = new StringBuilder(shortSummary()); - if (this.keepTaskList) { - for (TaskInfo task : taskInfo()) { - sb.append("; [").append(task.getTaskName()).append("] took ").append(task.getTime()); - long percent = Math.round((100.0f * task.getTime().millis()) / totalTime().millis()); - sb.append(" = ").append(percent).append("%"); - } - } else { - sb.append("; no task info kept"); + for (TaskInfo task : taskInfo()) { + sb.append("; [").append(task.getTaskName()).append("] took ").append(task.getTime()); + long percent = Math.round((100.0f * task.getTime().millis()) / totalTime().millis()); + sb.append(" = ").append(percent).append("%"); } return sb.toString(); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index f62c840e11b..cd89ff47181 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -484,6 +484,7 @@ public final class ClusterSettings extends 
AbstractScopedSettings { ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_DURATION_SETTING, Coordinator.PUBLISH_TIMEOUT_SETTING, + Coordinator.PUBLISH_INFO_TIMEOUT_SETTING, JoinHelper.JOIN_TIMEOUT_SETTING, FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING, FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING, diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index 04741cd87d0..570b1b60410 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -248,6 +248,14 @@ public interface Scheduler { } } } + + @Override + public String toString() { + return "ReschedulingRunnable{" + + "runnable=" + runnable + + ", interval=" + interval + + '}'; + } } /** diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 437bbfd571f..9a0238960b5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1192,7 +1192,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { } } - public void testLogsWarningPeriodicallyIfClusterNotFormed() { + public void testLogsWarningPeriodicallyIfClusterNotFormed() throws IllegalAccessException { final long warningDelayMillis; final Settings settings; if (randomBoolean()) { @@ -1220,16 +1220,10 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { "waiting for leader failure"); for (int i = scaledRandomIntBetween(1, 10); i >= 0; i--) { - final MockLogAppender mockLogAppender; + final MockLogAppender mockLogAppender = new MockLogAppender(); try { - mockLogAppender = new MockLogAppender(); - } catch (IllegalAccessException e) { - throw new AssertionError(e); - } - - try { - Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender); mockLogAppender.start(); + Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender); mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() { final Set nodesLogged = new HashSet<>(); @@ -1260,13 +1254,70 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { cluster.runFor(warningDelayMillis + DEFAULT_DELAY_VARIABILITY, "waiting for warning to be emitted"); mockLogAppender.assertAllExpectationsMatched(); } finally { - mockLogAppender.stop(); Loggers.removeAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender); + mockLogAppender.stop(); } } } } + public void testLogsMessagesIfPublicationDelayed() throws IllegalAccessException { + try (Cluster cluster = new Cluster(between(3, 5))) { + cluster.runRandomly(); + cluster.stabilise(); + final ClusterNode brokenNode = cluster.getAnyNodeExcept(cluster.getAnyLeader()); + + final MockLogAppender mockLogAppender = new MockLogAppender(); + try { + mockLogAppender.start(); + Loggers.addAppender(LogManager.getLogger(Coordinator.CoordinatorPublication.class), mockLogAppender); + Loggers.addAppender(LogManager.getLogger(LagDetector.class), mockLogAppender); + + mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("publication info message", + Coordinator.CoordinatorPublication.class.getCanonicalName(), Level.INFO, + "after [*] publication of cluster state 
version [*] is still waiting for " + brokenNode.getLocalNode() + " [" + + Publication.PublicationTargetState.SENT_PUBLISH_REQUEST + ']')); + + mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("publication warning", + Coordinator.CoordinatorPublication.class.getCanonicalName(), Level.WARN, + "after [*] publication of cluster state version [*] is still waiting for " + brokenNode.getLocalNode() + " [" + + Publication.PublicationTargetState.SENT_PUBLISH_REQUEST + ']')); + + mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("lag warning", + LagDetector.class.getCanonicalName(), Level.WARN, + "node [" + brokenNode + "] is lagging at cluster state version [*], " + + "although publication of cluster state version [*] completed [*] ago")); + + // drop the publication messages to one node, but then restore connectivity so it remains in the cluster and does not fail + // health checks + brokenNode.blackhole(); + cluster.deterministicTaskQueue.scheduleAt( + cluster.deterministicTaskQueue.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, + new Runnable() { + @Override + public void run() { + brokenNode.heal(); + } + + @Override + public String toString() { + return "healing " + brokenNode; + } + }); + cluster.getAnyLeader().submitValue(randomLong()); + cluster.runFor(defaultMillis(PUBLISH_TIMEOUT_SETTING) + 2 * DEFAULT_DELAY_VARIABILITY + + defaultMillis(LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING), + "waiting for messages to be emitted"); + + mockLogAppender.assertAllExpectationsMatched(); + } finally { + Loggers.removeAppender(LogManager.getLogger(Coordinator.CoordinatorPublication.class), mockLogAppender); + Loggers.removeAppender(LogManager.getLogger(LagDetector.class), mockLogAppender); + mockLogAppender.stop(); + } + } + } + public void testReconfiguresToExcludeMasterIneligibleNodesInVotingConfig() { try (Cluster cluster = new Cluster(3)) { cluster.runRandomly(); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index c556f2a3a5e..4da5de7941e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -61,8 +61,8 @@ import static org.hamcrest.Matchers.is; public class ClusterApplierServiceTests extends ESTestCase { - protected static ThreadPool threadPool; - protected TimedClusterApplierService clusterApplierService; + private static ThreadPool threadPool; + private TimedClusterApplierService clusterApplierService; @BeforeClass public static void createThreadPool() { @@ -192,13 +192,15 @@ public class ClusterApplierServiceTests extends ESTestCase { "test2", ClusterApplierService.class.getCanonicalName(), Level.WARN, - "*cluster state applier task [test2] took [32s] which is above the warn threshold of *")); + "*cluster state applier task [test2] took [32s] which is above the warn threshold of [*]: " + + "[running task [test2]] took [*")); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", ClusterApplierService.class.getCanonicalName(), Level.WARN, - "*cluster state applier task [test3] took [34s] which is above the warn threshold of *")); + "*cluster state applier task [test3] took [34s] which is above the warn threshold of [*]: " + + "[running task [test3]] took [*")); Logger clusterLogger = 
LogManager.getLogger(ClusterApplierService.class); Loggers.addAppender(clusterLogger, mockAppender); @@ -273,7 +275,7 @@ public class ClusterApplierServiceTests extends ESTestCase { mockAppender.assertAllExpectationsMatched(); } - public void testLocalNodeMasterListenerCallbacks() throws Exception { + public void testLocalNodeMasterListenerCallbacks() { TimedClusterApplierService timedClusterApplierService = createTimedClusterService(false); AtomicBoolean isMaster = new AtomicBoolean(); @@ -493,7 +495,7 @@ public class ClusterApplierServiceTests extends ESTestCase { static class TimedClusterApplierService extends ClusterApplierService { final ClusterSettings clusterSettings; - public volatile Long currentTimeOverride = null; + volatile Long currentTimeOverride = null; TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { super("test_node", settings, clusterSettings, threadPool); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 2bf043f6a19..e196be6a119 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -652,25 +652,25 @@ public class MasterServiceTests extends ESTestCase { "test1 shouldn't see because setting is too low", MasterService.class.getCanonicalName(), Level.WARN, - "*cluster state update task [test1] took [*] which is above the warn threshold of *")); + "*cluster state update task [test1] took [*] which is above the warn threshold of [*]")); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2", MasterService.class.getCanonicalName(), Level.WARN, - "*cluster state update task [test2] took [32s] which is above the warn threshold of *")); + "*cluster state update task [test2] took [32s] which is above the warn threshold of [*]")); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3", MasterService.class.getCanonicalName(), Level.WARN, - "*cluster state update task [test3] took [33s] which is above the warn threshold of *")); + "*cluster state update task [test3] took [33s] which is above the warn threshold of [*]")); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", MasterService.class.getCanonicalName(), Level.WARN, - "*cluster state update task [test4] took [34s] which is above the warn threshold of *")); + "*cluster state update task [test4] took [34s] which is above the warn threshold of [*]")); Logger clusterLogger = LogManager.getLogger(MasterService.class); Loggers.addAppender(clusterLogger, mockAppender);
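Taken together, the ClusterApplierService and StopWatch changes replace the bare "took [Xs]" warning with a per-section breakdown ("running task [...]", "connecting to new nodes", "applying settings", "running applier [...]", "notifying listener [...]") collected via try-with-resources around each phase. A minimal, self-contained sketch of that timing idiom follows; it is not Elasticsearch's StopWatch or Releasable, and the names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of the try-with-resources timing idiom added via StopWatch#timing.
// Structure is simplified; this is not the Elasticsearch StopWatch class.
public class SectionTimerSketch {

    static final class Section {
        final String name;
        final long millis;

        Section(String name, long millis) {
            this.name = name;
            this.millis = millis;
        }
    }

    private final List<Section> sections = new ArrayList<>();

    /** Starts timing a named section; closing the returned handle records the elapsed time. */
    AutoCloseable timing(String name) {
        final long startNanos = System.nanoTime();
        return () -> sections.add(new Section(name, (System.nanoTime() - startNanos) / 1_000_000));
    }

    /** Renders a breakdown like the one appended to the slow-applier warning in the diff. */
    String breakdown() {
        return sections.stream()
            .map(s -> "[" + s.name + "] took [" + s.millis + "ms]")
            .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) throws Exception {
        SectionTimerSketch timer = new SectionTimerSketch();
        try (AutoCloseable ignored = timer.timing("running task [example]")) {
            Thread.sleep(25); // stand-in for computing the new cluster state
        }
        try (AutoCloseable ignored = timer.timing("notifying listener [example-listener]")) {
            Thread.sleep(10); // stand-in for a ClusterStateListener callback
        }
        // e.g. "[running task [example]] took [25ms], [notifying listener [example-listener]] took [10ms]"
        System.out.println(timer.breakdown());
    }
}
```

Handing back a closeable handle rather than exposing start()/stop() pairs means each section is recorded even if the timed code throws, which matters here because the diff also logs the breakdown on the applier's failure path.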