diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 2e98195da43..e9c98dd3aff 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -85,6 +85,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID; @@ -197,7 +198,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private ClusterFormationState getClusterFormationState() { return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(), - StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()), getCurrentTerm()); + Stream.concat(Stream.of(getLocalNode()), StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false)) + .collect(Collectors.toList()), getCurrentTerm()); } private void onLeaderFailure(Exception e) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index b5d93355224..2a06fdb275c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -58,6 +59,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -1258,6 +1260,79 @@ public class CoordinatorTests extends ESTestCase { } } + public void testLogsWarningPeriodicallyIfClusterNotFormed() { + final long warningDelayMillis; + final Settings settings; + if (randomBoolean()) { + settings = Settings.EMPTY; + warningDelayMillis = ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings).millis(); + } else { + warningDelayMillis = randomLongBetween(1, 100000); + settings = Settings.builder() + .put(ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.getKey(), warningDelayMillis + "ms") + .build(); + } + logger.info("--> emitting warnings every [{}ms]", warningDelayMillis); + + final Cluster cluster = new Cluster(3, true, settings); + cluster.runRandomly(); + cluster.stabilise(); + + logger.info("--> disconnecting all nodes"); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + clusterNode.disconnect(); + } + + cluster.runFor(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING), + "waiting for leader failure"); + + for (int i = scaledRandomIntBetween(1, 10); i >= 0; i--) { + final MockLogAppender mockLogAppender; + try { + mockLogAppender = new MockLogAppender(); + } catch (IllegalAccessException e) { + throw new AssertionError(e); + } + + try { + Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender); + mockLogAppender.start(); + mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() { + final Set nodesLogged = new HashSet<>(); + + @Override + public void match(LogEvent event) { + final String message = event.getMessage().getFormattedMessage(); + assertThat(message, + startsWith("master not discovered or elected yet, an election requires at least 2 nodes with ids from [")); + + final List matchingNodes = cluster.clusterNodes.stream() + .filter(n -> event.getContextData().getValue(NODE_ID_LOG_CONTEXT_KEY) + .equals(getNodeIdForLogContext(n.getLocalNode()))).collect(Collectors.toList()); + assertThat(matchingNodes, hasSize(1)); + + assertTrue(Regex.simpleMatch("*have discovered *" + matchingNodes.get(0).toString() + "*discovery will continue*", + message)); + + nodesLogged.add(matchingNodes.get(0).getLocalNode()); + } + + @Override + public void assertMatched() { + assertThat(nodesLogged + " vs " + cluster.clusterNodes, nodesLogged, + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getLocalNode).collect(Collectors.toSet()))); + } + }); + cluster.runFor(warningDelayMillis, "waiting for warning to be emitted"); + mockLogAppender.assertAllExpectationsMatched(); + } finally { + mockLogAppender.stop(); + Loggers.removeAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender); + } + } + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -2183,12 +2258,18 @@ public class CoordinatorTests extends ESTestCase { } } + private static final String NODE_ID_LOG_CONTEXT_KEY = "nodeId"; + + private static String getNodeIdForLogContext(DiscoveryNode node) { + return "{" + node.getId() + "}{" + node.getEphemeralId() + "}"; + } + public static Runnable onNodeLog(DiscoveryNode node, Runnable runnable) { - final String nodeId = "{" + node.getId() + "}{" + node.getEphemeralId() + "}"; + final String nodeId = getNodeIdForLogContext(node); return new Runnable() { @Override public void run() { - try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", nodeId)) { + try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put(NODE_ID_LOG_CONTEXT_KEY, nodeId)) { runnable.run(); } }