More logging for slow cluster state application (#45007)

Today the lag detector may remove nodes from the cluster if they fail to apply
a cluster state within a reasonable timeframe, but it is rather unclear from
the default logging that this has occurred and there is very little extra
information beyond the fact that the removed node was lagging. Moreover the
only forewarning that the lag detector might be invoked is a message indicating
that cluster state publication took unreasonably long, which does not contain
enough information to investigate the problem further.

This commit adds a good deal more detail to make the issues of slow nodes more
prominent:

- after 10 seconds (by default) we log an INFO message indicating that a
  publication is still waiting for responses from some nodes, including the
  identities of the problematic nodes.

- when the publication times out after 30 seconds (by default) we log a WARN
  message identifying the nodes that are still pending.

- the lag detector logs a more detailed warning when a fatally-lagging node is
  detected.

- if applying a cluster state takes too long then the cluster applier service
  logs a breakdown of all the tasks it ran as part of that process.
This commit is contained in:
David Turner 2019-08-01 08:21:40 +01:00
parent b3be8f75f0
commit 532ade7816
12 changed files with 199 additions and 109 deletions

View File

@ -192,6 +192,12 @@ or may become unstable or intolerant of certain failures.
time. The default value is `10`. See
<<modules-discovery-adding-removing-nodes>>.
`cluster.publish.info_timeout`::
Sets how long the master node waits for each cluster state update to be
completely published to all nodes before logging a message indicating that
some nodes are responding slowly. The default value is `10s`.
`cluster.publish.timeout`::
Sets how long the master node waits for each cluster state update to be

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
@ -99,6 +100,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private static final Logger logger = LogManager.getLogger(Coordinator.class);
// the timeout before emitting an info log about a slow-running publication
public static final Setting<TimeValue> PUBLISH_INFO_TIMEOUT_SETTING =
Setting.timeSetting("cluster.publish.info_timeout",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
// the timeout for the publication of each value
public static final Setting<TimeValue> PUBLISH_TIMEOUT_SETTING =
Setting.timeSetting("cluster.publish.timeout",
@ -126,6 +132,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final ElectionSchedulerFactory electionSchedulerFactory;
private final SeedHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
private final TimeValue publishInfoTimeout;
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
private final FollowersChecker followersChecker;
@ -173,6 +180,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.lastJoin = Optional.empty();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
@ -1244,7 +1252,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final AckListener ackListener;
private final ActionListener<Void> publishListener;
private final PublicationTransportHandler.PublicationContext publicationContext;
private final Scheduler.ScheduledCancellable scheduledCancellable;
private final Scheduler.ScheduledCancellable timeoutHandler;
private final Scheduler.Cancellable infoTimeoutHandler;
// We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot
// safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end.
@ -1285,7 +1294,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.localNodeAckEvent = localNodeAckEvent;
this.ackListener = ackListener;
this.publishListener = publishListener;
this.scheduledCancellable = transportService.getThreadPool().schedule(new Runnable() {
this.timeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
@ -1298,6 +1308,20 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return "scheduled timeout for " + CoordinatorPublication.this;
}
}, publishTimeout, Names.GENERIC);
this.infoTimeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
logIncompleteNodes(Level.INFO);
}
}
@Override
public String toString() {
return "scheduled timeout for reporting on " + CoordinatorPublication.this;
}
}, publishInfoTimeout, Names.GENERIC);
}
private void removePublicationAndPossiblyBecomeCandidate(String reason) {
@ -1339,7 +1363,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
synchronized (mutex) {
removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
}
scheduledCancellable.cancel();
timeoutHandler.cancel();
infoTimeoutHandler.cancel();
ackListener.onNodeAck(getLocalNode(), e);
publishListener.onFailure(e);
}
@ -1384,8 +1409,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
logIncompleteNodes(Level.WARN);
}
scheduledCancellable.cancel();
timeoutHandler.cancel();
infoTimeoutHandler.cancel();
ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
}
@ -1396,7 +1423,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
public void onFailure(Exception e) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");
scheduledCancellable.cancel();
timeoutHandler.cancel();
infoTimeoutHandler.cancel();
final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e);
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.

View File

@ -162,7 +162,9 @@ public class LagDetector {
return;
}
logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion);
logger.warn(
"node [{}] is lagging at cluster state version [{}], although publication of cluster state version [{}] completed [{}] ago",
discoveryNode, appliedVersion, version, clusterStateApplicationTimeout);
onLagDetected.accept(discoveryNode);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
@ -195,6 +196,16 @@ public abstract class Publication {
", version=" + publishRequest.getAcceptedState().version() + '}';
}
void logIncompleteNodes(Level level) {
final String message = publicationTargets.stream().filter(PublicationTarget::isActive).map(publicationTarget ->
publicationTarget.getDiscoveryNode() + " [" + publicationTarget.getState() + "]").collect(Collectors.joining(", "));
if (message.isEmpty() == false) {
final TimeValue elapsedTime = TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime);
logger.log(level, "after [{}] publication of cluster state version [{}] is still waiting for {}", elapsedTime,
publishRequest.getAcceptedState().version(), message);
}
}
enum PublicationTargetState {
NOT_STARTED,
FAILED,
@ -213,6 +224,10 @@ public abstract class Publication {
this.discoveryNode = discoveryNode;
}
PublicationTargetState getState() {
return state;
}
@Override
public String toString() {
return "PublicationTarget{" +

View File

@ -35,7 +35,9 @@ import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -47,6 +49,7 @@ import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@ -61,6 +64,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.cluster.service.ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING;
@ -389,15 +393,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
final ClusterState previousClusterState = state.get();
long startTimeMS = currentTimeInMillis();
final StopWatch stopWatch = new StopWatch();
final ClusterState newClusterState;
try {
newClusterState = task.apply(previousClusterState);
try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) {
newClusterState = task.apply(previousClusterState);
}
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.trace(() -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
task.listener.onFailure(task.source, e);
return;
}
@ -405,7 +412,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
if (previousClusterState == newClusterState) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
task.listener.onSuccess(task.source);
} else {
if (logger.isTraceEnabled()) {
@ -415,12 +422,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
}
try {
applyChanges(task, previousClusterState, newClusterState);
applyChanges(task, previousClusterState, newClusterState, stopWatch);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, task.source);
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
task.listener.onSuccess(task.source);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
@ -438,7 +445,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
}
private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) {
private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState, StopWatch stopWatch) {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
@ -451,17 +458,21 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
connectToNodesAndWait(newClusterState);
try (Releasable ignored = stopWatch.timing("connecting to new nodes")) {
connectToNodesAndWait(newClusterState);
}
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
clusterSettings.applySettings(incomingSettings);
try (Releasable ignored = stopWatch.timing("applying settings")) {
clusterSettings.applySettings(incomingSettings);
}
}
logger.debug("apply cluster state with version {}", newClusterState.version());
callClusterStateAppliers(clusterChangedEvent);
callClusterStateAppliers(clusterChangedEvent, stopWatch);
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
@ -474,7 +485,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
logger.debug("set locally applied cluster state to version {}", newClusterState.version());
state.set(newClusterState);
callClusterStateListeners(clusterChangedEvent);
callClusterStateListeners(clusterChangedEvent, stopWatch);
}
protected void connectToNodesAndWait(ClusterState newClusterState) {
@ -489,18 +500,22 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
}
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
clusterStateAppliers.forEach(applier -> {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
applier.applyClusterState(clusterChangedEvent);
try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
applier.applyClusterState(clusterChangedEvent);
}
});
}
private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent) {
private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
listener.clusterChanged(clusterChangedEvent);
try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
listener.clusterChanged(clusterChangedEvent);
}
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
@ -538,10 +553,11 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
}
protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, StopWatch stopWatch) {
if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of {}", source, executionTime,
slowTaskLoggingThreshold);
logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of [{}]: {}", source, executionTime,
slowTaskLoggingThreshold, Arrays.stream(stopWatch.taskInfo())
.map(ti -> '[' + ti.getTaskName() + "] took [" + ti.getTime().millis() + "ms]").collect(Collectors.joining(", ")));
}
}

View File

@ -569,7 +569,7 @@ public class MasterService extends AbstractLifecycleComponent {
protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
logger.warn("cluster state update task [{}] took [{}] which is above the warn threshold of {}", source, executionTime,
logger.warn("cluster state update task [{}] took [{}] which is above the warn threshold of [{}]", source, executionTime,
slowTaskLoggingThreshold);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import java.text.NumberFormat;
@ -51,8 +52,6 @@ public class StopWatch {
*/
private final String id;
private boolean keepTaskList = true;
private final List<TaskInfo> taskList = new LinkedList<>();
/**
@ -72,8 +71,6 @@ public class StopWatch {
private TaskInfo lastTaskInfo;
private int taskCount;
/**
* Total running time
*/
@ -98,16 +95,6 @@ public class StopWatch {
this.id = id;
}
/**
* Determine whether the TaskInfo array is built over time. Set this to
* "false" when using a StopWatch for millions of intervals, or the task
* info structure will consume excessive memory. Default is "true".
*/
public StopWatch keepTaskList(boolean keepTaskList) {
this.keepTaskList = keepTaskList;
return this;
}
/**
* Start an unnamed task. The results are undefined if {@link #stop()}
* or timing methods are called without invoking this method.
@ -138,7 +125,7 @@ public class StopWatch {
/**
* Stop the current task. The results are undefined if timing
* methods are called without invoking at least one pair
* {@link #start()} / {@link #stop()} methods.
* {@link #start()} / {@code #stop()} methods.
*
* @see #start()
*/
@ -149,15 +136,17 @@ public class StopWatch {
long lastTimeNS = System.nanoTime() - this.startTimeNS;
this.totalTimeNS += lastTimeNS;
this.lastTaskInfo = new TaskInfo(this.currentTaskName, TimeValue.nsecToMSec(lastTimeNS));
if (this.keepTaskList) {
this.taskList.add(lastTaskInfo);
}
++this.taskCount;
this.taskList.add(lastTaskInfo);
this.running = false;
this.currentTaskName = null;
return this;
}
public Releasable timing(String taskName) {
start(taskName);
return this::stop;
}
/**
* Return whether the stop watch is currently running.
*/
@ -175,16 +164,6 @@ public class StopWatch {
return this.lastTaskInfo.getTime();
}
/**
* Return the name of the last task.
*/
public String lastTaskName() throws IllegalStateException {
if (this.lastTaskInfo == null) {
throw new IllegalStateException("No tests run: can't get last interval");
}
return this.lastTaskInfo.getTaskName();
}
/**
* Return the total time for all tasks.
*/
@ -192,21 +171,11 @@ public class StopWatch {
return new TimeValue(totalTimeNS, TimeUnit.NANOSECONDS);
}
/**
* Return the number of tasks timed.
*/
public int taskCount() {
return taskCount;
}
/**
* Return an array of the data for tasks performed.
*/
public TaskInfo[] taskInfo() {
if (!this.keepTaskList) {
throw new UnsupportedOperationException("Task info is not being kept!");
}
return this.taskList.toArray(new TaskInfo[this.taskList.size()]);
return this.taskList.toArray(new TaskInfo[0]);
}
/**
@ -223,23 +192,19 @@ public class StopWatch {
public String prettyPrint() {
StringBuilder sb = new StringBuilder(shortSummary());
sb.append('\n');
if (!this.keepTaskList) {
sb.append("No task info kept");
} else {
sb.append("-----------------------------------------\n");
sb.append("ms % Task name\n");
sb.append("-----------------------------------------\n");
NumberFormat nf = NumberFormat.getNumberInstance(Locale.ROOT);
nf.setMinimumIntegerDigits(5);
nf.setGroupingUsed(false);
NumberFormat pf = NumberFormat.getPercentInstance(Locale.ROOT);
pf.setMinimumIntegerDigits(3);
pf.setGroupingUsed(false);
for (TaskInfo task : taskInfo()) {
sb.append(nf.format(task.getTime().millis())).append(" ");
sb.append(pf.format(task.getTime().secondsFrac() / totalTime().secondsFrac())).append(" ");
sb.append(task.getTaskName()).append("\n");
}
sb.append("-----------------------------------------\n");
sb.append("ms % Task name\n");
sb.append("-----------------------------------------\n");
NumberFormat nf = NumberFormat.getNumberInstance(Locale.ROOT);
nf.setMinimumIntegerDigits(5);
nf.setGroupingUsed(false);
NumberFormat pf = NumberFormat.getPercentInstance(Locale.ROOT);
pf.setMinimumIntegerDigits(3);
pf.setGroupingUsed(false);
for (TaskInfo task : taskInfo()) {
sb.append(nf.format(task.getTime().millis())).append(" ");
sb.append(pf.format(task.getTime().secondsFrac() / totalTime().secondsFrac())).append(" ");
sb.append(task.getTaskName()).append("\n");
}
return sb.toString();
}
@ -251,14 +216,10 @@ public class StopWatch {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(shortSummary());
if (this.keepTaskList) {
for (TaskInfo task : taskInfo()) {
sb.append("; [").append(task.getTaskName()).append("] took ").append(task.getTime());
long percent = Math.round((100.0f * task.getTime().millis()) / totalTime().millis());
sb.append(" = ").append(percent).append("%");
}
} else {
sb.append("; no task info kept");
for (TaskInfo task : taskInfo()) {
sb.append("; [").append(task.getTaskName()).append("] took ").append(task.getTime());
long percent = Math.round((100.0f * task.getTime().millis()) / totalTime().millis());
sb.append(" = ").append(percent).append("%");
}
return sb.toString();
}

View File

@ -484,6 +484,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_DURATION_SETTING,
Coordinator.PUBLISH_TIMEOUT_SETTING,
Coordinator.PUBLISH_INFO_TIMEOUT_SETTING,
JoinHelper.JOIN_TIMEOUT_SETTING,
FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING,
FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING,

View File

@ -248,6 +248,14 @@ public interface Scheduler {
}
}
}
@Override
public String toString() {
return "ReschedulingRunnable{" +
"runnable=" + runnable +
", interval=" + interval +
'}';
}
}
/**

View File

@ -1192,7 +1192,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
}
}
public void testLogsWarningPeriodicallyIfClusterNotFormed() {
public void testLogsWarningPeriodicallyIfClusterNotFormed() throws IllegalAccessException {
final long warningDelayMillis;
final Settings settings;
if (randomBoolean()) {
@ -1220,16 +1220,10 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
"waiting for leader failure");
for (int i = scaledRandomIntBetween(1, 10); i >= 0; i--) {
final MockLogAppender mockLogAppender;
final MockLogAppender mockLogAppender = new MockLogAppender();
try {
mockLogAppender = new MockLogAppender();
} catch (IllegalAccessException e) {
throw new AssertionError(e);
}
try {
Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
mockLogAppender.start();
Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() {
final Set<DiscoveryNode> nodesLogged = new HashSet<>();
@ -1260,13 +1254,70 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
cluster.runFor(warningDelayMillis + DEFAULT_DELAY_VARIABILITY, "waiting for warning to be emitted");
mockLogAppender.assertAllExpectationsMatched();
} finally {
mockLogAppender.stop();
Loggers.removeAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
mockLogAppender.stop();
}
}
}
}
public void testLogsMessagesIfPublicationDelayed() throws IllegalAccessException {
try (Cluster cluster = new Cluster(between(3, 5))) {
cluster.runRandomly();
cluster.stabilise();
final ClusterNode brokenNode = cluster.getAnyNodeExcept(cluster.getAnyLeader());
final MockLogAppender mockLogAppender = new MockLogAppender();
try {
mockLogAppender.start();
Loggers.addAppender(LogManager.getLogger(Coordinator.CoordinatorPublication.class), mockLogAppender);
Loggers.addAppender(LogManager.getLogger(LagDetector.class), mockLogAppender);
mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("publication info message",
Coordinator.CoordinatorPublication.class.getCanonicalName(), Level.INFO,
"after [*] publication of cluster state version [*] is still waiting for " + brokenNode.getLocalNode() + " ["
+ Publication.PublicationTargetState.SENT_PUBLISH_REQUEST + ']'));
mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("publication warning",
Coordinator.CoordinatorPublication.class.getCanonicalName(), Level.WARN,
"after [*] publication of cluster state version [*] is still waiting for " + brokenNode.getLocalNode() + " ["
+ Publication.PublicationTargetState.SENT_PUBLISH_REQUEST + ']'));
mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("lag warning",
LagDetector.class.getCanonicalName(), Level.WARN,
"node [" + brokenNode + "] is lagging at cluster state version [*], " +
"although publication of cluster state version [*] completed [*] ago"));
// drop the publication messages to one node, but then restore connectivity so it remains in the cluster and does not fail
// health checks
brokenNode.blackhole();
cluster.deterministicTaskQueue.scheduleAt(
cluster.deterministicTaskQueue.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
new Runnable() {
@Override
public void run() {
brokenNode.heal();
}
@Override
public String toString() {
return "healing " + brokenNode;
}
});
cluster.getAnyLeader().submitValue(randomLong());
cluster.runFor(defaultMillis(PUBLISH_TIMEOUT_SETTING) + 2 * DEFAULT_DELAY_VARIABILITY
+ defaultMillis(LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING),
"waiting for messages to be emitted");
mockLogAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(LogManager.getLogger(Coordinator.CoordinatorPublication.class), mockLogAppender);
Loggers.removeAppender(LogManager.getLogger(LagDetector.class), mockLogAppender);
mockLogAppender.stop();
}
}
}
public void testReconfiguresToExcludeMasterIneligibleNodesInVotingConfig() {
try (Cluster cluster = new Cluster(3)) {
cluster.runRandomly();

View File

@ -61,8 +61,8 @@ import static org.hamcrest.Matchers.is;
public class ClusterApplierServiceTests extends ESTestCase {
protected static ThreadPool threadPool;
protected TimedClusterApplierService clusterApplierService;
private static ThreadPool threadPool;
private TimedClusterApplierService clusterApplierService;
@BeforeClass
public static void createThreadPool() {
@ -192,13 +192,15 @@ public class ClusterApplierServiceTests extends ESTestCase {
"test2",
ClusterApplierService.class.getCanonicalName(),
Level.WARN,
"*cluster state applier task [test2] took [32s] which is above the warn threshold of *"));
"*cluster state applier task [test2] took [32s] which is above the warn threshold of [*]: " +
"[running task [test2]] took [*"));
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test4",
ClusterApplierService.class.getCanonicalName(),
Level.WARN,
"*cluster state applier task [test3] took [34s] which is above the warn threshold of *"));
"*cluster state applier task [test3] took [34s] which is above the warn threshold of [*]: " +
"[running task [test3]] took [*"));
Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class);
Loggers.addAppender(clusterLogger, mockAppender);
@ -273,7 +275,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
mockAppender.assertAllExpectationsMatched();
}
public void testLocalNodeMasterListenerCallbacks() throws Exception {
public void testLocalNodeMasterListenerCallbacks() {
TimedClusterApplierService timedClusterApplierService = createTimedClusterService(false);
AtomicBoolean isMaster = new AtomicBoolean();
@ -493,7 +495,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
static class TimedClusterApplierService extends ClusterApplierService {
final ClusterSettings clusterSettings;
public volatile Long currentTimeOverride = null;
volatile Long currentTimeOverride = null;
TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super("test_node", settings, clusterSettings, threadPool);

View File

@ -652,25 +652,25 @@ public class MasterServiceTests extends ESTestCase {
"test1 shouldn't see because setting is too low",
MasterService.class.getCanonicalName(),
Level.WARN,
"*cluster state update task [test1] took [*] which is above the warn threshold of *"));
"*cluster state update task [test1] took [*] which is above the warn threshold of [*]"));
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test2",
MasterService.class.getCanonicalName(),
Level.WARN,
"*cluster state update task [test2] took [32s] which is above the warn threshold of *"));
"*cluster state update task [test2] took [32s] which is above the warn threshold of [*]"));
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test3",
MasterService.class.getCanonicalName(),
Level.WARN,
"*cluster state update task [test3] took [33s] which is above the warn threshold of *"));
"*cluster state update task [test3] took [33s] which is above the warn threshold of [*]"));
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test4",
MasterService.class.getCanonicalName(),
Level.WARN,
"*cluster state update task [test4] took [34s] which is above the warn threshold of *"));
"*cluster state update task [test4] took [34s] which is above the warn threshold of [*]"));
Logger clusterLogger = LogManager.getLogger(MasterService.class);
Loggers.addAppender(clusterLogger, mockAppender);