Warn on slow metadata persistence (#47130)

Today if metadata persistence is excessively slow on a master-ineligible node
then the `ClusterApplierService` emits a warning indicating that the
`GatewayMetaState` applier was slow, but gives no further details. If it is
excessively slow on a master-eligible node then we do not see any warning at
all, although we might see other consequences such as a lagging node or a
master failure.

With this commit we emit a warning if metadata persistence takes at least as
long as a configurable threshold, `gateway.slow_write_logging_threshold`, which
defaults to `10s`. We also emit statistics that record how much index metadata
was persisted and how much was skipped, since this can help distinguish cases
where IO was slow from cases where there are simply too many indices involved.

Backport of #47005.
This commit is contained in:
David Turner 2019-09-26 07:40:54 +01:00 committed by GitHub
parent fcddaa90de
commit 45c7783018
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 187 additions and 8 deletions

View File

@ -78,6 +78,7 @@ import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.IncrementalClusterStateWriter;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
@ -245,6 +246,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
NetworkModule.HTTP_TYPE_SETTING,

View File

@ -92,8 +92,11 @@ public class GatewayMetaState {
throw new ElasticsearchException("failed to load metadata", e);
}
final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(metaStateService, manifestClusterStateTuple.v1(),
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()));
= new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
manifestClusterStateTuple.v1(),
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()),
transportService.getThreadPool()::relativeTimeInMillis);
if (DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings).equals(DiscoveryModule.ZEN_DISCOVERY_TYPE)) {
// only for tests that simulate mixed Zen1/Zen2 clusters, see Zen1IT
if (isMasterOrDataNode(settings)) {

View File

@ -18,12 +18,18 @@
*/
package org.elasticsearch.gateway;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import java.util.ArrayList;
@ -33,11 +39,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;
/**
* Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata).
*/
class IncrementalClusterStateWriter {
public class IncrementalClusterStateWriter {
private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class);
/**
 * Time setting registered under the key {@code gateway.slow_write_logging_threshold}, with a value of 10 seconds when not
 * configured. It is declared with node scope and as dynamic, so it can be changed at runtime through the cluster settings.
 * The writer compares the duration of each cluster state write against this value to decide whether to log at WARN level.
 * NOTE(review): {@code TimeValue.ZERO} is presumably the minimum permitted value - confirm against {@code Setting.timeSetting}.
 */
public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic);
private final MetaStateService metaStateService;
@ -46,13 +58,24 @@ class IncrementalClusterStateWriter {
// no need to synchronize access to these fields.
private Manifest previousManifest;
private ClusterState previousClusterState;
private final LongSupplier relativeTimeMillisSupplier;
private boolean incrementalWrite;
IncrementalClusterStateWriter(MetaStateService metaStateService, Manifest manifest, ClusterState clusterState) {
private volatile TimeValue slowWriteLoggingThreshold;
IncrementalClusterStateWriter(Settings settings, ClusterSettings clusterSettings, MetaStateService metaStateService, Manifest manifest,
ClusterState clusterState, LongSupplier relativeTimeMillisSupplier) {
this.metaStateService = metaStateService;
this.previousManifest = manifest;
this.previousClusterState = clusterState;
this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
this.incrementalWrite = false;
this.slowWriteLoggingThreshold = SLOW_WRITE_LOGGING_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
}
/**
 * Replaces the threshold used to decide whether a cluster state write is reported at WARN level.
 * The constructor registers this method as the update consumer for {@link #SLOW_WRITE_LOGGING_THRESHOLD}.
 *
 * @param slowWriteLoggingThreshold the new threshold value
 */
private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
}
void setCurrentTerm(long currentTerm) throws WriteStateException {
@ -85,14 +108,26 @@ class IncrementalClusterStateWriter {
/**
 * Persists the metadata of {@code newState} through an {@link AtomicClusterStateWriter}: first the global state, then the
 * index metadata, then a new manifest. Afterwards it remembers the new manifest and cluster state as the "previous" ones
 * and logs how long the whole operation took, together with the number of indices written and skipped.
 *
 * The message is logged at WARN level when the duration is at least the slow-write threshold and at DEBUG level otherwise.
 * The bookkeeping and logging are not inside any try/finally, so an exception propagating from one of the write steps
 * skips them.
 *
 * @param newState the cluster state whose metadata should be written
 * @param previousState the earlier cluster state, passed on to the index metadata writing step
 * @throws WriteStateException declared by this method; NOTE(review): presumably raised by the write steps - confirm
 */
void updateClusterState(ClusterState newState, ClusterState previousState) throws WriteStateException {
MetaData newMetaData = newState.metaData();
// Read the clock before any write so that the measured duration spans all three write steps below.
final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
long globalStateGeneration = writeGlobalState(writer, newMetaData);
Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState, previousState);
// The new manifest keeps the current term of the previous manifest and takes its version from the new state.
Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
writeManifest(writer, manifest);
previousManifest = manifest;
previousClusterState = newState;
final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
// The threshold field is volatile and can be updated concurrently; read it once so that the comparison and the
// logged value agree.
final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;
// ">=": a write that takes exactly the threshold is already reported as slow.
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
"wrote metadata for [{}] indices and skipped [{}] unchanged indices",
durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());
} else {
logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",
durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());
}
}
private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException {
@ -256,6 +291,9 @@ class IncrementalClusterStateWriter {
private final MetaStateService metaStateService;
private boolean finished;
private int indicesWritten;
private int indicesSkipped;
AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) {
this.metaStateService = metaStateService;
assert previousManifest != null;
@ -320,6 +358,22 @@ class IncrementalClusterStateWriter {
rollbackCleanupActions.forEach(Runnable::run);
finished = true;
}
/**
 * Adds one to the tally of indices for which a metadata write was requested through this writer.
 */
void incrementIndicesWritten() {
    this.indicesWritten += 1;
}
/**
 * Adds one to the tally of indices whose metadata was left as-is instead of being written.
 */
void incrementIndicesSkipped() {
    this.indicesSkipped += 1;
}
/**
 * @return how many times {@link #incrementIndicesWritten()} has been called on this writer
 */
int getIndicesWritten() {
    return this.indicesWritten;
}
/**
 * @return how many times {@link #incrementIndicesSkipped()} has been called on this writer
 */
int getIndicesSkipped() {
    return this.indicesSkipped;
}
}
static class KeepPreviousGeneration implements IndexMetaDataAction {
@ -338,6 +392,7 @@ class IncrementalClusterStateWriter {
/**
 * Records on the writer that this index was skipped and returns the generation held by this action.
 * The only call made on the writer is the counter increment.
 *
 * @param writer the writer whose skipped-indices counter is incremented
 * @return the generation stored in this action
 */
@Override
public long execute(AtomicClusterStateWriter writer) {
writer.incrementIndicesSkipped();
return generation;
}
}
@ -356,6 +411,7 @@ class IncrementalClusterStateWriter {
/**
 * Counts this index as written on the writer and then asks the writer to persist its metadata,
 * giving "freshly created" as the reason.
 *
 * @param writer the writer that performs the write and keeps the written-indices counter
 * @return the value returned by the writer's {@code writeIndex} call
 * @throws WriteStateException if the writer's {@code writeIndex} call throws it
 */
@Override
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
    // The counter is bumped before the write is attempted, so it also covers a write that then fails.
    writer.incrementIndicesWritten();
    final long result = writer.writeIndex("freshly created", indexMetaData);
    return result;
}
}
@ -376,6 +432,7 @@ class IncrementalClusterStateWriter {
@Override
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
writer.incrementIndicesWritten();
return writer.writeIndex(
"version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]",
newIndexMetaData);

View File

@ -18,26 +18,35 @@
*/
package org.elasticsearch.gateway;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
@ -48,15 +57,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
@ -250,13 +262,19 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
assertThat(actions, hasSize(3));
boolean keptPreviousGeneration = false;
boolean wroteNewIndex = false;
boolean wroteChangedIndex = false;
for (IncrementalClusterStateWriter.IndexMetaDataAction action : actions) {
if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) {
assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex()));
IncrementalClusterStateWriter.AtomicClusterStateWriter writer
= mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
assertThat(action.execute(writer), equalTo(3L));
verifyZeroInteractions(writer);
verify(writer, times(1)).incrementIndicesSkipped();
verifyNoMoreInteractions(writer);
keptPreviousGeneration = true;
}
if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetaData) {
assertThat(action.getIndex(), equalTo(newIndex.getIndex()));
@ -264,6 +282,8 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
= mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L);
assertThat(action.execute(writer), equalTo(0L));
verify(writer, times(1)).incrementIndicesWritten();
wroteNewIndex = true;
}
if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetaData) {
assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex()));
@ -273,10 +293,16 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
assertThat(action.execute(writer), equalTo(3L));
ArgumentCaptor<String> reason = ArgumentCaptor.forClass(String.class);
verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex));
verify(writer, times(1)).incrementIndicesWritten();
assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion())));
assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion())));
wroteChangedIndex = true;
}
}
assertTrue(keptPreviousGeneration);
assertTrue(wroteNewIndex);
assertTrue(wroteChangedIndex);
}
private static class MetaStateServiceWithFailures extends MetaStateService {
@ -426,4 +452,84 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData)));
}
}
/**
 * Checks the slow-write logging of {@link IncrementalClusterStateWriter#updateClusterState}: a WARN message is expected
 * when the measured duration equals or exceeds the threshold, no WARN message is expected below it, and a dynamic
 * update that lowers the threshold makes a previously silent duration produce the warning.
 *
 * Time is simulated with an {@link AtomicLong}, so the test does not depend on real elapsed time.
 */
@TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level")
public void testSlowLogging() throws WriteStateException, IllegalAccessException {
final long slowWriteLoggingThresholdMillis;
final Settings settings;
// Randomly exercise either the default threshold (empty settings) or an explicitly configured one.
if (randomBoolean()) {
slowWriteLoggingThresholdMillis = IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis();
settings = Settings.EMPTY;
} else {
// The lower bound of 2 keeps the "below threshold" range [1, threshold - 1] used further down non-empty.
slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000);
settings = Settings.builder()
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms")
.build();
}
final DiscoveryNode localNode = newNode("node");
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
// Leave headroom of ten thresholds below Long.MAX_VALUE so the simulated clock cannot overflow during the test.
final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10);
final AtomicLong currentTime = new AtomicLong(startTimeMillis);
// First scenario: the write takes exactly the threshold.
final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis);
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
// Fake clock: each read returns the current value and then advances it by writeDurationMillis, so the start and end
// readings taken by updateClusterState differ by exactly writeDurationMillis.
final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(settings, clusterSettings, mock(MetaStateService.class),
new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptyMap()),
clusterState, () -> currentTime.getAndAdd(writeDurationMillis.get()));
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning at threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));
// Second scenario: a duration between the threshold and twice the threshold.
writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2));
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning above threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));
// Third scenario: a duration strictly below the threshold must not produce any WARN message from the writer's logger.
writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1));
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.UnseenEventExpectation(
"should not see warning below threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"*"));
// Fourth scenario: lower the threshold dynamically to the duration that was just silent; the same duration must now warn.
clusterSettings.applySettings(Settings.builder()
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms")
.build());
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning at reduced threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));
assertThat(currentTime.get(), lessThan(startTimeMillis + 10 * slowWriteLoggingThresholdMillis)); // ensure no overflow
}
/**
 * Performs one {@code updateClusterState(clusterState, clusterState)} call on the given writer while a
 * {@link MockLogAppender} carrying the single {@code expectation} is attached to the logger of
 * {@link IncrementalClusterStateWriter}, then asserts that the expectation was matched.
 *
 * The appender is detached and stopped in a {@code finally} block, so it does not stay attached when the update throws.
 *
 * @param clusterState the state passed as both the new and the previous state
 * @param incrementalClusterStateWriter the writer under test
 * @param expectation the logging expectation to verify
 */
private void assertExpectedLogs(ClusterState clusterState, IncrementalClusterStateWriter incrementalClusterStateWriter,
                                MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, WriteStateException {
    final Logger writerLogger = LogManager.getLogger(IncrementalClusterStateWriter.class);
    final MockLogAppender appender = new MockLogAppender();
    appender.start();
    appender.addExpectation(expectation);
    Loggers.addAppender(writerLogger, appender);
    try {
        incrementalClusterStateWriter.updateClusterState(clusterState, clusterState);
    } finally {
        // Always undo the logger wiring, even when the update fails.
        Loggers.removeAppender(writerLogger, appender);
        appender.stop();
    }
    appender.assertAllExpectationsMatched();
}
}

View File

@ -23,12 +23,17 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* {@link GatewayMetaState} constructor accepts a lot of arguments.
* It's not always easy / convenient to construct these dependencies.
@ -55,6 +60,12 @@ public class MockGatewayMetaState extends GatewayMetaState {
}
public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXContentRegistry xContentRegistry) {
start(settings, null, null, new MetaStateService(nodeEnvironment, xContentRegistry), null, null);
final TransportService transportService = mock(TransportService.class);
when(transportService.getThreadPool()).thenReturn(mock(ThreadPool.class));
final ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings())
.thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
start(settings, transportService, clusterService, new MetaStateService(nodeEnvironment, xContentRegistry),
null, null);
}
}