Warn on slow metadata performance (#50956)

Makes the new cluster state storage layer emit a warning when writing metadata is very slow, i.e. when a write takes longer than the dynamically updatable gateway.slow_write_logging_threshold setting (10 seconds by default).

Relates #48701
Yannick Welsch 2020-01-14 14:07:52 +01:00
parent 8c16725a0d
commit 91d7b446a0
19 changed files with 276 additions and 52 deletions
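The mechanism is a timing wrapper around each metadata write: record a relative start time, perform the write, and if the elapsed time reaches the threshold, log a warning that includes write statistics (otherwise the duration is logged at debug). Below is a minimal standalone sketch of that pattern, assuming nothing beyond the JDK; the SlowWriteWarner class name and console output are hypothetical stand-ins for the Writer changes in the diff, while the supplier-based clock and threshold wiring mirrors it (production passes threadPool::relativeTimeInMillis, tests pass () -> 0L).

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class SlowWriteWarner {

    // Clock injected as a supplier so callers (and tests) control how time advances.
    private final LongSupplier relativeTimeMillisSupplier;

    // Threshold read through a supplier so a dynamic settings update applies to the next write.
    private final Supplier<Long> slowWriteLoggingThresholdMillis;

    public SlowWriteWarner(LongSupplier relativeTimeMillisSupplier, Supplier<Long> slowWriteLoggingThresholdMillis) {
        this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
        this.slowWriteLoggingThresholdMillis = slowWriteLoggingThresholdMillis;
    }

    public void writeAndMaybeWarn(Runnable write) {
        final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
        write.run();
        final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
        final long thresholdMillis = slowWriteLoggingThresholdMillis.get();
        if (durationMillis >= thresholdMillis) {
            // The real Writer calls logger.warn with this message shape (and logger.debug below the threshold).
            System.err.printf("writing cluster state took [%dms] which is above the warn threshold of [%dms]%n",
                durationMillis, thresholdMillis);
        }
    }

    public static void main(String[] args) {
        final AtomicLong fakeClock = new AtomicLong();
        // Each clock read advances 15s, so the single write below appears to take 15s against the 10s default.
        final SlowWriteWarner warner = new SlowWriteWarner(() -> fakeClock.getAndAdd(15_000L), () -> 10_000L);
        warner.writeAndMaybeWarn(() -> { /* pretend to persist metadata */ });
    }
}

Reading the threshold through a supplier is what lets a runtime update of gateway.slow_write_logging_threshold take effect on the next write without recreating the writer, which is exactly how the diff threads () -> slowWriteLoggingThreshold into the Writer.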

View File

@@ -51,7 +51,7 @@ public class DetachClusterCommand extends ElasticsearchNodeCommand {
@Override
protected void processNodePaths(Terminal terminal, Path[] dataPaths, int nodeLockId, OptionSet options, Environment env)
throws IOException {
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state");
final ClusterState oldClusterState = loadTermAndClusterState(persistedClusterStateService, env).v2();

View File

@@ -32,6 +32,8 @@ import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
@@ -76,14 +78,15 @@ public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
.withRequiredArg().ofType(Integer.class);
}
public static PersistedClusterStateService createPersistedClusterStateService(Path[] dataPaths) throws IOException {
public static PersistedClusterStateService createPersistedClusterStateService(Settings settings, Path[] dataPaths) throws IOException {
final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(dataPaths);
if (nodeMetaData == null) {
throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG);
}
String nodeId = nodeMetaData.nodeId();
return new PersistedClusterStateService(dataPaths, nodeId, namedXContentRegistry, BigArrays.NON_RECYCLING_INSTANCE, true);
return new PersistedClusterStateService(dataPaths, nodeId, namedXContentRegistry, BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true);
}
public static ClusterState clusterState(Environment environment, PersistedClusterStateService.OnDiskState onDiskState) {

View File

@@ -62,7 +62,7 @@ public class RemoveSettingsCommand extends ElasticsearchNodeCommand {
throw new UserException(ExitCodes.USAGE, "Must supply at least one setting to remove");
}
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state");
final Tuple<Long, ClusterState> termAndClusterState = loadTermAndClusterState(persistedClusterStateService, env);

View File

@@ -79,7 +79,7 @@ public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand {
protected void processNodePaths(Terminal terminal, Path[] dataPaths, int nodeLockId, OptionSet options, Environment env)
throws IOException {
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
final Tuple<Long, ClusterState> state = loadTermAndClusterState(persistedClusterStateService, env);
final ClusterState oldClusterState = state.v2();

View File

@@ -79,7 +79,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.DanglingIndicesState;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.IncrementalClusterStateWriter;
import org.elasticsearch.gateway.PersistedClusterStateService;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
@@ -109,9 +109,9 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ProxyConnectionStrategy;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteConnectionStrategy;
import org.elasticsearch.transport.ProxyConnectionStrategy;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.watcher.ResourceWatcherService;
@@ -252,7 +252,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
NetworkModule.HTTP_TYPE_SETTING,

View File

@@ -96,7 +96,7 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand {
Set<Path> indexPaths = uniqueParentPaths(shardDataPaths, indexMetaDataPaths);
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
final MetaData metaData = loadClusterState(terminal, env, persistedClusterStateService).metaData();
if (indexPaths.isEmpty() && metaData.indices().isEmpty()) {
@@ -134,7 +134,7 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand {
return;
}
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(dataPaths);
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
final MetaData metaData = loadClusterState(terminal, env, persistedClusterStateService).metaData();

View File

@@ -27,7 +27,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
@@ -47,9 +46,6 @@ public class IncrementalClusterStateWriter {
private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class);
public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic);
private final MetaStateService metaStateService;
// We call updateClusterState on the (unique) cluster applier thread so there's no need to synchronize access to these fields.
@@ -67,8 +63,9 @@ public class IncrementalClusterStateWriter {
this.previousClusterState = clusterState;
this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
this.incrementalWrite = false;
this.slowWriteLoggingThreshold = SLOW_WRITE_LOGGING_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
this.slowWriteLoggingThreshold = PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
this::setSlowWriteLoggingThreshold);
}
private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {

View File

@@ -57,6 +57,9 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -83,6 +86,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntPredicate;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
/**
* Stores cluster metadata in a bare Lucene index (per data path) split across a number of documents. This is used by master-eligible nodes
@@ -124,23 +129,39 @@ public class PersistedClusterStateService {
public static final String METADATA_DIRECTORY_NAME = MetaDataStateFormat.STATE_DIR_NAME;
public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic);
private final Path[] dataPaths;
private final String nodeId;
private final NamedXContentRegistry namedXContentRegistry;
private final BigArrays bigArrays;
private final boolean preserveUnknownCustoms;
private final LongSupplier relativeTimeMillisSupplier;
public PersistedClusterStateService(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays) {
this(nodeEnvironment.nodeDataPaths(), nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, false);
private volatile TimeValue slowWriteLoggingThreshold;
public PersistedClusterStateService(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays,
ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier) {
this(nodeEnvironment.nodeDataPaths(), nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, clusterSettings,
relativeTimeMillisSupplier, false);
}
public PersistedClusterStateService(Path[] dataPaths, String nodeId, NamedXContentRegistry namedXContentRegistry,
BigArrays bigArrays, boolean preserveUnknownCustoms) {
public PersistedClusterStateService(Path[] dataPaths, String nodeId, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays,
ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier,
boolean preserveUnknownCustoms) {
this.dataPaths = dataPaths;
this.nodeId = nodeId;
this.namedXContentRegistry = namedXContentRegistry;
this.bigArrays = bigArrays;
this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
this.preserveUnknownCustoms = preserveUnknownCustoms;
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
}
private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
}
public String getNodeId() {
@@ -169,7 +190,7 @@ public class PersistedClusterStateService {
IOUtils.closeWhileHandlingException(closeables);
}
}
return new Writer(metaDataIndexWriters, nodeId, bigArrays);
return new Writer(metaDataIndexWriters, nodeId, bigArrays, relativeTimeMillisSupplier, () -> slowWriteLoggingThreshold);
}
private static IndexWriter createIndexWriter(Directory directory, boolean openExisting) throws IOException {
@@ -524,14 +545,19 @@ public class PersistedClusterStateService {
private final List<MetaDataIndexWriter> metaDataIndexWriters;
private final String nodeId;
private final BigArrays bigArrays;
private final LongSupplier relativeTimeMillisSupplier;
private final Supplier<TimeValue> slowWriteLoggingThresholdSupplier;
boolean fullStateWritten = false;
private final AtomicBoolean closed = new AtomicBoolean();
private Writer(List<MetaDataIndexWriter> metaDataIndexWriters, String nodeId, BigArrays bigArrays) {
private Writer(List<MetaDataIndexWriter> metaDataIndexWriters, String nodeId, BigArrays bigArrays,
LongSupplier relativeTimeMillisSupplier, Supplier<TimeValue> slowWriteLoggingThresholdSupplier) {
this.metaDataIndexWriters = metaDataIndexWriters;
this.nodeId = nodeId;
this.bigArrays = bigArrays;
this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
this.slowWriteLoggingThresholdSupplier = slowWriteLoggingThresholdSupplier;
}
private void ensureOpen() {
@@ -561,9 +587,21 @@ public class PersistedClusterStateService {
public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException {
ensureOpen();
try {
overwriteMetaData(clusterState.metaData());
final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
final WriterStats stats = overwriteMetaData(clusterState.metaData());
commit(currentTerm, clusterState.version());
fullStateWritten = true;
final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
final TimeValue finalSlowWriteLoggingThreshold = slowWriteLoggingThresholdSupplier.get();
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
"wrote full state with [{}] indices",
durationMillis, finalSlowWriteLoggingThreshold, stats.numIndicesUpdated);
} else {
logger.debug("writing cluster state took [{}ms]; " +
"wrote full state with [{}] indices",
durationMillis, stats.numIndicesUpdated);
}
} finally {
closeIfAnyIndexWriterHasTragedyOrIsClosed();
}
@@ -577,8 +615,21 @@ public class PersistedClusterStateService {
ensureOpen();
assert fullStateWritten : "Need to write full state first before doing incremental writes";
try {
updateMetaData(previousClusterState.metaData(), clusterState.metaData());
final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
final WriterStats stats = updateMetaData(previousClusterState.metaData(), clusterState.metaData());
commit(currentTerm, clusterState.version());
final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
final TimeValue finalSlowWriteLoggingThreshold = slowWriteLoggingThresholdSupplier.get();
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
"wrote global metadata [{}] and metadata for [{}] indices and skipped [{}] unchanged indices",
durationMillis, finalSlowWriteLoggingThreshold, stats.globalMetaUpdated, stats.numIndicesUpdated,
stats.numIndicesUnchanged);
} else {
logger.debug("writing cluster state took [{}ms]; " +
"wrote global metadata [{}] and metadata for [{}] indices and skipped [{}] unchanged indices",
durationMillis, stats.globalMetaUpdated, stats.numIndicesUpdated, stats.numIndicesUnchanged);
}
} finally {
closeIfAnyIndexWriterHasTragedyOrIsClosed();
}
@@ -588,12 +639,13 @@ public class PersistedClusterStateService {
* Update the persisted metadata to match the given cluster state by removing any stale or unnecessary documents and adding any
* updated documents.
*/
private void updateMetaData(MetaData previouslyWrittenMetaData, MetaData metaData) throws IOException {
private WriterStats updateMetaData(MetaData previouslyWrittenMetaData, MetaData metaData) throws IOException {
assert previouslyWrittenMetaData.coordinationMetaData().term() == metaData.coordinationMetaData().term();
logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only",
metaData.coordinationMetaData().term());
if (MetaData.isGlobalStateEquals(previouslyWrittenMetaData, metaData) == false) {
final boolean updateGlobalMeta = MetaData.isGlobalStateEquals(previouslyWrittenMetaData, metaData) == false;
if (updateGlobalMeta) {
try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(metaData)) {
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) {
metaDataIndexWriter.updateGlobalMetaData(globalMetaDataDocument.getDocument());
@@ -608,18 +660,22 @@ public class PersistedClusterStateService {
assert previousValue == null : indexMetaData.getIndexUUID() + " already mapped to " + previousValue;
}
int numIndicesUpdated = 0;
int numIndicesUnchanged = 0;
for (ObjectCursor<IndexMetaData> cursor : metaData.indices().values()) {
final IndexMetaData indexMetaData = cursor.value;
final Long previousVersion = indexMetaDataVersionByUUID.get(indexMetaData.getIndexUUID());
if (previousVersion == null || indexMetaData.getVersion() != previousVersion) {
logger.trace("updating metadata for [{}], changing version from [{}] to [{}]",
indexMetaData.getIndex(), previousVersion, indexMetaData.getVersion());
numIndicesUpdated++;
try (ReleasableDocument indexMetaDataDocument = makeIndexMetaDataDocument(indexMetaData)) {
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) {
metaDataIndexWriter.updateIndexMetaDataDocument(indexMetaDataDocument.getDocument(), indexMetaData.getIndex());
}
}
} else {
numIndicesUnchanged++;
logger.trace("no action required for [{}]", indexMetaData.getIndex());
}
indexMetaDataVersionByUUID.remove(indexMetaData.getIndexUUID());
@@ -636,22 +692,24 @@ public class PersistedClusterStateService {
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) {
metaDataIndexWriter.flush();
}
return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged);
}
/**
* Update the persisted metadata to match the given cluster state by removing all existing documents and then adding new documents.
*/
private void overwriteMetaData(MetaData metaData) throws IOException {
private WriterStats overwriteMetaData(MetaData metaData) throws IOException {
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) {
metaDataIndexWriter.deleteAll();
}
addMetaData(metaData);
return addMetaData(metaData);
}
/**
* Add documents for the metadata of the given cluster state, assuming that there are currently no documents.
*/
private void addMetaData(MetaData metaData) throws IOException {
private WriterStats addMetaData(MetaData metaData) throws IOException {
try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(metaData)) {
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) {
metaDataIndexWriter.updateGlobalMetaData(globalMetaDataDocument.getDocument());
@@ -672,6 +730,8 @@ public class PersistedClusterStateService {
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) {
metaDataIndexWriter.flush();
}
return new WriterStats(true, metaData.indices().size(), 0);
}
public void commit(long currentTerm, long lastAcceptedVersion) throws IOException {
@@ -718,6 +778,18 @@ public class PersistedClusterStateService {
}
}
static class WriterStats {
final boolean globalMetaUpdated;
final long numIndicesUpdated;
final long numIndicesUnchanged;
WriterStats(boolean globalMetaUpdated, long numIndicesUpdated, long numIndicesUnchanged) {
this.globalMetaUpdated = globalMetaUpdated;
this.numIndicesUpdated = numIndicesUpdated;
this.numIndicesUnchanged = numIndicesUnchanged;
}
}
private ReleasableDocument makeIndexMetaDataDocument(IndexMetaData indexMetaData) throws IOException {
final ReleasableDocument indexMetaDataDocument = makeDocument(INDEX_TYPE_NAME, indexMetaData);
boolean success = false;

View File

@@ -252,7 +252,8 @@ public class RemoveCorruptedShardDataCommand extends ElasticsearchNodeCommand {
throws IOException {
warnAboutIndexBackup(terminal);
final ClusterState clusterState = loadTermAndClusterState(createPersistedClusterStateService(dataPaths), environment).v2();
final ClusterState clusterState =
loadTermAndClusterState(createPersistedClusterStateService(environment.settings(), dataPaths), environment).v2();
findAndProcessShardPath(options, environment, dataPaths, nodeLockId, clusterState, shardPath -> {
final Path indexPath = shardPath.resolveIndex();

View File

@@ -413,7 +413,8 @@ public class Node implements Closeable {
.flatMap(Function.identity()).collect(toList()));
final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
final PersistedClusterStateService lucenePersistedStateFactory
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays);
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(),
threadPool::relativeTimeInMillis);
// collect engine factory providers from server and from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);

View File

@@ -266,7 +266,7 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
logger.info("--> unsafely-bootstrap 1st master-eligible node");
MockTerminal terminal = unsafeBootstrap(environmentMaster1);
MetaData metaData = ElasticsearchNodeCommand.createPersistedClusterStateService(nodeEnvironment.nodeDataPaths())
MetaData metaData = ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, nodeEnvironment.nodeDataPaths())
.loadBestOnDiskState().metaData;
assertThat(terminal.getOutput(), containsString(
String.format(Locale.ROOT, UnsafeBootstrapMasterCommand.CLUSTER_STATE_TERM_VERSION_MSG_FORMAT,

View File

@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.gateway.PersistedClusterStateService;
@@ -70,7 +71,8 @@ public class NodeRepurposeCommandTests extends ESTestCase {
nodePaths = nodeEnvironment.nodeDataPaths();
final String nodeId = randomAlphaOfLength(10);
try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePaths, nodeId,
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) {
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(dataMasterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true).createWriter()) {
writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE);
}
}
@@ -105,7 +107,7 @@ public class NodeRepurposeCommandTests extends ESTestCase {
if (randomBoolean()) {
try (NodeEnvironment env = new NodeEnvironment(noDataMasterSettings, environment)) {
try (PersistedClusterStateService.Writer writer =
ElasticsearchNodeCommand.createPersistedClusterStateService(env.nodeDataPaths()).createWriter()) {
ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, env.nodeDataPaths()).createWriter()) {
writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE);
}
}
@@ -233,7 +235,7 @@ public class NodeRepurposeCommandTests extends ESTestCase {
try (NodeEnvironment env = new NodeEnvironment(settings, environment)) {
if (writeClusterState) {
try (PersistedClusterStateService.Writer writer =
ElasticsearchNodeCommand.createPersistedClusterStateService(env.nodeDataPaths()).createWriter()) {
ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, env.nodeDataPaths()).createWriter()) {
writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder().put(IndexMetaData.builder(INDEX.getName())
.settings(Settings.builder().put("index.version.created", Version.CURRENT)

View File

@@ -26,6 +26,7 @@ import org.elasticsearch.cli.MockTerminal;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.gateway.PersistedClusterStateService;
@@ -56,7 +57,8 @@ public class OverrideNodeVersionCommandTests extends ESTestCase {
nodeId = nodeEnvironment.nodeId();
try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePaths, nodeId,
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) {
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true).createWriter()) {
writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT).metaData(MetaData.builder()
.persistentSettings(Settings.builder().put(MetaData.SETTING_READ_ONLY_SETTING.getKey(), true).build()).build())
.build());
@@ -67,7 +69,9 @@ public class OverrideNodeVersionCommandTests extends ESTestCase {
@After
public void checkClusterStateIntact() throws IOException {
assertTrue(MetaData.SETTING_READ_ONLY_SETTING.get(new PersistedClusterStateService(nodePaths, nodeId,
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).loadBestOnDiskState().metaData.persistentSettings()));
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true)
.loadBestOnDiskState().metaData.persistentSettings()));
}
public void testFailsOnEmptyPath() {

View File

@@ -296,7 +296,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
public void testStatePersistedOnLoad() throws IOException {
// open LucenePersistedState to make sure that cluster state is written out to each data path
final PersistedClusterStateService persistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE);
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
final ClusterState state = createClusterState(randomNonNegativeLong(),
MetaData.builder().clusterUUID(randomAlphaOfLength(10)).build());
try (GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState(
@@ -313,7 +314,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
.put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build();
try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
final PersistedClusterStateService newPersistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE);
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
assertFalse(onDiskState.empty());
assertThat(onDiskState.currentTerm, equalTo(42L));
@@ -338,7 +340,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
final PersistedClusterStateService persistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE);
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
gateway.start(settings, transportService, clusterService,
new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
@@ -420,7 +423,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
final AtomicReference<Double> ioExceptionRate = new AtomicReference<>(0.01d);
final List<MockDirectoryWrapper> list = new ArrayList<>();
final PersistedClusterStateService persistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) {
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
@Override
Directory createDirectory(Path path) {
final MockDirectoryWrapper wrapper = newMockFSDirectory(path);
@@ -496,7 +500,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
.put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build();
try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
final PersistedClusterStateService newPersistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE);
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
assertFalse(onDiskState.empty());
assertThat(onDiskState.currentTerm, equalTo(currentTerm));

View File

@@ -443,12 +443,12 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
final long slowWriteLoggingThresholdMillis;
final Settings settings;
if (randomBoolean()) {
slowWriteLoggingThresholdMillis = IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis();
slowWriteLoggingThresholdMillis = PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis();
settings = Settings.EMPTY;
} else {
slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000);
settings = Settings.builder()
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms")
.put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms")
.build();
}
@@ -489,7 +489,7 @@ public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
"*"));
clusterSettings.applySettings(Settings.builder()
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms")
.put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms")
.build());
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning at reduced threshold",

View File

@@ -18,6 +18,9 @@
*/
package org.elasticsearch.gateway;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
@@ -32,7 +35,11 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
@@ -45,6 +52,8 @@ import org.elasticsearch.gateway.PersistedClusterStateService.Writer;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOError;
import java.io.IOException;
@@ -55,12 +64,14 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;
public class PersistedClusterStateServiceTests extends ESTestCase {
@@ -69,7 +80,9 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(),
usually()
? BigArrays.NON_RECYCLING_INSTANCE
: new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()));
: new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
() -> 0L);
}
public void testPersistsAndReloadsTerm() throws IOException {
@@ -217,6 +230,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
final String message = expectThrows(IllegalStateException.class,
() -> new PersistedClusterStateService(Stream.of(combinedPaths).map(path -> NodeEnvironment.resolveNodePath(path, 0))
.toArray(Path[]::new), nodeIds[0], xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L,
randomBoolean()).loadBestOnDiskState()).getMessage();
assertThat(message,
allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1])));
@@ -344,7 +358,8 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
final PersistedClusterStateService persistedClusterStateService
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) {
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
@Override
Directory createDirectory(Path path) throws IOException {
return new FilterDirectory(super.createDirectory(path)) {
@@ -381,7 +396,8 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
final PersistedClusterStateService persistedClusterStateService
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) {
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
@Override
Directory createDirectory(Path path) throws IOException {
return new FilterDirectory(super.createDirectory(path)) {
@@ -426,7 +442,8 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
final PersistedClusterStateService persistedClusterStateService
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) {
= new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
@Override
Directory createDirectory(Path path) throws IOException {
return new FilterDirectory(super.createDirectory(path)) {
@@ -758,6 +775,123 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
}
}
@TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level")
public void testSlowLogging() throws IOException, IllegalAccessException {
final long slowWriteLoggingThresholdMillis;
final Settings settings;
if (randomBoolean()) {
slowWriteLoggingThresholdMillis = PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis();
settings = Settings.EMPTY;
} else {
slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000);
settings = Settings.builder()
.put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms")
.build();
}
final DiscoveryNode localNode = new DiscoveryNode("node", buildNewFakeTransportAddress(), Version.CURRENT);
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10);
final AtomicLong currentTime = new AtomicLong(startTimeMillis);
final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis);
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService(nodeEnvironment,
xContentRegistry(),
usually()
? BigArrays.NON_RECYCLING_INSTANCE
: new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
clusterSettings,
() -> currentTime.getAndAdd(writeDurationMillis.get()));
try (Writer writer = persistedClusterStateService.createWriter()) {
assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation(
"should see warning at threshold",
PersistedClusterStateService.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote full state with [0] indices"));
writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2));
assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation(
"should see warning above threshold",
PersistedClusterStateService.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote full state with [0] indices"));
writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1));
assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.UnseenEventExpectation(
"should not see warning below threshold",
PersistedClusterStateService.class.getCanonicalName(),
Level.WARN,
"*"));
clusterSettings.applySettings(Settings.builder()
.put(PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms")
.build());
assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation(
"should see warning at reduced threshold",
PersistedClusterStateService.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote full state with [0] indices"));
final ClusterState newClusterState = ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData())
.version(clusterState.version())
.put(IndexMetaData.builder("test")
.settings(Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, "test-uuid"))))
.incrementVersion().build();
assertExpectedLogs(1L, clusterState, newClusterState, writer, new MockLogAppender.SeenEventExpectation(
"should see warning at threshold",
PersistedClusterStateService.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote global metadata [false] and metadata for [1] indices and skipped [0] unchanged indices"));
writeDurationMillis.set(randomLongBetween(1, writeDurationMillis.get() - 1));
assertExpectedLogs(1L, clusterState, newClusterState, writer, new MockLogAppender.UnseenEventExpectation(
"should not see warning below threshold",
PersistedClusterStateService.class.getCanonicalName(),
Level.WARN,
"*"));
assertThat(currentTime.get(), lessThan(startTimeMillis + 14 * slowWriteLoggingThresholdMillis)); // ensure no overflow
}
}
}
private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
throws IllegalAccessException, IOException {
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(expectation);
Logger classLogger = LogManager.getLogger(PersistedClusterStateService.class);
Loggers.addAppender(classLogger, mockAppender);
try {
if (previousState == null) {
writer.writeFullStateAndCommit(currentTerm, clusterState);
} else {
writer.writeIncrementalStateAndCommit(currentTerm, previousState, clusterState);
}
} finally {
Loggers.removeAppender(classLogger, mockAppender);
mockAppender.stop();
}
mockAppender.assertAllExpectationsMatched();
}
@Override
public Settings buildEnvSettings(Settings settings) {
assertTrue(settings.hasValue(Environment.PATH_DATA_SETTING.getKey()));

View File

@@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.Environment;
@@ -127,7 +128,8 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(0, logger, environment, Files::exists)) {
final Path[] dataPaths = Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new);
try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(dataPaths, nodeId,
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) {
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L, true).createWriter()) {
writer.writeFullStateAndCommit(1L, clusterState);
}
}

View File

@@ -745,7 +745,9 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm());
if (updatedMetaData != oldState.getLastAcceptedState().metaData() || updatedTerm != oldState.getCurrentTerm()) {
try (PersistedClusterStateService.Writer writer =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE)
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
deterministicTaskQueue::getCurrentTimeMillis)
.createWriter()) {
writer.writeFullStateAndCommit(updatedTerm,
ClusterState.builder(oldState.getLastAcceptedState()).metaData(updatedMetaData).build());

View File

@@ -79,6 +79,7 @@ public class MockGatewayMetaState extends GatewayMetaState {
throw new AssertionError(e);
}
start(settings, transportService, clusterService, metaStateService,
null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE));
null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L));
}
}