diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/DetachClusterCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/DetachClusterCommand.java index 2101cf5a135..994c2c194c7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/DetachClusterCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/DetachClusterCommand.java @@ -51,7 +51,7 @@ public class DetachClusterCommand extends ElasticsearchNodeCommand { @Override protected void processNodePaths(Terminal terminal, Path[] dataPaths, int nodeLockId, OptionSet options, Environment env) throws IOException { - final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths); + final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths); terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state"); final ClusterState oldClusterState = loadTermAndClusterState(persistedClusterStateService, env).v2(); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java index cd6178154a2..932194014d2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java @@ -32,6 +32,8 @@ import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -76,14 +78,15 @@ public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand { .withRequiredArg().ofType(Integer.class); } - public static PersistedClusterStateService createPersistedClusterStateService(Path[] dataPaths) throws IOException { + public static PersistedClusterStateService createPersistedClusterStateService(Settings settings, Path[] dataPaths) throws IOException { final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(dataPaths); if (nodeMetaData == null) { throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG); } String nodeId = nodeMetaData.nodeId(); - return new PersistedClusterStateService(dataPaths, nodeId, namedXContentRegistry, BigArrays.NON_RECYCLING_INSTANCE, true); + return new PersistedClusterStateService(dataPaths, nodeId, namedXContentRegistry, BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true); } public static ClusterState clusterState(Environment environment, PersistedClusterStateService.OnDiskState onDiskState) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/RemoveSettingsCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/RemoveSettingsCommand.java index 41d9e164b31..66989fcdff3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/RemoveSettingsCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/RemoveSettingsCommand.java @@ -62,7 +62,7 @@ public class RemoveSettingsCommand extends ElasticsearchNodeCommand { throw new UserException(ExitCodes.USAGE, "Must supply at least one setting to remove"); } - final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths); + final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths); terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state"); final Tuple termAndClusterState = loadTermAndClusterState(persistedClusterStateService, env); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapMasterCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapMasterCommand.java index 66bcdf5c8f5..f6bee26f7f7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapMasterCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapMasterCommand.java @@ -79,7 +79,7 @@ public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand { protected void processNodePaths(Terminal terminal, Path[] dataPaths, int nodeLockId, OptionSet options, Environment env) throws IOException { - final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths); + final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths); final Tuple state = loadTermAndClusterState(persistedClusterStateService, env); final ClusterState oldClusterState = state.v2(); diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9397f0116a1..29daa0a8192 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -79,7 +79,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.DanglingIndicesState; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.gateway.IncrementalClusterStateWriter; +import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; @@ -109,9 +109,9 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ProxyConnectionStrategy; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.RemoteConnectionStrategy; -import org.elasticsearch.transport.ProxyConnectionStrategy; import org.elasticsearch.transport.SniffConnectionStrategy; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.watcher.ResourceWatcherService; @@ -252,7 +252,7 @@ public final class ClusterSettings extends AbstractScopedSettings { GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING, GatewayService.RECOVER_AFTER_NODES_SETTING, GatewayService.RECOVER_AFTER_TIME_SETTING, - IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD, + PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, NetworkModule.HTTP_TYPE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java b/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java index c07f32d2512..dc7ebbda695 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java +++ b/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java @@ -96,7 +96,7 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand { Set indexPaths = uniqueParentPaths(shardDataPaths, indexMetaDataPaths); - final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths); + final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths); final MetaData metaData = loadClusterState(terminal, env, persistedClusterStateService).metaData(); if (indexPaths.isEmpty() && metaData.indices().isEmpty()) { @@ -134,7 +134,7 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand { return; } - final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths); + final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths); final MetaData metaData = loadClusterState(terminal, env, persistedClusterStateService).metaData(); diff --git a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java index 6ff97b6a9d2..dea3c629450 100644 --- a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java +++ b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -47,9 +46,6 @@ public class IncrementalClusterStateWriter { private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class); - public static final Setting SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold", - TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic); - private final MetaStateService metaStateService; // We call updateClusterState on the (unique) cluster applier thread so there's no need to synchronize access to these fields. @@ -67,8 +63,9 @@ public class IncrementalClusterStateWriter { this.previousClusterState = clusterState; this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; this.incrementalWrite = false; - this.slowWriteLoggingThreshold = SLOW_WRITE_LOGGING_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + this.slowWriteLoggingThreshold = PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, + this::setSlowWriteLoggingThreshold); } private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index ec064550d4d..ffef1707540 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -57,6 +57,9 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -83,6 +86,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntPredicate; +import java.util.function.LongSupplier; +import java.util.function.Supplier; /** * Stores cluster metadata in a bare Lucene index (per data path) split across a number of documents. This is used by master-eligible nodes @@ -124,23 +129,39 @@ public class PersistedClusterStateService { public static final String METADATA_DIRECTORY_NAME = MetaDataStateFormat.STATE_DIR_NAME; + public static final Setting SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold", + TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic); + private final Path[] dataPaths; private final String nodeId; private final NamedXContentRegistry namedXContentRegistry; private final BigArrays bigArrays; private final boolean preserveUnknownCustoms; + private final LongSupplier relativeTimeMillisSupplier; - public PersistedClusterStateService(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays) { - this(nodeEnvironment.nodeDataPaths(), nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, false); + private volatile TimeValue slowWriteLoggingThreshold; + + public PersistedClusterStateService(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays, + ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier) { + this(nodeEnvironment.nodeDataPaths(), nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, clusterSettings, + relativeTimeMillisSupplier, false); } - public PersistedClusterStateService(Path[] dataPaths, String nodeId, NamedXContentRegistry namedXContentRegistry, - BigArrays bigArrays, boolean preserveUnknownCustoms) { + public PersistedClusterStateService(Path[] dataPaths, String nodeId, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays, + ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier, + boolean preserveUnknownCustoms) { this.dataPaths = dataPaths; this.nodeId = nodeId; this.namedXContentRegistry = namedXContentRegistry; this.bigArrays = bigArrays; + this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; this.preserveUnknownCustoms = preserveUnknownCustoms; + this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); + clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + } + + private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { + this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } public String getNodeId() { @@ -169,7 +190,7 @@ public class PersistedClusterStateService { IOUtils.closeWhileHandlingException(closeables); } } - return new Writer(metaDataIndexWriters, nodeId, bigArrays); + return new Writer(metaDataIndexWriters, nodeId, bigArrays, relativeTimeMillisSupplier, () -> slowWriteLoggingThreshold); } private static IndexWriter createIndexWriter(Directory directory, boolean openExisting) throws IOException { @@ -524,14 +545,19 @@ public class PersistedClusterStateService { private final List metaDataIndexWriters; private final String nodeId; private final BigArrays bigArrays; + private final LongSupplier relativeTimeMillisSupplier; + private final Supplier slowWriteLoggingThresholdSupplier; boolean fullStateWritten = false; private final AtomicBoolean closed = new AtomicBoolean(); - private Writer(List metaDataIndexWriters, String nodeId, BigArrays bigArrays) { + private Writer(List metaDataIndexWriters, String nodeId, BigArrays bigArrays, + LongSupplier relativeTimeMillisSupplier, Supplier slowWriteLoggingThresholdSupplier) { this.metaDataIndexWriters = metaDataIndexWriters; this.nodeId = nodeId; this.bigArrays = bigArrays; + this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; + this.slowWriteLoggingThresholdSupplier = slowWriteLoggingThresholdSupplier; } private void ensureOpen() { @@ -561,9 +587,21 @@ public class PersistedClusterStateService { public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException { ensureOpen(); try { - overwriteMetaData(clusterState.metaData()); + final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + final WriterStats stats = overwriteMetaData(clusterState.metaData()); commit(currentTerm, clusterState.version()); fullStateWritten = true; + final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + final TimeValue finalSlowWriteLoggingThreshold = slowWriteLoggingThresholdSupplier.get(); + if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) { + logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + + "wrote full state with [{}] indices", + durationMillis, finalSlowWriteLoggingThreshold, stats.numIndicesUpdated); + } else { + logger.debug("writing cluster state took [{}ms]; " + + "wrote full state with [{}] indices", + durationMillis, stats.numIndicesUpdated); + } } finally { closeIfAnyIndexWriterHasTragedyOrIsClosed(); } @@ -577,8 +615,21 @@ public class PersistedClusterStateService { ensureOpen(); assert fullStateWritten : "Need to write full state first before doing incremental writes"; try { - updateMetaData(previousClusterState.metaData(), clusterState.metaData()); + final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + final WriterStats stats = updateMetaData(previousClusterState.metaData(), clusterState.metaData()); commit(currentTerm, clusterState.version()); + final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + final TimeValue finalSlowWriteLoggingThreshold = slowWriteLoggingThresholdSupplier.get(); + if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) { + logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + + "wrote global metadata [{}] and metadata for [{}] indices and skipped [{}] unchanged indices", + durationMillis, finalSlowWriteLoggingThreshold, stats.globalMetaUpdated, stats.numIndicesUpdated, + stats.numIndicesUnchanged); + } else { + logger.debug("writing cluster state took [{}ms]; " + + "wrote global metadata [{}] and metadata for [{}] indices and skipped [{}] unchanged indices", + durationMillis, stats.globalMetaUpdated, stats.numIndicesUpdated, stats.numIndicesUnchanged); + } } finally { closeIfAnyIndexWriterHasTragedyOrIsClosed(); } @@ -588,12 +639,13 @@ public class PersistedClusterStateService { * Update the persisted metadata to match the given cluster state by removing any stale or unnecessary documents and adding any * updated documents. */ - private void updateMetaData(MetaData previouslyWrittenMetaData, MetaData metaData) throws IOException { + private WriterStats updateMetaData(MetaData previouslyWrittenMetaData, MetaData metaData) throws IOException { assert previouslyWrittenMetaData.coordinationMetaData().term() == metaData.coordinationMetaData().term(); logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only", metaData.coordinationMetaData().term()); - if (MetaData.isGlobalStateEquals(previouslyWrittenMetaData, metaData) == false) { + final boolean updateGlobalMeta = MetaData.isGlobalStateEquals(previouslyWrittenMetaData, metaData) == false; + if (updateGlobalMeta) { try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(metaData)) { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.updateGlobalMetaData(globalMetaDataDocument.getDocument()); @@ -608,18 +660,22 @@ public class PersistedClusterStateService { assert previousValue == null : indexMetaData.getIndexUUID() + " already mapped to " + previousValue; } + int numIndicesUpdated = 0; + int numIndicesUnchanged = 0; for (ObjectCursor cursor : metaData.indices().values()) { final IndexMetaData indexMetaData = cursor.value; final Long previousVersion = indexMetaDataVersionByUUID.get(indexMetaData.getIndexUUID()); if (previousVersion == null || indexMetaData.getVersion() != previousVersion) { logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", indexMetaData.getIndex(), previousVersion, indexMetaData.getVersion()); + numIndicesUpdated++; try (ReleasableDocument indexMetaDataDocument = makeIndexMetaDataDocument(indexMetaData)) { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.updateIndexMetaDataDocument(indexMetaDataDocument.getDocument(), indexMetaData.getIndex()); } } } else { + numIndicesUnchanged++; logger.trace("no action required for [{}]", indexMetaData.getIndex()); } indexMetaDataVersionByUUID.remove(indexMetaData.getIndexUUID()); @@ -636,22 +692,24 @@ public class PersistedClusterStateService { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.flush(); } + + return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged); } /** * Update the persisted metadata to match the given cluster state by removing all existing documents and then adding new documents. */ - private void overwriteMetaData(MetaData metaData) throws IOException { + private WriterStats overwriteMetaData(MetaData metaData) throws IOException { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.deleteAll(); } - addMetaData(metaData); + return addMetaData(metaData); } /** * Add documents for the metadata of the given cluster state, assuming that there are currently no documents. */ - private void addMetaData(MetaData metaData) throws IOException { + private WriterStats addMetaData(MetaData metaData) throws IOException { try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(metaData)) { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.updateGlobalMetaData(globalMetaDataDocument.getDocument()); @@ -672,6 +730,8 @@ public class PersistedClusterStateService { for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { metaDataIndexWriter.flush(); } + + return new WriterStats(true, metaData.indices().size(), 0); } public void commit(long currentTerm, long lastAcceptedVersion) throws IOException { @@ -718,6 +778,18 @@ public class PersistedClusterStateService { } } + static class WriterStats { + final boolean globalMetaUpdated; + final long numIndicesUpdated; + final long numIndicesUnchanged; + + WriterStats(boolean globalMetaUpdated, long numIndicesUpdated, long numIndicesUnchanged) { + this.globalMetaUpdated = globalMetaUpdated; + this.numIndicesUpdated = numIndicesUpdated; + this.numIndicesUnchanged = numIndicesUnchanged; + } + } + private ReleasableDocument makeIndexMetaDataDocument(IndexMetaData indexMetaData) throws IOException { final ReleasableDocument indexMetaDataDocument = makeDocument(INDEX_TYPE_NAME, indexMetaData); boolean success = false; diff --git a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java index 46a8c013783..72158d093e4 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java @@ -252,7 +252,8 @@ public class RemoveCorruptedShardDataCommand extends ElasticsearchNodeCommand { throws IOException { warnAboutIndexBackup(terminal); - final ClusterState clusterState = loadTermAndClusterState(createPersistedClusterStateService(dataPaths), environment).v2(); + final ClusterState clusterState = + loadTermAndClusterState(createPersistedClusterStateService(environment.settings(), dataPaths), environment).v2(); findAndProcessShardPath(options, environment, dataPaths, nodeLockId, clusterState, shardPath -> { final Path indexPath = shardPath.resolveIndex(); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 41806845e56..a565a0158b4 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -413,7 +413,8 @@ public class Node implements Closeable { .flatMap(Function.identity()).collect(toList())); final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry); final PersistedClusterStateService lucenePersistedStateFactory - = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays); + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(), + threadPool::relativeTimeInMillis); // collect engine factory providers from server and from plugins final Collection enginePlugins = pluginsService.filterPlugins(EnginePlugin.class); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java index af62445d1a4..981d03983c7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java @@ -266,7 +266,7 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase { logger.info("--> unsafely-bootstrap 1st master-eligible node"); MockTerminal terminal = unsafeBootstrap(environmentMaster1); - MetaData metaData = ElasticsearchNodeCommand.createPersistedClusterStateService(nodeEnvironment.nodeDataPaths()) + MetaData metaData = ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, nodeEnvironment.nodeDataPaths()) .loadBestOnDiskState().metaData; assertThat(terminal.getOutput(), containsString( String.format(Locale.ROOT, UnsafeBootstrapMasterCommand.CLUSTER_STATE_TERM_VERSION_MSG_FORMAT, diff --git a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java index c88a3f36da7..783d4131f83 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.gateway.PersistedClusterStateService; @@ -70,7 +71,8 @@ public class NodeRepurposeCommandTests extends ESTestCase { nodePaths = nodeEnvironment.nodeDataPaths(); final String nodeId = randomAlphaOfLength(10); try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePaths, nodeId, - xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) { + xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(dataMasterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true).createWriter()) { writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE); } } @@ -105,7 +107,7 @@ public class NodeRepurposeCommandTests extends ESTestCase { if (randomBoolean()) { try (NodeEnvironment env = new NodeEnvironment(noDataMasterSettings, environment)) { try (PersistedClusterStateService.Writer writer = - ElasticsearchNodeCommand.createPersistedClusterStateService(env.nodeDataPaths()).createWriter()) { + ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, env.nodeDataPaths()).createWriter()) { writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE); } } @@ -233,7 +235,7 @@ public class NodeRepurposeCommandTests extends ESTestCase { try (NodeEnvironment env = new NodeEnvironment(settings, environment)) { if (writeClusterState) { try (PersistedClusterStateService.Writer writer = - ElasticsearchNodeCommand.createPersistedClusterStateService(env.nodeDataPaths()).createWriter()) { + ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, env.nodeDataPaths()).createWriter()) { writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT) .metaData(MetaData.builder().put(IndexMetaData.builder(INDEX.getName()) .settings(Settings.builder().put("index.version.created", Version.CURRENT) diff --git a/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java b/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java index 09c947d86de..a4f1ef217b8 100644 --- a/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java +++ b/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.cli.MockTerminal; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.gateway.PersistedClusterStateService; @@ -56,7 +57,8 @@ public class OverrideNodeVersionCommandTests extends ESTestCase { nodeId = nodeEnvironment.nodeId(); try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePaths, nodeId, - xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) { + xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true).createWriter()) { writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT).metaData(MetaData.builder() .persistentSettings(Settings.builder().put(MetaData.SETTING_READ_ONLY_SETTING.getKey(), true).build()).build()) .build()); @@ -67,7 +69,9 @@ public class OverrideNodeVersionCommandTests extends ESTestCase { @After public void checkClusterStateIntact() throws IOException { assertTrue(MetaData.SETTING_READ_ONLY_SETTING.get(new PersistedClusterStateService(nodePaths, nodeId, - xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).loadBestOnDiskState().metaData.persistentSettings())); + xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true) + .loadBestOnDiskState().metaData.persistentSettings())); } public void testFailsOnEmptyPath() { diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 4b39abba67b..dfe31b135e1 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -296,7 +296,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { public void testStatePersistedOnLoad() throws IOException { // open LucenePersistedState to make sure that cluster state is written out to each data path final PersistedClusterStateService persistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE); + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); final ClusterState state = createClusterState(randomNonNegativeLong(), MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build()); try (GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState( @@ -313,7 +314,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { .put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build(); try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) { final PersistedClusterStateService newPersistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE); + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState(); assertFalse(onDiskState.empty()); assertThat(onDiskState.currentTerm, equalTo(42L)); @@ -338,7 +340,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { when(clusterService.getClusterSettings()).thenReturn( new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); final PersistedClusterStateService persistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE); + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); gateway.start(settings, transportService, clusterService, new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); @@ -420,7 +423,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { final AtomicReference ioExceptionRate = new AtomicReference<>(0.01d); final List list = new ArrayList<>(); final PersistedClusterStateService persistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) { @Override Directory createDirectory(Path path) { final MockDirectoryWrapper wrapper = newMockFSDirectory(path); @@ -496,7 +500,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { .put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build(); try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) { final PersistedClusterStateService newPersistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE); + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState(); assertFalse(onDiskState.empty()); assertThat(onDiskState.currentTerm, equalTo(currentTerm)); diff --git a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java index 86fe9458709..5903326551c 100644 --- a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java @@ -443,12 +443,12 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { final long slowWriteLoggingThresholdMillis; final Settings settings; if (randomBoolean()) { - slowWriteLoggingThresholdMillis = IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis(); + slowWriteLoggingThresholdMillis = PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis(); settings = Settings.EMPTY; } else { slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000); settings = Settings.builder() - .put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms") + .put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms") .build(); } @@ -489,7 +489,7 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { "*")); clusterSettings.applySettings(Settings.builder() - .put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms") + .put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms") .build()); assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation( "should see warning at reduced threshold", diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index f3a0cf25f6b..b9600abfb19 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -18,6 +18,9 @@ */ package org.elasticsearch.gateway; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; @@ -32,7 +35,11 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; @@ -45,6 +52,8 @@ import org.elasticsearch.gateway.PersistedClusterStateService.Writer; import org.elasticsearch.index.Index; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOError; import java.io.IOException; @@ -55,12 +64,14 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.nullValue; public class PersistedClusterStateServiceTests extends ESTestCase { @@ -69,7 +80,9 @@ public class PersistedClusterStateServiceTests extends ESTestCase { return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), usually() ? BigArrays.NON_RECYCLING_INSTANCE - : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService())); + : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + () -> 0L); } public void testPersistsAndReloadsTerm() throws IOException { @@ -217,6 +230,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase { final String message = expectThrows(IllegalStateException.class, () -> new PersistedClusterStateService(Stream.of(combinedPaths).map(path -> NodeEnvironment.resolveNodePath(path, 0)) .toArray(Path[]::new), nodeIds[0], xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, randomBoolean()).loadBestOnDiskState()).getMessage(); assertThat(message, allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1]))); @@ -344,7 +358,8 @@ public class PersistedClusterStateServiceTests extends ESTestCase { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { final PersistedClusterStateService persistedClusterStateService - = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) { @Override Directory createDirectory(Path path) throws IOException { return new FilterDirectory(super.createDirectory(path)) { @@ -381,7 +396,8 @@ public class PersistedClusterStateServiceTests extends ESTestCase { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { final PersistedClusterStateService persistedClusterStateService - = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) { @Override Directory createDirectory(Path path) throws IOException { return new FilterDirectory(super.createDirectory(path)) { @@ -426,7 +442,8 @@ public class PersistedClusterStateServiceTests extends ESTestCase { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { final PersistedClusterStateService persistedClusterStateService - = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) { @Override Directory createDirectory(Path path) throws IOException { return new FilterDirectory(super.createDirectory(path)) { @@ -758,6 +775,123 @@ public class PersistedClusterStateServiceTests extends ESTestCase { } } + @TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level") + public void testSlowLogging() throws IOException, IllegalAccessException { + final long slowWriteLoggingThresholdMillis; + final Settings settings; + if (randomBoolean()) { + slowWriteLoggingThresholdMillis = PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis(); + settings = Settings.EMPTY; + } else { + slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000); + settings = Settings.builder() + .put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms") + .build(); + } + + final DiscoveryNode localNode = new DiscoveryNode("node", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); + + final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10); + final AtomicLong currentTime = new AtomicLong(startTimeMillis); + final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis); + + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService(nodeEnvironment, + xContentRegistry(), + usually() + ? BigArrays.NON_RECYCLING_INSTANCE + : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), + clusterSettings, + () -> currentTime.getAndAdd(writeDurationMillis.get())); + + try (Writer writer = persistedClusterStateService.createWriter()) { + assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation( + "should see warning at threshold", + PersistedClusterStateService.class.getCanonicalName(), + Level.WARN, + "writing cluster state took [*] which is above the warn threshold of [*]; " + + "wrote full state with [0] indices")); + + writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2)); + assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation( + "should see warning above threshold", + PersistedClusterStateService.class.getCanonicalName(), + Level.WARN, + "writing cluster state took [*] which is above the warn threshold of [*]; " + + "wrote full state with [0] indices")); + + writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1)); + assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.UnseenEventExpectation( + "should not see warning below threshold", + PersistedClusterStateService.class.getCanonicalName(), + Level.WARN, + "*")); + + clusterSettings.applySettings(Settings.builder() + .put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms") + .build()); + assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation( + "should see warning at reduced threshold", + PersistedClusterStateService.class.getCanonicalName(), + Level.WARN, + "writing cluster state took [*] which is above the warn threshold of [*]; " + + "wrote full state with [0] indices")); + + final ClusterState newClusterState = ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .version(clusterState.version()) + .put(IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, "test-uuid")))) + .incrementVersion().build(); + + assertExpectedLogs(1L, clusterState, newClusterState, writer, new MockLogAppender.SeenEventExpectation( + "should see warning at threshold", + PersistedClusterStateService.class.getCanonicalName(), + Level.WARN, + "writing cluster state took [*] which is above the warn threshold of [*]; " + + "wrote global metadata [false] and metadata for [1] indices and skipped [0] unchanged indices")); + + writeDurationMillis.set(randomLongBetween(1, writeDurationMillis.get() - 1)); + assertExpectedLogs(1L, clusterState, newClusterState, writer, new MockLogAppender.UnseenEventExpectation( + "should not see warning below threshold", + PersistedClusterStateService.class.getCanonicalName(), + Level.WARN, + "*")); + + assertThat(currentTime.get(), lessThan(startTimeMillis + 14 * slowWriteLoggingThresholdMillis)); // ensure no overflow + } + } + } + + private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState, + PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation) + throws IllegalAccessException, IOException { + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(expectation); + Logger classLogger = LogManager.getLogger(PersistedClusterStateService.class); + Loggers.addAppender(classLogger, mockAppender); + + try { + if (previousState == null) { + writer.writeFullStateAndCommit(currentTerm, clusterState); + } else { + writer.writeIncrementalStateAndCommit(currentTerm, previousState, clusterState); + } + } finally { + Loggers.removeAppender(classLogger, mockAppender); + mockAppender.stop(); + } + mockAppender.assertAllExpectationsMatched(); + } + @Override public Settings buildEnvSettings(Settings settings) { assertTrue(settings.hasValue(Environment.PATH_DATA_SETTING.getKey())); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index af0244c00c2..a5b248a6e80 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.env.Environment; @@ -127,7 +128,8 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(0, logger, environment, Files::exists)) { final Path[] dataPaths = Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new); try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(dataPaths, nodeId, - xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) { + xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true).createWriter()) { writer.writeFullStateAndCommit(1L, clusterState); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 1563b5fc32d..fb43a1ff939 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -745,7 +745,9 @@ public class AbstractCoordinatorTestCase extends ESTestCase { final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm()); if (updatedMetaData != oldState.getLastAcceptedState().metaData() || updatedTerm != oldState.getCurrentTerm()) { try (PersistedClusterStateService.Writer writer = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + deterministicTaskQueue::getCurrentTimeMillis) .createWriter()) { writer.writeFullStateAndCommit(updatedTerm, ClusterState.builder(oldState.getLastAcceptedState()).metaData(updatedMetaData).build()); diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java index 25f43e28ca8..e224c2bb406 100644 --- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -79,6 +79,7 @@ public class MockGatewayMetaState extends GatewayMetaState { throw new AssertionError(e); } start(settings, transportService, clusterService, metaStateService, - null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE)); + null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L)); } }