From 45c77830181dacfe0e770238a4163dd19c39d1a7 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 26 Sep 2019 07:40:54 +0100 Subject: [PATCH] Warn on slow metadata persistence (#47130) Today if metadata persistence is excessively slow on a master-ineligible node then the `ClusterApplierService` emits a warning indicating that the `GatewayMetaState` applier was slow, but gives no further details. If it is excessively slow on a master-eligible node then we do not see any warning at all, although we might see other consequences such as a lagging node or a master failure. With this commit we emit a warning if metadata persistence takes longer than a configurable threshold, which defaults to `10s`. We also emit statistics that record how much index metadata was persisted and how much was skipped since this can help distinguish cases where IO was slow from cases where there are simply too many indices involved. Backport of #47005. --- .../common/settings/ClusterSettings.java | 2 + .../gateway/GatewayMetaState.java | 7 +- .../IncrementalClusterStateWriter.java | 63 +++++++++- .../IncrementalClusterStateWriterTests.java | 110 +++++++++++++++++- .../gateway/MockGatewayMetaState.java | 13 ++- 5 files changed, 187 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 69f4f19dd51..d2a338a5405 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -78,6 +78,7 @@ import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.gateway.IncrementalClusterStateWriter; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; @@ -245,6 +246,7 @@ public final class ClusterSettings extends AbstractScopedSettings { GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING, GatewayService.RECOVER_AFTER_NODES_SETTING, GatewayService.RECOVER_AFTER_TIME_SETTING, + IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, NetworkModule.HTTP_TYPE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index f9433ee6059..35b22968459 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -92,8 +92,11 @@ public class GatewayMetaState { throw new ElasticsearchException("failed to load metadata", e); } final IncrementalClusterStateWriter incrementalClusterStateWriter - = new IncrementalClusterStateWriter(metaStateService, manifestClusterStateTuple.v1(), - prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2())); + = new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService, + manifestClusterStateTuple.v1(), + prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()), + transportService.getThreadPool()::relativeTimeInMillis); + if (DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings).equals(DiscoveryModule.ZEN_DISCOVERY_TYPE)) { // only for tests that simulate mixed Zen1/Zen2 clusters, see Zen1IT if (isMasterOrDataNode(settings)) { diff --git a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java index 5facb826a24..d015bcc5b6c 100644 --- a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java +++ b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java @@ -18,12 +18,18 @@ */ package org.elasticsearch.gateway; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import java.util.ArrayList; @@ -33,11 +39,17 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.LongSupplier; /** * Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata). */ -class IncrementalClusterStateWriter { +public class IncrementalClusterStateWriter { + + private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class); + + public static final Setting SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold", + TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic); private final MetaStateService metaStateService; @@ -46,13 +58,24 @@ class IncrementalClusterStateWriter { // no need to synchronize access to these fields. private Manifest previousManifest; private ClusterState previousClusterState; + private final LongSupplier relativeTimeMillisSupplier; private boolean incrementalWrite; - IncrementalClusterStateWriter(MetaStateService metaStateService, Manifest manifest, ClusterState clusterState) { + private volatile TimeValue slowWriteLoggingThreshold; + + IncrementalClusterStateWriter(Settings settings, ClusterSettings clusterSettings, MetaStateService metaStateService, Manifest manifest, + ClusterState clusterState, LongSupplier relativeTimeMillisSupplier) { this.metaStateService = metaStateService; this.previousManifest = manifest; this.previousClusterState = clusterState; + this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; this.incrementalWrite = false; + this.slowWriteLoggingThreshold = SLOW_WRITE_LOGGING_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + } + + private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { + this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } void setCurrentTerm(long currentTerm) throws WriteStateException { @@ -85,14 +108,26 @@ class IncrementalClusterStateWriter { void updateClusterState(ClusterState newState, ClusterState previousState) throws WriteStateException { MetaData newMetaData = newState.metaData(); + final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest); long globalStateGeneration = writeGlobalState(writer, newMetaData); Map indexGenerations = writeIndicesMetadata(writer, newState, previousState); Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations); writeManifest(writer, manifest); - previousManifest = manifest; previousClusterState = newState; + + final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold; + if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) { + logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + + "wrote metadata for [{}] indices and skipped [{}] unchanged indices", + durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped()); + } else { + logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices", + durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped()); + } } private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException { @@ -256,6 +291,9 @@ class IncrementalClusterStateWriter { private final MetaStateService metaStateService; private boolean finished; + private int indicesWritten; + private int indicesSkipped; + AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) { this.metaStateService = metaStateService; assert previousManifest != null; @@ -320,6 +358,22 @@ class IncrementalClusterStateWriter { rollbackCleanupActions.forEach(Runnable::run); finished = true; } + + void incrementIndicesWritten() { + indicesWritten++; + } + + void incrementIndicesSkipped() { + indicesSkipped++; + } + + int getIndicesWritten() { + return indicesWritten; + } + + int getIndicesSkipped() { + return indicesSkipped; + } } static class KeepPreviousGeneration implements IndexMetaDataAction { @@ -338,6 +392,7 @@ class IncrementalClusterStateWriter { @Override public long execute(AtomicClusterStateWriter writer) { + writer.incrementIndicesSkipped(); return generation; } } @@ -356,6 +411,7 @@ class IncrementalClusterStateWriter { @Override public long execute(AtomicClusterStateWriter writer) throws WriteStateException { + writer.incrementIndicesWritten(); return writer.writeIndex("freshly created", indexMetaData); } } @@ -376,6 +432,7 @@ class IncrementalClusterStateWriter { @Override public long execute(AtomicClusterStateWriter writer) throws WriteStateException { + writer.incrementIndicesWritten(); return writer.writeIndex( "version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]", newIndexMetaData); diff --git a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java index b41a24bb820..d5a03dee70e 100644 --- a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java @@ -18,26 +18,35 @@ */ package org.elasticsearch.gateway; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.mockito.ArgumentCaptor; import java.io.IOException; @@ -48,15 +57,18 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThan; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { @@ -250,13 +262,19 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { assertThat(actions, hasSize(3)); + boolean keptPreviousGeneration = false; + boolean wroteNewIndex = false; + boolean wroteChangedIndex = false; + for (IncrementalClusterStateWriter.IndexMetaDataAction action : actions) { if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) { assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex())); IncrementalClusterStateWriter.AtomicClusterStateWriter writer = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class); assertThat(action.execute(writer), equalTo(3L)); - verifyZeroInteractions(writer); + verify(writer, times(1)).incrementIndicesSkipped(); + verifyNoMoreInteractions(writer); + keptPreviousGeneration = true; } if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetaData) { assertThat(action.getIndex(), equalTo(newIndex.getIndex())); @@ -264,6 +282,8 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class); when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L); assertThat(action.execute(writer), equalTo(0L)); + verify(writer, times(1)).incrementIndicesWritten(); + wroteNewIndex = true; } if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetaData) { assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex())); @@ -273,10 +293,16 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { assertThat(action.execute(writer), equalTo(3L)); ArgumentCaptor reason = ArgumentCaptor.forClass(String.class); verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex)); + verify(writer, times(1)).incrementIndicesWritten(); assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion()))); assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion()))); + wroteChangedIndex = true; } } + + assertTrue(keptPreviousGeneration); + assertTrue(wroteNewIndex); + assertTrue(wroteChangedIndex); } private static class MetaStateServiceWithFailures extends MetaStateService { @@ -426,4 +452,84 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData))); } } + + @TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level") + public void testSlowLogging() throws WriteStateException, IllegalAccessException { + final long slowWriteLoggingThresholdMillis; + final Settings settings; + if (randomBoolean()) { + slowWriteLoggingThresholdMillis = IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis(); + settings = Settings.EMPTY; + } else { + slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000); + settings = Settings.builder() + .put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms") + .build(); + } + + final DiscoveryNode localNode = newNode("node"); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); + + final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10); + final AtomicLong currentTime = new AtomicLong(startTimeMillis); + final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis); + + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final IncrementalClusterStateWriter incrementalClusterStateWriter + = new IncrementalClusterStateWriter(settings, clusterSettings, mock(MetaStateService.class), + new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptyMap()), + clusterState, () -> currentTime.getAndAdd(writeDurationMillis.get())); + + assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation( + "should see warning at threshold", + IncrementalClusterStateWriter.class.getCanonicalName(), + Level.WARN, + "writing cluster state took [*] which is above the warn threshold of [*]; " + + "wrote metadata for [0] indices and skipped [0] unchanged indices")); + + writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2)); + assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation( + "should see warning above threshold", + IncrementalClusterStateWriter.class.getCanonicalName(), + Level.WARN, + "writing cluster state took [*] which is above the warn threshold of [*]; " + + "wrote metadata for [0] indices and skipped [0] unchanged indices")); + + writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1)); + assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.UnseenEventExpectation( + "should not see warning below threshold", + IncrementalClusterStateWriter.class.getCanonicalName(), + Level.WARN, + "*")); + + clusterSettings.applySettings(Settings.builder() + .put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms") + .build()); + assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation( + "should see warning at reduced threshold", + IncrementalClusterStateWriter.class.getCanonicalName(), + Level.WARN, + "writing cluster state took [*] which is above the warn threshold of [*]; " + + "wrote metadata for [0] indices and skipped [0] unchanged indices")); + + assertThat(currentTime.get(), lessThan(startTimeMillis + 10 * slowWriteLoggingThresholdMillis)); // ensure no overflow + } + + private void assertExpectedLogs(ClusterState clusterState, IncrementalClusterStateWriter incrementalClusterStateWriter, + MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, WriteStateException { + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(expectation); + Logger classLogger = LogManager.getLogger(IncrementalClusterStateWriter.class); + Loggers.addAppender(classLogger, mockAppender); + + try { + incrementalClusterStateWriter.updateClusterState(clusterState, clusterState); + } finally { + Loggers.removeAppender(classLogger, mockAppender); + mockAppender.stop(); + } + mockAppender.assertAllExpectationsMatched(); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java index b66b5ea3ee2..b73a90b4284 100644 --- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -23,12 +23,17 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.MetaDataUpgrader; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * {@link GatewayMetaState} constructor accepts a lot of arguments. * It's not always easy / convenient to construct these dependencies. @@ -55,6 +60,12 @@ public class MockGatewayMetaState extends GatewayMetaState { } public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXContentRegistry xContentRegistry) { - start(settings, null, null, new MetaStateService(nodeEnvironment, xContentRegistry), null, null); + final TransportService transportService = mock(TransportService.class); + when(transportService.getThreadPool()).thenReturn(mock(ThreadPool.class)); + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()) + .thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + start(settings, transportService, clusterService, new MetaStateService(nodeEnvironment, xContentRegistry), + null, null); } }