[Zen2] Write manifest file (#35049)

An Elasticsearch node is responsible for storing cluster metadata.
There are two types of metadata: global metadata and index metadata.
`GatewayMetaState` implements `ClusterStateApplier`, receives all
`ClusterChangedEvent`s, and is responsible for storing modified
metadata to disk.

When a new `ClusterChangedEvent` is received, `GatewayMetaState`
checks whether the global metadata has changed and, if so, writes the
new global metadata to disk. It then checks whether any index metadata
has changed or new indices have been assigned to this node and, if so,
writes the new index metadata to disk. Atomicity of each individual
global metadata and index metadata write is ensured by the
`MetaDataStateFormat` class.
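
For a single state file, `MetaDataStateFormat` achieves atomicity with the usual
write-to-temp-then-atomic-rename pattern (the node verifies at startup that the filesystem
supports atomic moves). Below is a minimal sketch of that pattern, not the actual
implementation; the class and file names are made up and error handling is simplified:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public final class AtomicWriteSketch {

    public static void writeAtomically(Path stateDir, String fileName, byte[] content) throws IOException {
        Path tmp = stateDir.resolve(fileName + ".tmp");
        try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(content));
            channel.force(true); // fsync the temporary file before it becomes visible under the real name
        }
        // readers now see either the complete old file or the complete new file, never a partial write
        Files.move(tmp, stateDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
        // the real code also fsyncs the containing directory so the rename itself survives a crash
    }
}
```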

Unfortunately, there is no atomicity when more than one piece of
metadata changes (global and index metadata, or metadata for two
indices), and this atomicity is important for Zen2 correctness.
This commit adds it by introducing the notion of a manifest file,
represented by the `Manifest` class. The manifest contains pointers to
the current metadata: the global state generation (a `long`) and a map
from `Index` to the index metadata generation (also a `long`).
Atomicity of manifest writes is in turn ensured by the
`MetaDataStateFormat` class.
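
Conceptually, a manifest is just the global generation plus a per-index generation map. A
hypothetical example of creating one and persisting it through its `MetaDataStateFormat`
(the index UUID, generations and data path below are made up for illustration):

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.index.Index;

public class ManifestWriteExample {

    public static void main(String[] args) throws Exception {
        // global metadata generation 42, and generation 7 for a single index
        Manifest manifest = new Manifest(42L,
            Collections.singletonMap(new Index("twitter", "u7Qw9p1sR2m0c4d5e6f7gA"), 7L));
        Path[] dataPaths = { Paths.get("/var/lib/elasticsearch/nodes/0") };
        // atomically writes manifest-<N>.st under the data path's state directory
        // and removes older manifest files
        long generation = Manifest.FORMAT.writeAndCleanup(manifest, dataPaths);
        System.out.println("manifest written as generation " + generation);
    }
}
```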

The algorithm for writing changes to disk is the following (a short
sketch in code follows the list):

1. Write the global metadata state file to disk and remember its
generation.
2. For each new or changed index, write a state file to disk and
remember its generation. For each unchanged index, reuse the generation
from the previous manifest file. If an index was removed or this node
is no longer responsible for it, forget about the index.
3. Create a `Manifest` object from the remembered generations and
write it to disk.
4. Remove the old state files for global metadata, index metadata and
the manifest.
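
A condensed sketch of those four steps, modeled on the new `GatewayMetaState.updateMetaData`
and `AtomicClusterStateWriter` code in this commit. It is illustrative only: failure handling
is omitted (the real code registers cleanup actions for both the commit and rollback paths),
and the class and method names of the sketch itself are invented:

```java
package org.elasticsearch.gateway;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.gateway.GatewayMetaState.AtomicClusterStateWriter;
import org.elasticsearch.index.Index;

class ManifestWriteSketch {

    /** Steps 1-4 from the list above; rollback and cleanup bookkeeping are omitted. */
    static Manifest writeChangedState(AtomicClusterStateWriter writer, Manifest previousManifest,
                                      MetaData previousMetaData, MetaData newMetaData,
                                      Set<Index> relevantIndices) throws WriteStateException {
        // 1. write global metadata only if it changed, otherwise keep the previous generation
        final long globalGeneration =
            previousMetaData == null || MetaData.isGlobalStateEquals(previousMetaData, newMetaData) == false
                ? writer.writeGlobalState("changed", newMetaData)
                : previousManifest.getGlobalGeneration();

        // 2. write new/changed index metadata and reuse generations of unchanged indices;
        //    indices that are gone or no longer relevant simply do not end up in the map
        final Map<Index, Long> indexGenerations = new HashMap<>();
        for (Index index : relevantIndices) {
            final IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index);
            final IndexMetaData oldIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index);
            final Long previousGeneration = previousManifest.getIndexGenerations().get(index);
            if (previousGeneration != null && oldIndexMetaData != null
                    && oldIndexMetaData.getVersion() == newIndexMetaData.getVersion()) {
                indexGenerations.put(index, previousGeneration);
            } else {
                indexGenerations.put(index, writer.writeIndex("changed", newIndexMetaData));
            }
        }

        // 3. the manifest points at everything written (or kept) above ...
        final Manifest manifest = new Manifest(globalGeneration, indexGenerations);
        // 4. ... and writing it commits the "transaction"; stale state files are removed afterwards
        writer.writeManifestAndCleanup("changed", manifest);
        return manifest;
    }
}
```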

Additionally, the new implementation relies on the enhanced
`MetaDataStateFormat` failure semantics: the write path invoked from
`applyClusterState` throws `IOException`, whose subclass
`WriteStateException` can be (and in Zen2 should be) handled
explicitly.
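
A sketch of how a caller of the write path might distinguish the two failure modes via
`WriteStateException#isDirty()`. In this commit `applyClusterState` itself still only logs the
failure; the stricter Zen2 handling is left for follow-up work. The `StateWriter` interface
below is an invented stand-in for whatever component performs the writes:

```java
package org.elasticsearch.gateway;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class WriteFailureHandlingSketch {

    private static final Logger logger = LogManager.getLogger(WriteFailureHandlingSketch.class);

    /** Stand-in for the component that persists metadata and may throw WriteStateException. */
    interface StateWriter {
        void writeFullState() throws WriteStateException;
    }

    void persist(StateWriter writer) {
        try {
            writer.writeFullState();
        } catch (WriteStateException e) {
            if (e.isDirty()) {
                // some state files may already have been replaced: on-disk state is unknown, fail hard
                throw new IllegalStateException("cluster state write failed after modifying disk state", e);
            }
            // nothing was changed on disk, so it is safe to simply retry on the next cluster state update
            logger.warn("failed to persist cluster state, will retry on the next update", e);
        }
    }
}
```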
Andrey Ershov 2018-11-19 19:49:44 +01:00 committed by GitHub
parent 86ef041539
commit f9ecd0c49e
21 changed files with 1506 additions and 493 deletions

View File: org.elasticsearch.cluster.metadata.Manifest (new file)

@ -0,0 +1,197 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* This class represents the manifest file, which is the entry point for reading meta data from disk.
* Metadata consists of global metadata and index metadata.
* When a new version of metadata is written, it is assigned a generation (a long value).
* The global metadata generation can be obtained by calling {@link #getGlobalGeneration()}.
* Index metadata generations can be obtained by calling {@link #getIndexGenerations()}.
*/
public class Manifest implements ToXContentFragment {
private static final long MISSING_GLOBAL_GENERATION = -1;
private final long globalGeneration;
private final Map<Index, Long> indexGenerations;
public Manifest(long globalGeneration, Map<Index, Long> indexGenerations) {
this.globalGeneration = globalGeneration;
this.indexGenerations = indexGenerations;
}
/**
* Returns global metadata generation.
*/
public long getGlobalGeneration() {
return globalGeneration;
}
/**
* Returns map from {@link Index} to index metadata generation.
*/
public Map<Index, Long> getIndexGenerations() {
return indexGenerations;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Manifest manifest = (Manifest) o;
return globalGeneration == manifest.globalGeneration &&
Objects.equals(indexGenerations, manifest.indexGenerations);
}
@Override
public int hashCode() {
return Objects.hash(globalGeneration, indexGenerations);
}
private static final String MANIFEST_FILE_PREFIX = "manifest-";
private static final ToXContent.Params MANIFEST_FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("binary", "true"));
public static final MetaDataStateFormat<Manifest> FORMAT = new MetaDataStateFormat<Manifest>(MANIFEST_FILE_PREFIX) {
@Override
public void toXContent(XContentBuilder builder, Manifest state) throws IOException {
state.toXContent(builder, MANIFEST_FORMAT_PARAMS);
}
@Override
public Manifest fromXContent(XContentParser parser) throws IOException {
return Manifest.fromXContent(parser);
}
};
/*
* Code below this comment is for XContent parsing/generation
*/
private static final ParseField GENERATION_PARSE_FIELD = new ParseField("generation");
private static final ParseField INDEX_GENERATIONS_PARSE_FIELD = new ParseField("index_generations");
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(GENERATION_PARSE_FIELD.getPreferredName(), globalGeneration);
builder.array(INDEX_GENERATIONS_PARSE_FIELD.getPreferredName(), indexEntryList().toArray());
return builder;
}
private List<IndexEntry> indexEntryList() {
return indexGenerations.entrySet().stream().
map(entry -> new IndexEntry(entry.getKey(), entry.getValue())).
collect(Collectors.toList());
}
private static long generation(Object[] generationAndListOfIndexEntries) {
return (Long) generationAndListOfIndexEntries[0];
}
private static Map<Index, Long> indices(Object[] generationAndListOfIndexEntries) {
List<IndexEntry> listOfIndices = (List<IndexEntry>) generationAndListOfIndexEntries[1];
return listOfIndices.stream().collect(Collectors.toMap(IndexEntry::getIndex, IndexEntry::getGeneration));
}
private static final ConstructingObjectParser<Manifest, Void> PARSER = new ConstructingObjectParser<>(
"manifest",
generationAndListOfIndexEntries ->
new Manifest(generation(generationAndListOfIndexEntries), indices(generationAndListOfIndexEntries)));
static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_PARSE_FIELD);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), IndexEntry.INDEX_ENTRY_PARSER, INDEX_GENERATIONS_PARSE_FIELD);
}
public static Manifest fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
public boolean isEmpty() {
return globalGeneration == MISSING_GLOBAL_GENERATION && indexGenerations.isEmpty();
}
public static Manifest empty() {
return new Manifest(MISSING_GLOBAL_GENERATION, Collections.emptyMap());
}
public boolean isGlobalGenerationMissing() {
return globalGeneration == MISSING_GLOBAL_GENERATION;
}
private static final class IndexEntry implements ToXContentFragment {
private static final ParseField INDEX_GENERATION_PARSE_FIELD = new ParseField("generation");
private static final ParseField INDEX_PARSE_FIELD = new ParseField("index");
static final ConstructingObjectParser<IndexEntry, Void> INDEX_ENTRY_PARSER = new ConstructingObjectParser<>(
"indexEntry",
indexAndGeneration -> new IndexEntry((Index) indexAndGeneration[0], (long) indexAndGeneration[1]));
static {
INDEX_ENTRY_PARSER.declareField(ConstructingObjectParser.constructorArg(),
Index::fromXContent, INDEX_PARSE_FIELD, ObjectParser.ValueType.OBJECT);
INDEX_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_GENERATION_PARSE_FIELD);
}
private final long generation;
private final Index index;
IndexEntry(Index index, long generation) {
this.index = index;
this.generation = generation;
}
public long getGeneration() {
return generation;
}
public Index getIndex() {
return index;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INDEX_PARSE_FIELD.getPreferredName(), index);
builder.field(GENERATION_PARSE_FIELD.getPreferredName(), generation);
builder.endObject();
return builder;
}
}
}

View File: org.elasticsearch.env.NodeEnvironment

@ -390,7 +390,7 @@ public final class NodeEnvironment implements Closeable {
            metaData = new NodeMetaData(generateNodeId(settings));
        }
        // we write again to make sure all paths have the latest state file
-       NodeMetaData.FORMAT.write(metaData, paths);
+       NodeMetaData.FORMAT.writeAndCleanup(metaData, paths);
        return metaData;
    }

View File: org.elasticsearch.gateway.GatewayMetaState

@ -20,11 +20,14 @@
package org.elasticsearch.gateway; package org.elasticsearch.gateway;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -32,7 +35,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -54,133 +57,253 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.UnaryOperator; import java.util.function.UnaryOperator;
import static java.util.Collections.emptySet; /**
import static java.util.Collections.unmodifiableSet; * This class is responsible for storing/retrieving metadata to/from disk.
* When instance of this class is created, constructor ensures that this version is compatible with state stored on disk and performs
public class GatewayMetaState extends AbstractComponent implements ClusterStateApplier { * state upgrade if necessary. Also it checks that atomic move is supported on the filesystem level, because it's a must for metadata
* store algorithm.
* Please note that the state being loaded when constructing the instance of this class is NOT the state that will be used as a
* {@link ClusterState#metaData()}. Instead when node is starting up, it calls {@link #loadMetaData()} method and if this node is
* elected as master, it requests metaData from other master eligible nodes. After that, master node performs re-conciliation on the
* gathered results, re-creates {@link ClusterState} and broadcasts this state to other nodes in the cluster.
* It means that the first time {@link #applyClusterState(ClusterChangedEvent)} method is called, it won't have any previous metaData in
* memory and will iterate over all the indices in received {@link ClusterState} and store them to disk.
*/
public class GatewayMetaState implements ClusterStateApplier {
private static final Logger logger = LogManager.getLogger(GatewayMetaState.class);
private final NodeEnvironment nodeEnv; private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService; private final MetaStateService metaStateService;
private final Settings settings;
@Nullable @Nullable
private volatile MetaData previousMetaData; //there is a single thread executing applyClusterState calls, hence no volatile modifier
private Manifest previousManifest;
private volatile Set<Index> previouslyWrittenIndices = emptySet(); private MetaData previousMetaData;
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService, public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) throws IOException { MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) throws IOException {
this.settings = settings;
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService; this.metaStateService = metaStateService;
if (DiscoveryNode.isDataNode(settings)) { ensureNoPre019State(); //TODO remove this check, it's Elasticsearch version 7 already
ensureNoPre019ShardState(nodeEnv); ensureAtomicMoveSupported(); //TODO move this check to NodeEnvironment, because it's related to all types of metadata
upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
profileLoadMetaData();
} }
if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) { private void profileLoadMetaData() throws IOException {
nodeEnv.ensureAtomicMoveSupported(); if (isMasterOrDataNode()) {
}
if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
try {
ensureNoPre019State();
final MetaData metaData = metaStateService.loadFullState();
final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);
// We finished global state validation and successfully checked all indices for backward compatibility
// and found no non-upgradable indices, which means the upgrade can continue.
// Now it's safe to overwrite global and index metadata.
if (metaData != upgradedMetaData) {
if (MetaData.isGlobalStateEquals(metaData, upgradedMetaData) == false) {
metaStateService.writeGlobalState("upgrade", upgradedMetaData);
}
for (IndexMetaData indexMetaData : upgradedMetaData) {
if (metaData.hasIndexMetaData(indexMetaData) == false) {
metaStateService.writeIndex("upgrade", indexMetaData);
}
}
}
long startNS = System.nanoTime(); long startNS = System.nanoTime();
metaStateService.loadFullState(); metaStateService.loadFullState();
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS))); logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
}
}
private void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader)
throws IOException {
if (isMasterOrDataNode()) {
try {
final Tuple<Manifest, MetaData> metaStateAndData = metaStateService.loadFullState();
final Manifest manifest = metaStateAndData.v1();
final MetaData metaData = metaStateAndData.v2();
// We finished global state validation and successfully checked all indices for backward compatibility
// and found no non-upgradable indices, which means the upgrade can continue.
// Now it's safe to overwrite global and index metadata.
// We don't re-write metadata if it's not upgraded by upgrade plugins, because
// if there is manifest file, it means metadata is properly persisted to all data paths
// if there is no manifest file (upgrade from 6.x to 7.x) metadata might be missing on some data paths,
// but anyway we will re-write it as soon as we receive first ClusterState
final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, manifest);
final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);
final long globalStateGeneration;
if (MetaData.isGlobalStateEquals(metaData, upgradedMetaData) == false) {
globalStateGeneration = writer.writeGlobalState("upgrade", upgradedMetaData);
} else {
globalStateGeneration = manifest.getGlobalGeneration();
}
Map<Index, Long> indices = new HashMap<>(manifest.getIndexGenerations());
for (IndexMetaData indexMetaData : upgradedMetaData) {
if (metaData.hasIndexMetaData(indexMetaData) == false) {
final long generation = writer.writeIndex("upgrade", indexMetaData);
indices.put(indexMetaData.getIndex(), generation);
}
}
final Manifest newManifest = new Manifest(globalStateGeneration, indices);
writer.writeManifestAndCleanup("startup", newManifest);
} catch (Exception e) { } catch (Exception e) {
logger.error("failed to read local state, exiting...", e); logger.error("failed to read or upgrade local state, exiting...", e);
throw e; throw e;
} }
} }
} }
public MetaData loadMetaState() throws IOException { private boolean isMasterOrDataNode() {
return metaStateService.loadFullState(); return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
}
private void ensureAtomicMoveSupported() throws IOException {
if (isMasterOrDataNode()) {
nodeEnv.ensureAtomicMoveSupported();
}
}
public MetaData loadMetaData() throws IOException {
return metaStateService.loadFullState().v2();
} }
@Override @Override
public void applyClusterState(ClusterChangedEvent event) { public void applyClusterState(ClusterChangedEvent event) {
if (isMasterOrDataNode() == false) {
final ClusterState state = event.state(); return;
if (state.blocks().disableStatePersistence()) { }
// reset the current metadata, we need to start fresh...
this.previousMetaData = null; if (event.state().blocks().disableStatePersistence()) {
previouslyWrittenIndices = emptySet(); // reset the current state, we need to start fresh...
previousMetaData = null;
previousManifest = null;
return; return;
} }
MetaData newMetaData = state.metaData();
// we don't check if metaData changed, since we might be called several times and we need to check dangling...
Set<Index> relevantIndices = Collections.emptySet();
boolean success = true;
// write the state if this node is a master eligible node or if it is a data node and has shards allocated on it
if (state.nodes().getLocalNode().isMasterNode() || state.nodes().getLocalNode().isDataNode()) {
if (previousMetaData == null) {
try { try {
// we determine if or if not we write meta data on data only nodes by looking at the shard routing if (previousManifest == null) {
// and only write if a shard of this index is allocated on this node previousManifest = metaStateService.loadManifestOrEmpty();
// however, closed indices do not appear in the shard routing. if the meta data for a closed index is
// updated it will therefore not be written in case the list of previouslyWrittenIndices is empty (because state
// persistence was disabled or the node was restarted), see getRelevantIndicesOnDataOnlyNode().
// we therefore have to check here if we have shards on disk and add their indices to the previouslyWrittenIndices list
if (isDataOnlyNode(state)) {
Set<Index> newPreviouslyWrittenIndices = new HashSet<>(previouslyWrittenIndices.size());
for (IndexMetaData indexMetaData : newMetaData) {
IndexMetaData indexMetaDataOnDisk = null;
if (indexMetaData.getState().equals(IndexMetaData.State.CLOSE)) {
indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.getIndex());
}
if (indexMetaDataOnDisk != null) {
newPreviouslyWrittenIndices.add(indexMetaDataOnDisk.getIndex());
}
}
newPreviouslyWrittenIndices.addAll(previouslyWrittenIndices);
previouslyWrittenIndices = unmodifiableSet(newPreviouslyWrittenIndices);
} }
updateMetaData(event);
} catch (Exception e) { } catch (Exception e) {
success = false; logger.warn("Exception occurred when storing new meta data", e);
}
}
// check if the global state changed?
if (previousMetaData == null || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) {
try {
metaStateService.writeGlobalState("changed", newMetaData);
} catch (Exception e) {
success = false;
} }
} }
/**
* This class is used to write changed global {@link MetaData}, {@link IndexMetaData} and {@link Manifest} to disk.
* This class delegates <code>write*</code> calls to corresponding write calls in {@link MetaStateService} and
* additionally it keeps track of cleanup actions to be performed if transaction succeeds or fails.
*/
static class AtomicClusterStateWriter {
private static final String FINISHED_MSG = "AtomicClusterStateWriter is finished";
private final List<Runnable> commitCleanupActions;
private final List<Runnable> rollbackCleanupActions;
private final Manifest previousManifest;
private final MetaStateService metaStateService;
private boolean finished;
relevantIndices = getRelevantIndices(event.state(), event.previousState(), previouslyWrittenIndices); AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) {
final Iterable<IndexMetaWriteInfo> writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, this.metaStateService = metaStateService;
previousMetaData, event.state().metaData()); assert previousManifest != null;
// check and write changes in indices this.previousManifest = previousManifest;
for (IndexMetaWriteInfo indexMetaWrite : writeInfo) { this.commitCleanupActions = new ArrayList<>();
this.rollbackCleanupActions = new ArrayList<>();
this.finished = false;
}
long writeGlobalState(String reason, MetaData metaData) throws WriteStateException {
assert finished == false : FINISHED_MSG;
try { try {
metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData); rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration()));
} catch (Exception e) { long generation = metaStateService.writeGlobalState(reason, metaData);
success = false; commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation));
} return generation;
} catch (WriteStateException e) {
rollback();
throw e;
} }
} }
if (success) { long writeIndex(String reason, IndexMetaData metaData) throws WriteStateException {
assert finished == false : FINISHED_MSG;
try {
Index index = metaData.getIndex();
Long previousGeneration = previousManifest.getIndexGenerations().get(index);
if (previousGeneration != null) {
// we prefer not to clean-up index metadata in case of rollback,
// if it's not referenced by previous manifest file
// not to break dangling indices functionality
rollbackCleanupActions.add(() -> metaStateService.cleanupIndex(index, previousGeneration));
}
long generation = metaStateService.writeIndex(reason, metaData);
commitCleanupActions.add(() -> metaStateService.cleanupIndex(index, generation));
return generation;
} catch (WriteStateException e) {
rollback();
throw e;
}
}
long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
assert finished == false : FINISHED_MSG;
try {
long generation = metaStateService.writeManifestAndCleanup(reason, manifest);
commitCleanupActions.forEach(Runnable::run);
finished = true;
return generation;
} catch (WriteStateException e) {
rollback();
throw e;
}
}
void rollback() {
rollbackCleanupActions.forEach(Runnable::run);
finished = true;
}
}
/**
* Updates meta state and meta data on disk according to {@link ClusterChangedEvent}.
*
* @throws IOException if IOException occurs. It's recommended for the callers of this method to handle {@link WriteStateException},
* which is subclass of {@link IOException} explicitly. See also {@link WriteStateException#isDirty()}.
*/
private void updateMetaData(ClusterChangedEvent event) throws IOException {
ClusterState newState = event.state();
ClusterState previousState = event.previousState();
MetaData newMetaData = newState.metaData();
final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
long globalStateGeneration = writeGlobalState(writer, newMetaData);
Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState, previousState);
Manifest manifest = new Manifest(globalStateGeneration, indexGenerations);
writeManifest(writer, manifest);
previousMetaData = newMetaData; previousMetaData = newMetaData;
previouslyWrittenIndices = unmodifiableSet(relevantIndices); previousManifest = manifest;
} }
private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws IOException {
if (manifest.equals(previousManifest) == false) {
writer.writeManifestAndCleanup("changed", manifest);
}
}
private Map<Index, Long> writeIndicesMetadata(AtomicClusterStateWriter writer, ClusterState newState, ClusterState previousState)
throws IOException {
Map<Index, Long> previouslyWrittenIndices = previousManifest.getIndexGenerations();
Set<Index> relevantIndices = getRelevantIndices(newState, previousState, previouslyWrittenIndices.keySet());
Map<Index, Long> newIndices = new HashMap<>();
Iterable<IndexMetaDataAction> actions = resolveIndexMetaDataActions(previouslyWrittenIndices, relevantIndices, previousMetaData,
newState.metaData());
for (IndexMetaDataAction action : actions) {
long generation = action.execute(writer);
newIndices.put(action.getIndex(), generation);
}
return newIndices;
}
private long writeGlobalState(AtomicClusterStateWriter writer, MetaData newMetaData) throws IOException {
if (previousMetaData == null || MetaData.isGlobalStateEquals(previousMetaData, newMetaData) == false) {
return writer.writeGlobalState("changed", newMetaData);
}
return previousManifest.getGlobalGeneration();
} }
public static Set<Index> getRelevantIndices(ClusterState state, ClusterState previousState, Set<Index> previouslyWrittenIndices) { public static Set<Index> getRelevantIndices(ClusterState state, ClusterState previousState, Set<Index> previouslyWrittenIndices) {
@ -196,14 +319,24 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
} }
protected static boolean isDataOnlyNode(ClusterState state) { private static boolean isDataOnlyNode(ClusterState state) {
return ((state.nodes().getLocalNode().isMasterNode() == false) && state.nodes().getLocalNode().isDataNode()); return ((state.nodes().getLocalNode().isMasterNode() == false) && state.nodes().getLocalNode().isDataNode());
} }
private void ensureNoPre019State() throws IOException {
if (DiscoveryNode.isDataNode(settings)) {
ensureNoPre019ShardState();
}
if (isMasterOrDataNode()) {
ensureNoPre019MetadataFiles();
}
}
/** /**
* Throws an IAE if a pre 0.19 state is detected * Throws an IAE if a pre 0.19 state is detected
*/ */
private void ensureNoPre019State() throws IOException { private void ensureNoPre019MetadataFiles() throws IOException {
for (Path dataLocation : nodeEnv.nodeDataPaths()) { for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME); final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (!Files.exists(stateLocation)) { if (!Files.exists(stateLocation)) {
@ -225,6 +358,22 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
} }
} }
// shard state BWC
private void ensureNoPre019ShardState() throws IOException {
for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (Files.exists(stateLocation)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateLocation, "shards-*")) {
for (Path stateFile : stream) {
throw new IllegalStateException("Detected pre 0.19 shard state file please upgrade to a version before "
+ Version.CURRENT.minimumIndexCompatibilityVersion()
+ " first to upgrade state structures - shard state found: [" + stateFile.getParent().toAbsolutePath());
}
}
}
}
}
/** /**
* Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls * Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls
* {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The * {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The
@ -235,7 +384,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
*/ */
static MetaData upgradeMetaData(MetaData metaData, static MetaData upgradeMetaData(MetaData metaData,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) throws IOException { MetaDataUpgrader metaDataUpgrader) {
// upgrade index meta data // upgrade index meta data
boolean changed = false; boolean changed = false;
final MetaData.Builder upgradedMetaData = MetaData.builder(metaData); final MetaData.Builder upgradedMetaData = MetaData.builder(metaData);
@ -247,7 +396,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
} }
// upgrade global custom meta data // upgrade global custom meta data
if (applyPluginUpgraders(metaData.getCustoms(), metaDataUpgrader.customMetaDataUpgraders, if (applyPluginUpgraders(metaData.getCustoms(), metaDataUpgrader.customMetaDataUpgraders,
upgradedMetaData::removeCustom,upgradedMetaData::putCustom)) { upgradedMetaData::removeCustom, upgradedMetaData::putCustom)) {
changed = true; changed = true;
} }
// upgrade current templates // upgrade current templates
@ -280,57 +429,51 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
return false; return false;
} }
// shard state BWC
private void ensureNoPre019ShardState(NodeEnvironment nodeEnv) throws IOException {
for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (Files.exists(stateLocation)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateLocation, "shards-*")) {
for (Path stateFile : stream) {
throw new IllegalStateException("Detected pre 0.19 shard state file please upgrade to a version before "
+ Version.CURRENT.minimumIndexCompatibilityVersion()
+ " first to upgrade state structures - shard state found: [" + stateFile.getParent().toAbsolutePath());
}
}
}
}
}
/** /**
* Loads the current meta state for each index in the new cluster state and checks if it has to be persisted. * Returns list of {@link IndexMetaDataAction} for each relevant index.
* Each index state that should be written to disk will be returned. This is only run for data only nodes. * For each relevant index there are 3 options:
* It will return only the states for indices that actually have a shard allocated on the current node. * <ol>
* <li>
* {@link KeepPreviousGeneration} - index metadata is already stored to disk and index metadata version is not changed, no
* action is required.
* </li>
* <li>
* {@link WriteNewIndexMetaData} - there is no index metadata on disk and index metadata for this index should be written.
* </li>
* <li>
* {@link WriteChangedIndexMetaData} - index metadata is already on disk, but index metadata version has changed. Updated
* index metadata should be written to disk.
* </li>
* </ol>
* *
* @param previouslyWrittenIndices A list of indices for which the state was already written before * @param previouslyWrittenIndices A list of indices for which the state was already written before
* @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written * @param relevantIndices The list of indices for which state should potentially be written
* @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is * @param previousMetaData The last meta data we know of
* persisted now
* @param newMetaData The new metadata * @param newMetaData The new metadata
* @return iterable over all indices states that should be written to disk * @return list of {@link IndexMetaDataAction} for each relevant index.
*/ */
public static Iterable<GatewayMetaState.IndexMetaWriteInfo> resolveStatesToBeWritten(Set<Index> previouslyWrittenIndices, public static List<IndexMetaDataAction> resolveIndexMetaDataActions(Map<Index, Long> previouslyWrittenIndices,
Set<Index> potentiallyUnwrittenIndices, Set<Index> relevantIndices,
MetaData previousMetaData, MetaData newMetaData) { MetaData previousMetaData,
List<GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new ArrayList<>(); MetaData newMetaData) {
for (Index index : potentiallyUnwrittenIndices) { List<IndexMetaDataAction> actions = new ArrayList<>();
for (Index index : relevantIndices) {
IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index); IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index);
IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index); IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index);
String writeReason = null;
if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) { if (previouslyWrittenIndices.containsKey(index) == false || previousIndexMetaData == null) {
writeReason = "freshly created"; actions.add(new WriteNewIndexMetaData(newIndexMetaData));
} else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) { } else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) {
writeReason = "version changed from [" + previousIndexMetaData.getVersion() + "] to [" + actions.add(new WriteChangedIndexMetaData(previousIndexMetaData, newIndexMetaData));
newIndexMetaData.getVersion() + "]"; } else {
} actions.add(new KeepPreviousGeneration(index, previouslyWrittenIndices.get(index)));
if (writeReason != null) {
indicesToWrite.add(new GatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason));
} }
} }
return indicesToWrite; return actions;
} }
public static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, private static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set<Index>
Set<Index> previouslyWrittenIndices) { previouslyWrittenIndices) {
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (newRoutingNode == null) { if (newRoutingNode == null) {
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state"); throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
@ -356,7 +499,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
return indices; return indices;
} }
public static Set<Index> getRelevantIndicesForMasterEligibleNode(ClusterState state) { private static Set<Index> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
Set<Index> relevantIndices; Set<Index> relevantIndices;
relevantIndices = new HashSet<>(); relevantIndices = new HashSet<>();
// we have to iterate over the metadata to make sure we also capture closed indices // we have to iterate over the metadata to make sure we also capture closed indices
@ -366,24 +509,81 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
return relevantIndices; return relevantIndices;
} }
/**
* Action to perform with index metadata.
*/
public interface IndexMetaDataAction {
/**
* @return index for index metadata.
*/
Index getIndex();
public static class IndexMetaWriteInfo { /**
final IndexMetaData newMetaData; * Executes this action using provided {@link AtomicClusterStateWriter}.
final String reason; *
final IndexMetaData previousMetaData; * @return new index metadata state generation, to be used in manifest file.
* @throws WriteStateException if exception occurs.
public IndexMetaWriteInfo(IndexMetaData newMetaData, IndexMetaData previousMetaData, String reason) { */
this.newMetaData = newMetaData; long execute(AtomicClusterStateWriter writer) throws WriteStateException;
this.reason = reason;
this.previousMetaData = previousMetaData;
} }
public IndexMetaData getNewMetaData() { public static class KeepPreviousGeneration implements IndexMetaDataAction {
return newMetaData; private final Index index;
private final long generation;
KeepPreviousGeneration(Index index, long generation) {
this.index = index;
this.generation = generation;
} }
public String getReason() { @Override
return reason; public Index getIndex() {
return index;
}
@Override
public long execute(AtomicClusterStateWriter writer) {
return generation;
}
}
public static class WriteNewIndexMetaData implements IndexMetaDataAction {
private final IndexMetaData indexMetaData;
WriteNewIndexMetaData(IndexMetaData indexMetaData) {
this.indexMetaData = indexMetaData;
}
@Override
public Index getIndex() {
return indexMetaData.getIndex();
}
@Override
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
return writer.writeIndex("freshly created", indexMetaData);
}
}
public static class WriteChangedIndexMetaData implements IndexMetaDataAction {
private final IndexMetaData newIndexMetaData;
private final IndexMetaData oldIndexMetaData;
WriteChangedIndexMetaData(IndexMetaData oldIndexMetaData, IndexMetaData newIndexMetaData) {
this.oldIndexMetaData = oldIndexMetaData;
this.newIndexMetaData = newIndexMetaData;
}
@Override
public Index getIndex() {
return newIndexMetaData.getIndex();
}
@Override
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
return writer.writeIndex(
"version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]",
newIndexMetaData);
} }
} }
} }

View File: org.elasticsearch.gateway.MetaDataStateFormat

@ -114,7 +114,6 @@ public abstract class MetaDataStateFormat<T> {
// in order to write the footer we need to prevent closing the actual index input. // in order to write the footer we need to prevent closing the actual index input.
} }
})) { })) {
builder.startObject(); builder.startObject();
toXContent(builder, state); toXContent(builder, state);
builder.endObject(); builder.endObject();
@ -177,20 +176,40 @@ public abstract class MetaDataStateFormat<T> {
} }
} }
/**
* Writes the given state to the given directories and performs cleanup of old state files if the write succeeds or
* newly created state file if write fails.
* See also {@link #write(Object, Path...)} and {@link #cleanupOldFiles(long, Path[])}.
*/
public final long writeAndCleanup(final T state, final Path... locations) throws WriteStateException {
return write(state, true, locations);
}
/** /**
* Writes the given state to the given directories. The state is written to a * Writes the given state to the given directories. The state is written to a
* state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it
* doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to * doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to
* it's target filename of the pattern {@code {prefix}{version}.st}. * it's target filename of the pattern {@code {prefix}{version}.st}.
* If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return
* it. * it.<br>
* This method always performs cleanup of temporary files regardless whether it succeeds or fails. Cleanup logic for state files is
* more involved.
* If this method fails with an exception, it performs cleanup of newly created state file.
* But if this method succeeds, it does not perform cleanup of old state files.
* If this write succeeds, but some further write fails, you may want to rollback the transaction and keep old file around.
* After transaction is finished use {@link #cleanupOldFiles(long, Path[])} for the clean-up.
* If this write is not a part of bigger transaction, consider using {@link #writeAndCleanup(Object, Path...)} method instead.
* *
* @param state the state object to write * @param state the state object to write
* @param locations the locations where the state should be written to. * @param locations the locations where the state should be written to.
* @throws WriteStateException if some exception during writing state occurs. See also {@link WriteStateException#isDirty()}. * @throws WriteStateException if some exception during writing state occurs. See also {@link WriteStateException#isDirty()}.
* @return generation of newly written state.
*/ */
public final long write(final T state, final Path... locations) throws WriteStateException {
return write(state, false, locations);
}
public final void write(final T state, final Path... locations) throws WriteStateException { private long write(final T state, boolean cleanup, final Path... locations) throws WriteStateException {
if (locations == null) { if (locations == null) {
throw new IllegalArgumentException("Locations must not be null"); throw new IllegalArgumentException("Locations must not be null");
} }
@ -198,15 +217,16 @@ public abstract class MetaDataStateFormat<T> {
throw new IllegalArgumentException("One or more locations required"); throw new IllegalArgumentException("One or more locations required");
} }
long maxStateId; final long oldGenerationId, newGenerationId;
try { try {
maxStateId = findMaxStateId(prefix, locations) + 1; oldGenerationId = findMaxGenerationId(prefix, locations);
newGenerationId = oldGenerationId + 1;
} catch (Exception e) { } catch (Exception e) {
throw new WriteStateException(false, "exception during looking up max state id", e); throw new WriteStateException(false, "exception during looking up new generation id", e);
} }
assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]"; assert newGenerationId >= 0 : "newGenerationId must be positive but was: [" + oldGenerationId + "]";
final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; final String fileName = getStateFileName(newGenerationId);
final String tmpFileName = fileName + ".tmp"; final String tmpFileName = fileName + ".tmp";
List<Tuple<Path, Directory>> directories = new ArrayList<>(); List<Tuple<Path, Directory>> directories = new ArrayList<>();
@ -224,6 +244,11 @@ public abstract class MetaDataStateFormat<T> {
copyStateToExtraLocations(directories, tmpFileName); copyStateToExtraLocations(directories, tmpFileName);
performRenames(tmpFileName, fileName, directories); performRenames(tmpFileName, fileName, directories);
performStateDirectoriesFsync(directories); performStateDirectoriesFsync(directories);
} catch (WriteStateException e) {
if (cleanup) {
cleanupOldFiles(oldGenerationId, locations);
}
throw e;
} finally { } finally {
for (Tuple<Path, Directory> pathAndDirectory : directories) { for (Tuple<Path, Directory> pathAndDirectory : directories) {
deleteFileIgnoreExceptions(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); deleteFileIgnoreExceptions(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName);
@ -231,7 +256,11 @@ public abstract class MetaDataStateFormat<T> {
} }
} }
cleanupOldFiles(fileName, locations); if (cleanup) {
cleanupOldFiles(newGenerationId, locations);
}
return newGenerationId;
} }
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException { protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException {
@ -284,13 +313,21 @@ public abstract class MetaDataStateFormat<T> {
return new SimpleFSDirectory(dir); return new SimpleFSDirectory(dir);
} }
private void cleanupOldFiles(final String currentStateFile, Path[] locations) {
/**
* Clean ups all state files not matching passed generation.
*
* @param currentGeneration state generation to keep.
* @param locations state paths.
*/
public void cleanupOldFiles(final long currentGeneration, Path[] locations) {
final String fileNameToKeep = getStateFileName(currentGeneration);
for (Path location : locations) { for (Path location : locations) {
logger.trace("cleanupOldFiles: cleaning up {}", location); logger.trace("cleanupOldFiles: cleaning up {}", location);
Path stateLocation = location.resolve(STATE_DIR_NAME); Path stateLocation = location.resolve(STATE_DIR_NAME);
try (Directory stateDir = newDirectory(stateLocation)) { try (Directory stateDir = newDirectory(stateLocation)) {
for (String file : stateDir.listAll()) { for (String file : stateDir.listAll()) {
if (file.startsWith(prefix) && file.equals(currentStateFile) == false) { if (file.startsWith(prefix) && file.equals(fileNameToKeep) == false) {
deleteFileIgnoreExceptions(stateLocation, stateDir, file); deleteFileIgnoreExceptions(stateLocation, stateDir, file);
} }
} }
@ -308,7 +345,7 @@ public abstract class MetaDataStateFormat<T> {
* @return maximum id of state file or -1 if no such files are found * @return maximum id of state file or -1 if no such files are found
* @throws IOException if IOException occurs * @throws IOException if IOException occurs
*/ */
private long findMaxStateId(final String prefix, Path... locations) throws IOException { private long findMaxGenerationId(final String prefix, Path... locations) throws IOException {
long maxId = -1; long maxId = -1;
for (Path dataLocation : locations) { for (Path dataLocation : locations) {
final Path resolve = dataLocation.resolve(STATE_DIR_NAME); final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
@ -333,7 +370,7 @@ public abstract class MetaDataStateFormat<T> {
return files; return files;
} }
final String fileName = prefix + generation + STATE_FILE_EXTENSION; final String fileName = getStateFileName(generation);
for (Path dataLocation : locations) { for (Path dataLocation : locations) {
final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName); final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName);
if (Files.exists(stateFilePath)) { if (Files.exists(stateFilePath)) {
@ -345,32 +382,27 @@ public abstract class MetaDataStateFormat<T> {
return files; return files;
} }
/** private String getStateFileName(long generation) {
* Tries to load the latest state from the given data-locations. It tries to load the latest state determined by return prefix + generation + STATE_FILE_EXTENSION;
* the states version from one or more data directories and if none of the latest states can be loaded an exception
* is thrown to prevent accidentally loading a previous state and silently omitting the latest state.
*
* @param logger a logger instance
* @param dataLocations the data-locations to try.
* @return the latest state or <code>null</code> if no state was found.
*/
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws IOException {
long maxStateId = findMaxStateId(prefix, dataLocations);
List<Path> stateFiles = findStateFilesByGeneration(maxStateId, dataLocations);
if (maxStateId > -1 && stateFiles.isEmpty()) {
throw new IllegalStateException("unable to find state files with state id " + maxStateId +
" returned by findMaxStateId function, in data folders [" +
Arrays.stream(dataLocations).map(Path::toAbsolutePath).
map(Object::toString).collect(Collectors.joining(", ")) +
"], concurrent writes?");
} }
/**
* Tries to load the state of particular generation from the given data-locations. If any of data locations contain state files with
* given generation, state will be loaded from these state files.
*
* @param logger a logger instance.
* @param generation the generation to be loaded.
* @param dataLocations the data-locations to try.
* @return the state of asked generation or <code>null</code> if no state was found.
*/
public T loadGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, long generation, Path... dataLocations) {
List<Path> stateFiles = findStateFilesByGeneration(generation, dataLocations);
final List<Throwable> exceptions = new ArrayList<>(); final List<Throwable> exceptions = new ArrayList<>();
for (Path stateFile : stateFiles) { for (Path stateFile : stateFiles) {
try { try {
T state = read(namedXContentRegistry, stateFile); T state = read(namedXContentRegistry, stateFile);
logger.trace("state id [{}] read from [{}]", maxStateId, stateFile.getFileName()); logger.trace("generation id [{}] read from [{}]", generation, stateFile.getFileName());
return state; return state;
} catch (Exception e) { } catch (Exception e) {
exceptions.add(new IOException("failed to read " + stateFile.toAbsolutePath(), e)); exceptions.add(new IOException("failed to read " + stateFile.toAbsolutePath(), e));
@ -388,6 +420,40 @@ public abstract class MetaDataStateFormat<T> {
return null; return null;
} }
/**
* Tries to load the latest state from the given data-locations.
*
* @param logger a logger instance.
* @param dataLocations the data-locations to try.
* @return tuple of the latest state and generation. (-1, null) if no state is found.
*/
public Tuple<T, Long> loadLatestStateWithGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations)
throws IOException {
long generation = findMaxGenerationId(prefix, dataLocations);
T state = loadGeneration(logger, namedXContentRegistry, generation, dataLocations);
if (generation > -1 && state == null) {
throw new IllegalStateException("unable to find state files with generation id " + generation +
" returned by findMaxGenerationId function, in data folders [" +
Arrays.stream(dataLocations).map(Path::toAbsolutePath).
map(Object::toString).collect(Collectors.joining(", ")) +
"], concurrent writes?");
}
return Tuple.tuple(state, generation);
}
/**
* Tries to load the latest state from the given data-locations.
*
* @param logger a logger instance.
* @param dataLocations the data-locations to try.
* @return the latest state or <code>null</code> if no state was found.
*/
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws
IOException {
return loadLatestStateWithGeneration(logger, namedXContentRegistry, dataLocations).v1();
}
/** /**
* Deletes all meta state directories recursively for the given data locations * Deletes all meta state directories recursively for the given data locations
* @param dataLocations the data location to delete * @param dataLocations the data location to delete
@ -399,4 +465,8 @@ public abstract class MetaDataStateFormat<T> {
} }
IOUtils.rm(stateDirectories); IOUtils.rm(stateDirectories);
} }
String getPrefix() {
return prefix;
}
} }

View File: org.elasticsearch.gateway.MetaStateService

@ -19,55 +19,127 @@
package org.elasticsearch.gateway; package org.elasticsearch.gateway;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.function.Predicate; import java.util.function.Predicate;
/** /**
* Handles writing and loading both {@link MetaData} and {@link IndexMetaData} * Handles writing and loading {@link Manifest}, {@link MetaData} and {@link IndexMetaData}
*/ */
public class MetaStateService extends AbstractComponent { public class MetaStateService {
private static final Logger logger = LogManager.getLogger(MetaStateService.class);
private final NodeEnvironment nodeEnv; private final NodeEnvironment nodeEnv;
private final NamedXContentRegistry namedXContentRegistry; private final NamedXContentRegistry namedXContentRegistry;
// we allow subclasses in tests to redefine formats, e.g. to inject failures
protected MetaDataStateFormat<MetaData> META_DATA_FORMAT = MetaData.FORMAT;
protected MetaDataStateFormat<IndexMetaData> INDEX_META_DATA_FORMAT = IndexMetaData.FORMAT;
protected MetaDataStateFormat<Manifest> MANIFEST_FORMAT = Manifest.FORMAT;
public MetaStateService(NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) { public MetaStateService(NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) {
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
this.namedXContentRegistry = namedXContentRegistry; this.namedXContentRegistry = namedXContentRegistry;
} }
/** /**
* Loads the full state, which includes both the global state and all the indices * Loads the full state, which includes both the global state and all the indices meta data. <br>
* meta state. * When loading, manifest file is consulted (represented by {@link Manifest} class), to load proper generations. <br>
* If there is no manifest file on disk, this method fallbacks to BWC mode, where latest generation of global and indices
* metadata is loaded. Please note that currently there is no way to distinguish between manifest file being removed and manifest
* file was not yet created. It means that this method always fallbacks to BWC mode, if there is no manifest file.
*
* @return tuple of {@link Manifest} and {@link MetaData} with global metadata and indices metadata. If there is no state on disk,
* meta state with globalGeneration -1 and empty meta data is returned.
* @throws IOException if some IOException when loading files occurs or there is no metadata referenced by manifest file.
*/ */
    Tuple<Manifest, MetaData> loadFullState() throws IOException {
        final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
        if (manifest == null) {
            return loadFullStateBWC();
        }

        final MetaData.Builder metaDataBuilder;
        if (manifest.isGlobalGenerationMissing()) {
            metaDataBuilder = MetaData.builder();
        } else {
            final MetaData globalMetaData = META_DATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(),
                    nodeEnv.nodeDataPaths());
            if (globalMetaData != null) {
                metaDataBuilder = MetaData.builder(globalMetaData);
            } else {
                throw new IOException("failed to find global metadata [generation: " + manifest.getGlobalGeneration() + "]");
            }
        }

        for (Map.Entry<Index, Long> entry : manifest.getIndexGenerations().entrySet()) {
            final Index index = entry.getKey();
            final long generation = entry.getValue();
            final String indexFolderName = index.getUUID();
            final IndexMetaData indexMetaData = INDEX_META_DATA_FORMAT.loadGeneration(logger, namedXContentRegistry, generation,
                    nodeEnv.resolveIndexFolder(indexFolderName));
            if (indexMetaData != null) {
                metaDataBuilder.put(indexMetaData, false);
            } else {
                throw new IOException("failed to find metadata for existing index " + index.getName() + " [location: " + indexFolderName +
                        ", generation: " + generation + "]");
            }
        }

        return new Tuple<>(manifest, metaDataBuilder.build());
    }
    /**
     * "Manifest-less" BWC version of loading metadata from disk. See also {@link #loadFullState()}
     */
    private Tuple<Manifest, MetaData> loadFullStateBWC() throws IOException {
        Map<Index, Long> indices = new HashMap<>();
        MetaData.Builder metaDataBuilder;

        Tuple<MetaData, Long> metaDataAndGeneration =
                META_DATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
        MetaData globalMetaData = metaDataAndGeneration.v1();
        long globalStateGeneration = metaDataAndGeneration.v2();
        if (globalMetaData != null) {
            metaDataBuilder = MetaData.builder(globalMetaData);
            assert Version.CURRENT.major < 8 : "failed to find manifest file, which is mandatory starting with Elasticsearch version 8.0";
        } else {
            metaDataBuilder = MetaData.builder();
        }

        for (String indexFolderName : nodeEnv.availableIndexFolders()) {
            Tuple<IndexMetaData, Long> indexMetaDataAndGeneration =
                    INDEX_META_DATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry,
                            nodeEnv.resolveIndexFolder(indexFolderName));
            assert Version.CURRENT.major < 8 : "failed to find manifest file, which is mandatory starting with Elasticsearch version 8.0";
            IndexMetaData indexMetaData = indexMetaDataAndGeneration.v1();
            long generation = indexMetaDataAndGeneration.v2();
            if (indexMetaData != null) {
                indices.put(indexMetaData.getIndex(), generation);
                metaDataBuilder.put(indexMetaData, false);
            } else {
                logger.debug("[{}] failed to find metadata for existing index location", indexFolderName);
            }
        }

        Manifest manifest = new Manifest(globalStateGeneration, indices);
        return new Tuple<>(manifest, metaDataBuilder.build());
    }
    /**
@ -75,7 +147,7 @@ public class MetaStateService extends AbstractComponent {
     */
    @Nullable
    public IndexMetaData loadIndexState(Index index) throws IOException {
        return INDEX_META_DATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.indexPaths(index));
    }

    /**
@ -86,14 +158,14 @@ public class MetaStateService extends AbstractComponent {
        for (String indexFolderName : nodeEnv.availableIndexFolders(excludeIndexPathIdsPredicate)) {
            assert excludeIndexPathIdsPredicate.test(indexFolderName) == false :
                    "unexpected folder " + indexFolderName + " which should have been excluded";
            IndexMetaData indexMetaData = INDEX_META_DATA_FORMAT.loadLatestState(logger, namedXContentRegistry,
                    nodeEnv.resolveIndexFolder(indexFolderName));
            if (indexMetaData != null) {
                final String indexPathId = indexMetaData.getIndex().getUUID();
                if (indexFolderName.equals(indexPathId)) {
                    indexMetaDataList.add(indexMetaData);
                } else {
                    throw new IllegalStateException("[" + indexFolderName + "] invalid index folder name, rename to [" + indexPathId + "]");
                }
            } else {
                logger.debug("[{}] failed to find metadata for existing index location", indexFolderName);
            }
        }
@ -102,42 +174,121 @@ public class MetaStateService extends AbstractComponent {
        return indexMetaDataList;
    }
/**
* Loads Manifest file from disk, returns <code>Manifest.empty()</code> if there is no manifest file.
*/
public Manifest loadManifestOrEmpty() throws IOException {
Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
if (manifest == null) {
manifest = Manifest.empty();
}
return manifest;
}
    /**
     * Loads the global state, *without* index state, see {@link #loadFullState()} for that.
     */
    MetaData loadGlobalState() throws IOException {
        return META_DATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
    }
    /**
     * Writes the manifest file (represented by {@link Manifest}) to disk and performs cleanup of the old manifest state file if
     * the write succeeds, or of the newly created manifest state file if the write fails.
     *
     * @throws WriteStateException if an exception occurs when writing state. See also {@link WriteStateException#isDirty()}
     */
public long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
logger.trace("[_meta] writing state, reason [{}]", reason);
try {
long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths());
logger.trace("[_meta] state written (generation: {})", generation);
return generation;
} catch (WriteStateException ex) {
throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex);
}
    }
    /**
     * Writes the index state.
     * <p>
     * This method is public for testing purposes.
     *
     * @throws WriteStateException if an exception occurs when writing state. {@link WriteStateException#isDirty()} will always return
     *                             false, because the new index state file is not yet referenced by the manifest file.
     */
    public long writeIndex(String reason, IndexMetaData indexMetaData) throws WriteStateException {
        final Index index = indexMetaData.getIndex();
        logger.trace("[{}] writing state, reason [{}]", index, reason);
        try {
            long generation = INDEX_META_DATA_FORMAT.write(indexMetaData,
                    nodeEnv.indexPaths(indexMetaData.getIndex()));
            logger.trace("[{}] state written", index);
            return generation;
        } catch (WriteStateException ex) {
            throw new WriteStateException(false, "[" + index + "]: failed to write index state", ex);
        }
    }
    /**
     * Writes the global state, *without* the indices states.
     *
     * @throws WriteStateException if an exception occurs when writing state. {@link WriteStateException#isDirty()} will always return
     *                             false, because the new global state file is not yet referenced by the manifest file.
     */
    long writeGlobalState(String reason, MetaData metaData) throws WriteStateException {
        logger.trace("[_global] writing state, reason [{}]", reason);
        try {
            long generation = META_DATA_FORMAT.write(metaData, nodeEnv.nodeDataPaths());
            logger.trace("[_global] state written");
            return generation;
        } catch (WriteStateException ex) {
            throw new WriteStateException(false, "[_global]: failed to write global state", ex);
        }
    }
/**
* Removes old state files in global state directory.
*
* @param currentGeneration current state generation to keep in the directory.
*/
void cleanupGlobalState(long currentGeneration) {
META_DATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.nodeDataPaths());
}
/**
* Removes old state files in index directory.
*
* @param index index to perform clean up on.
* @param currentGeneration current state generation to keep in the index directory.
*/
public void cleanupIndex(Index index, long currentGeneration) {
INDEX_META_DATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.indexPaths(index));
}
/**
* Writes index metadata and updates manifest file accordingly.
* Used by tests.
*/
public void writeIndexAndUpdateManifest(String reason, IndexMetaData metaData) throws IOException {
long generation = writeIndex(reason, metaData);
Manifest manifest = loadManifestOrEmpty();
Map<Index, Long> indices = new HashMap<>(manifest.getIndexGenerations());
indices.put(metaData.getIndex(), generation);
manifest = new Manifest(manifest.getGlobalGeneration(), indices);
writeManifestAndCleanup(reason, manifest);
cleanupIndex(metaData.getIndex(), generation);
}
/**
* Writes global metadata and updates manifest file accordingly.
* Used by tests.
*/
public void writeGlobalStateAndUpdateManifest(String reason, MetaData metaData) throws IOException {
long generation = writeGlobalState(reason, metaData);
Manifest manifest = loadManifestOrEmpty();
manifest = new Manifest(generation, manifest.getIndexGenerations());
writeManifestAndCleanup(reason, manifest);
cleanupGlobalState(generation);
}
}
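The methods above are meant to be composed in a fixed order; the following is a minimal usage sketch (illustrative only, not part of the change) that mirrors the pattern used by the atomicity test further down in this diff. The metaStateService, newMetaData, and reason strings are assumed to exist in the caller.

// Illustrative sketch: write state files first, then commit the manifest, then clean up.
long globalGeneration = metaStateService.writeGlobalState("example reason", newMetaData);
Map<Index, Long> indexGenerations = new HashMap<>();
for (IndexMetaData indexMetaData : newMetaData) {
    long generation = metaStateService.writeIndex("example reason", indexMetaData);
    indexGenerations.put(indexMetaData.getIndex(), generation);
}
// The manifest write is the commit point: only after it succeeds do the new generations take effect.
metaStateService.writeManifestAndCleanup("example reason", new Manifest(globalGeneration, indexGenerations));
// Afterwards, superseded generations can be removed.
metaStateService.cleanupGlobalState(globalGeneration);
for (Map.Entry<Index, Long> entry : indexGenerations.entrySet()) {
    metaStateService.cleanupIndex(entry.getKey(), entry.getValue());
}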

@ -90,7 +90,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
    @Override
    protected NodeGatewayMetaState nodeOperation(NodeRequest request) {
        try {
            return new NodeGatewayMetaState(clusterService.localNode(), metaState.loadMetaData());
        } catch (Exception e) {
            throw new ElasticsearchException("failed to load metadata", e);
        }

@ -24,9 +24,9 @@ import java.io.IOException;
 * This exception is thrown when there is a problem writing state to disk.
 */
public class WriteStateException extends IOException {
    private final boolean dirty;

    WriteStateException(boolean dirty, String message, Exception cause) {
        super(message, cause);
        this.dirty = dirty;
    }
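A hedged sketch of how a caller might react to this exception based on the dirty flag (Zen2 code is expected to handle it explicitly); the surrounding variables are assumed:

// Illustrative only: the dirty flag tells the caller whether on-disk state may have changed.
try {
    metaStateService.writeManifestAndCleanup("commit", newManifest);
} catch (WriteStateException e) {
    if (e.isDirty()) {
        // The write may have partially taken effect; on-disk state is undetermined.
    } else {
        // Nothing was committed; the previously written manifest is still authoritative.
    }
}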

@ -2243,7 +2243,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
            logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
            final ShardStateMetaData newShardStateMetadata =
                    new ShardStateMetaData(newRouting.primary(), indexSettings.getUUID(), newRouting.allocationId());
            ShardStateMetaData.FORMAT.writeAndCleanup(newShardStateMetadata, shardPath.getShardStatePath());
        } else {
            logger.trace("{} skip writing shard state, has been written before", shardId);
        }

@ -461,7 +461,7 @@ public class RemoveCorruptedShardDataCommand extends EnvironmentAwareCommand {
                final ShardStateMetaData newShardStateMetaData =
                        new ShardStateMetaData(shardStateMetaData.primary, shardStateMetaData.indexUUID, newAllocationId);
                ShardStateMetaData.FORMAT.writeAndCleanup(newShardStateMetaData, shardStatePath);
                terminal.println("");
                terminal.println("You should run the following command to allocate this shard:");

@ -670,7 +670,7 @@ public class Node implements Closeable {
            // we load the global state here (the persistent part of the cluster state stored on disk) to
            // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
            if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
                onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaData();
            } else {
                onDiskMetadata = MetaData.EMPTY_META_DATA;
            }

@ -35,7 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.test.ESIntegTestCase;
@ -354,16 +354,16 @@ public class CreateIndexIT extends ESIntegTestCase {
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                if (dataOrMasterNodeNames.contains(nodeName)) {
                    final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
                    final IndexMetaData brokenMetaData =
                            IndexMetaData
                                    .builder(metaData)
                                    .settings(Settings.builder().put(metaData.getSettings()).put("index.foo", true))
                                    .build();
                    // so evil
                    metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMetaData);
                }
                return super.onNodeStopped(nodeName);
            }
        });
        ensureGreen(metaData.getIndex().getName()); // we have to wait for the index to show up in the metadata or we will fail in a race

@ -0,0 +1,107 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.hamcrest.Matchers.equalTo;
public class ManifestTests extends ESTestCase {
private Manifest copyState(Manifest state, boolean introduceErrors) {
long generation = state.getGlobalGeneration();
Map<Index, Long> indices = new HashMap<>(state.getIndexGenerations());
if (introduceErrors) {
switch (randomInt(3)) {
case 0: {
generation = generation + 1;
break;
}
case 1: {
indices.remove(randomFrom(indices.keySet()));
break;
}
case 2: {
Tuple<Index, Long> indexEntry = randomIndexEntry();
indices.put(indexEntry.v1(), indexEntry.v2());
break;
}
case 3: {
Index index = randomFrom(indices.keySet());
indices.compute(index, (i, g) -> g + 1);
break;
}
}
}
return new Manifest(generation, indices);
}
private Tuple<Index, Long> randomIndexEntry() {
final String name = randomAlphaOfLengthBetween(4, 15);
final String uuid = UUIDs.randomBase64UUID();
final Index index = new Index(name, uuid);
final long indexGeneration = randomNonNegativeLong();
return Tuple.tuple(index, indexGeneration);
}
private Manifest randomManifest() {
long generation = randomNonNegativeLong();
Map<Index, Long> indices = new HashMap<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
Tuple<Index, Long> indexEntry = randomIndexEntry();
indices.put(indexEntry.v1(), indexEntry.v2());
}
return new Manifest(generation, indices);
}
public void testEqualsAndHashCode() {
checkEqualsAndHashCode(randomManifest(), org -> copyState(org, false), org -> copyState(org, true));
}
public void testXContent() throws IOException {
Manifest state = randomManifest();
final XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
Manifest.FORMAT.toXContent(builder, state);
builder.endObject();
BytesReference bytes = BytesReference.bytes(builder);
try (XContentParser parser = createParser(JsonXContent.jsonXContent, bytes)) {
assertThat(Manifest.fromXContent(parser), equalTo(state));
}
}
public void testEmptyManifest() {
assertTrue(Manifest.empty().isEmpty());
assertFalse(randomManifest().isEmpty());
}
}
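As a small illustration of what the test above exercises, the following sketch builds a manifest by hand; the UUID and generation numbers are made-up example values, and only constructors and accessors already used in this diff appear:

// Illustrative only: a Manifest maps on-disk generations to the metadata they belong to.
Index index = new Index("test", "some-index-uuid");      // example values
Map<Index, Long> indexGenerations = new HashMap<>();
indexGenerations.put(index, 4L);                          // generation of the index state file
Manifest manifest = new Manifest(7L, indexGenerations);   // 7L: global metadata generation
assert manifest.isEmpty() == false;
assert manifest.getGlobalGeneration() == 7L;
assert manifest.getIndexGenerations().containsKey(index);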

@ -395,18 +395,24 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
                .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet();
        }
        ClusterState state = client().admin().cluster().prepareState().get().getState();

        final IndexMetaData metaData = state.getMetaData().index("test");
        final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(Settings.builder().put(metaData.getSettings())
            .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion().id)
            // this is invalid but should be archived
            .put("index.similarity.BM25.type", "classic")
            // this one is not validated ahead of time and breaks allocation
            .put("index.analysis.filter.myCollator.type", "icu_collation")
        ).build();
        internalCluster().fullRestart(new RestartCallback(){
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
                metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
                return super.onNodeStopped(nodeName);
            }
        });
        // ensureGreen(closedIndex) waits for the index to show up in the metadata
        // this is crucial otherwise the state call below might not contain the index yet
        ensureGreen(metaData.getIndex().getName());
@ -457,13 +463,19 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
                .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet();
        }
        ClusterState state = client().admin().cluster().prepareState().get().getState();

        final IndexMetaData metaData = state.getMetaData().index("test");
        final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings()
            .filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build();
        internalCluster().fullRestart(new RestartCallback(){
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
                metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
                return super.onNodeStopped(nodeName);
            }
        });
        // ensureGreen(closedIndex) waits for the index to show up in the metadata
        // this is crucial otherwise the state call below might not contain the index yet
        ensureGreen(metaData.getIndex().getName());
@ -494,14 +506,20 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
                .waitForNoRelocatingShards(true).waitForNodes("2")).actionGet();
        }
        ClusterState state = client().admin().cluster().prepareState().get().getState();

        final MetaData metaData = state.getMetaData();
        final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder()
            .put(metaData.persistentSettings()).put("this.is.unknown", true)
            .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), "broken").build()).build();
        internalCluster().fullRestart(new RestartCallback(){
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
                metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta);
                return super.onNodeStopped(nodeName);
            }
        });
        ensureYellow("test"); // wait for state recovery
        state = client().admin().cluster().prepareState().get().getState();
        assertEquals("true", state.metaData().persistentSettings().get("archived.this.is.unknown"));

@ -19,12 +19,14 @@
package org.elasticsearch.gateway;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -32,143 +34,113 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.test.TestCustomMetaData;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class GatewayMetaStateTests extends ESAllocationTestCase {

    private ClusterState noIndexClusterState(boolean masterEligible) {
        MetaData metaData = MetaData.builder().build();
        RoutingTable routingTable = RoutingTable.builder().build();
        return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
                .metaData(metaData)
                .routingTable(routingTable)
                .nodes(generateDiscoveryNodes(masterEligible))
                .build();
    }

    private ClusterState clusterStateWithUnassignedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
        MetaData metaData = MetaData.builder()
                .put(indexMetaData, false)
                .build();
        RoutingTable routingTable = RoutingTable.builder()
                .addAsNew(metaData.index("test"))
                .build();
        return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
                .metaData(metaData)
                .routingTable(routingTable)
                .nodes(generateDiscoveryNodes(masterEligible))
                .build();
    }

    private ClusterState clusterStateWithAssignedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
        AllocationService strategy = createAllocationService(Settings.builder()
                .put("cluster.routing.allocation.node_concurrent_recoveries", 100)
                .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
                .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
                .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
                .build());

        ClusterState oldClusterState = clusterStateWithUnassignedIndex(indexMetaData, masterEligible);
        RoutingTable routingTable = strategy.reroute(oldClusterState, "reroute").routingTable();

        MetaData metaDataNewClusterState = MetaData.builder()
                .put(oldClusterState.metaData().index("test"), false)
                .build();

        return ClusterState.builder(oldClusterState).routingTable(routingTable)
                .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
    }

    private ClusterState clusterStateWithClosedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
        ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetaData, masterEligible);

        MetaData metaDataNewClusterState = MetaData.builder()
                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE)
                        .numberOfShards(5).numberOfReplicas(2))
                .version(oldClusterState.metaData().version() + 1)
                .build();
        RoutingTable routingTable = RoutingTable.builder()
                .addAsNew(metaDataNewClusterState.index("test"))
                .build();

        return ClusterState.builder(oldClusterState).routingTable(routingTable)
                .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
    }

    private ClusterState clusterStateWithJustOpenedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
        ClusterState oldClusterState = clusterStateWithClosedIndex(indexMetaData, masterEligible);

        MetaData metaDataNewClusterState = MetaData.builder()
                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.OPEN)
                        .numberOfShards(5).numberOfReplicas(2))
                .version(oldClusterState.metaData().version() + 1)
                .build();

        return ClusterState.builder(oldClusterState)
                .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
    }
    private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) {
@ -177,80 +149,278 @@ public class GatewayMetaStateTests extends ESAllocationTestCase {
                .add(newNode("master_node", MASTER_DATA_ROLES)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node");
    }

    private Set<Index> randomPrevWrittenIndices(IndexMetaData indexMetaData) {
        if (randomBoolean()) {
            return Collections.singleton(indexMetaData.getIndex());
        } else {
            return Collections.emptySet();
        }
    }

    private IndexMetaData createIndexMetaData(String name) {
        return IndexMetaData.builder(name).
                settings(settings(Version.CURRENT)).
                numberOfShards(5).
                numberOfReplicas(2).
                build();
    }

    public void testGetRelevantIndicesWithUnassignedShardsOnMasterEligibleNode() {
        IndexMetaData indexMetaData = createIndexMetaData("test");
        Set<Index> indices = GatewayMetaState.getRelevantIndices(
                clusterStateWithUnassignedIndex(indexMetaData, true),
                noIndexClusterState(true),
                randomPrevWrittenIndices(indexMetaData));
        assertThat(indices.size(), equalTo(1));
    }

    public void testGetRelevantIndicesWithUnassignedShardsOnDataOnlyNode() {
        IndexMetaData indexMetaData = createIndexMetaData("test");
        Set<Index> indices = GatewayMetaState.getRelevantIndices(
                clusterStateWithUnassignedIndex(indexMetaData, false),
                noIndexClusterState(false),
                randomPrevWrittenIndices(indexMetaData));
        assertThat(indices.size(), equalTo(0));
    }

    public void testGetRelevantIndicesWithAssignedShards() {
        IndexMetaData indexMetaData = createIndexMetaData("test");
        boolean masterEligible = randomBoolean();
        Set<Index> indices = GatewayMetaState.getRelevantIndices(
                clusterStateWithAssignedIndex(indexMetaData, masterEligible),
                clusterStateWithUnassignedIndex(indexMetaData, masterEligible),
                randomPrevWrittenIndices(indexMetaData));
        assertThat(indices.size(), equalTo(1));
    }

    public void testGetRelevantIndicesForClosedPrevWrittenIndexOnDataOnlyNode() {
        IndexMetaData indexMetaData = createIndexMetaData("test");
        Set<Index> indices = GatewayMetaState.getRelevantIndices(
                clusterStateWithClosedIndex(indexMetaData, false),
                clusterStateWithAssignedIndex(indexMetaData, false),
                Collections.singleton(indexMetaData.getIndex()));
        assertThat(indices.size(), equalTo(1));
    }

    public void testGetRelevantIndicesForClosedPrevNotWrittenIndexOnDataOnlyNode() {
        IndexMetaData indexMetaData = createIndexMetaData("test");
        Set<Index> indices = GatewayMetaState.getRelevantIndices(
                clusterStateWithJustOpenedIndex(indexMetaData, false),
                clusterStateWithClosedIndex(indexMetaData, false),
                Collections.emptySet());
        assertThat(indices.size(), equalTo(0));
    }

    public void testGetRelevantIndicesForWasClosedPrevWrittenIndexOnDataOnlyNode() {
        IndexMetaData indexMetaData = createIndexMetaData("test");
        Set<Index> indices = GatewayMetaState.getRelevantIndices(
                clusterStateWithJustOpenedIndex(indexMetaData, false),
                clusterStateWithClosedIndex(indexMetaData, false),
                Collections.singleton(indexMetaData.getIndex()));
        assertThat(indices.size(), equalTo(1));
    }

    public void testResolveStatesToBeWritten() throws WriteStateException {
        Map<Index, Long> indices = new HashMap<>();
        Set<Index> relevantIndices = new HashSet<>();

        IndexMetaData removedIndex = createIndexMetaData("removed_index");
        indices.put(removedIndex.getIndex(), 1L);
IndexMetaData versionChangedIndex = createIndexMetaData("version_changed_index");
indices.put(versionChangedIndex.getIndex(), 2L);
relevantIndices.add(versionChangedIndex.getIndex());
IndexMetaData notChangedIndex = createIndexMetaData("not_changed_index");
indices.put(notChangedIndex.getIndex(), 3L);
relevantIndices.add(notChangedIndex.getIndex());
IndexMetaData newIndex = createIndexMetaData("new_index");
relevantIndices.add(newIndex.getIndex());
MetaData oldMetaData = MetaData.builder()
.put(removedIndex, false)
.put(versionChangedIndex, false)
.put(notChangedIndex, false)
.build();
MetaData newMetaData = MetaData.builder()
.put(versionChangedIndex, true)
.put(notChangedIndex, false)
.put(newIndex, false)
.build();
IndexMetaData newVersionChangedIndex = newMetaData.index(versionChangedIndex.getIndex());
List<GatewayMetaState.IndexMetaDataAction> actions =
GatewayMetaState.resolveIndexMetaDataActions(indices, relevantIndices, oldMetaData, newMetaData);
assertThat(actions, hasSize(3));
for (GatewayMetaState.IndexMetaDataAction action : actions) {
if (action instanceof GatewayMetaState.KeepPreviousGeneration) {
assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex()));
GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class);
assertThat(action.execute(writer), equalTo(3L));
verifyZeroInteractions(writer);
}
if (action instanceof GatewayMetaState.WriteNewIndexMetaData) {
assertThat(action.getIndex(), equalTo(newIndex.getIndex()));
GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class);
when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L);
assertThat(action.execute(writer), equalTo(0L));
}
if (action instanceof GatewayMetaState.WriteChangedIndexMetaData) {
assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex()));
GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class);
when(writer.writeIndex(anyString(), eq(newVersionChangedIndex))).thenReturn(3L);
assertThat(action.execute(writer), equalTo(3L));
ArgumentCaptor<String> reason = ArgumentCaptor.forClass(String.class);
verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex));
assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion())));
assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion())));
}
}
}
private static class MetaStateServiceWithFailures extends MetaStateService {
private final int invertedFailRate;
private boolean failRandomly;
private <T> MetaDataStateFormat<T> wrap(MetaDataStateFormat<T> format) {
return new MetaDataStateFormat<T>(format.getPrefix()) {
@Override
public void toXContent(XContentBuilder builder, T state) throws IOException {
format.toXContent(builder, state);
}
@Override
public T fromXContent(XContentParser parser) throws IOException {
return format.fromXContent(parser);
}
@Override
protected Directory newDirectory(Path dir) {
MockDirectoryWrapper mock = newMockFSDirectory(dir);
if (failRandomly) {
MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() {
@Override
public void eval(MockDirectoryWrapper dir) throws IOException {
int r = randomIntBetween(0, invertedFailRate);
if (r == 0) {
throw new MockDirectoryWrapper.FakeIOException();
}
}
};
mock.failOn(fail);
}
closeAfterSuite(mock);
return mock;
}
};
}
MetaStateServiceWithFailures(int invertedFailRate, NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) {
super(nodeEnv, namedXContentRegistry);
META_DATA_FORMAT = wrap(MetaData.FORMAT);
INDEX_META_DATA_FORMAT = wrap(IndexMetaData.FORMAT);
MANIFEST_FORMAT = wrap(Manifest.FORMAT);
failRandomly = false;
this.invertedFailRate = invertedFailRate;
}
void failRandomly() {
failRandomly = true;
}
void noFailures() {
failRandomly = false;
}
}
private boolean metaDataEquals(MetaData md1, MetaData md2) {
boolean equals = MetaData.isGlobalStateEquals(md1, md2);
for (IndexMetaData imd : md1) {
IndexMetaData imd2 = md2.index(imd.getIndex());
equals = equals && imd.equals(imd2);
}
for (IndexMetaData imd : md2) {
IndexMetaData imd2 = md1.index(imd.getIndex());
equals = equals && imd.equals(imd2);
}
return equals;
}
private static MetaData randomMetaDataForTx() {
int settingNo = randomIntBetween(0, 10);
MetaData.Builder builder = MetaData.builder()
.persistentSettings(Settings.builder().put("setting" + settingNo, randomAlphaOfLength(5)).build());
int numOfIndices = randomIntBetween(0, 3);
for (int i = 0; i < numOfIndices; i++) {
int indexNo = randomIntBetween(0, 50);
IndexMetaData indexMetaData = IndexMetaData.builder("index" + indexNo).settings(
Settings.builder()
.put(IndexMetaData.SETTING_INDEX_UUID, "index" + indexNo)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build()
).build();
builder.put(indexMetaData, false);
}
return builder.build();
}
public void testAtomicityWithFailures() throws IOException {
try (NodeEnvironment env = newNodeEnvironment()) {
MetaStateServiceWithFailures metaStateService =
new MetaStateServiceWithFailures(randomIntBetween(100, 1000), env, xContentRegistry());
            // We only guarantee atomicity of writes if there is an initial Manifest file
Manifest manifest = Manifest.empty();
MetaData metaData = MetaData.EMPTY_META_DATA;
metaStateService.writeManifestAndCleanup("startup", Manifest.empty());
metaStateService.failRandomly();
Set<MetaData> possibleMetaData = new HashSet<>();
possibleMetaData.add(metaData);
for (int i = 0; i < randomIntBetween(1, 5); i++) {
GatewayMetaState.AtomicClusterStateWriter writer =
new GatewayMetaState.AtomicClusterStateWriter(metaStateService, manifest);
metaData = randomMetaDataForTx();
Map<Index, Long> indexGenerations = new HashMap<>();
try {
long globalGeneration = writer.writeGlobalState("global", metaData);
for (IndexMetaData indexMetaData : metaData) {
long generation = writer.writeIndex("index", indexMetaData);
indexGenerations.put(indexMetaData.getIndex(), generation);
}
Manifest newManifest = new Manifest(globalGeneration, indexGenerations);
writer.writeManifestAndCleanup("manifest", newManifest);
possibleMetaData.clear();
possibleMetaData.add(metaData);
manifest = newManifest;
} catch (WriteStateException e) {
if (e.isDirty()) {
possibleMetaData.add(metaData);
}
}
}
metaStateService.noFailures();
Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
MetaData loadedMetaData = manifestAndMetaData.v2();
assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData)));
}
    }

    public void testAddCustomMetaDataOnUpgrade() throws Exception {

@ -46,7 +46,6 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
@ -71,11 +70,11 @@ public class MetaDataStateFormatTests extends ESTestCase {
    /**
     * Ensure we can read a pre-generated cluster state.
     */
    public void testReadClusterState() throws IOException {
        final MetaDataStateFormat<MetaData> format = new MetaDataStateFormat<MetaData>("global-") {
            @Override
            public void toXContent(XContentBuilder builder, MetaData state) {
                fail("this test doesn't write");
            }
@ -104,7 +103,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
        Format format = new Format("foo-");
        DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        format.writeAndCleanup(state, dirs);
        for (Path file : dirs) {
            Path[] list = content("*", file);
            assertEquals(list.length, 1);
@ -119,7 +118,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
        }
        DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        format.writeAndCleanup(state2, dirs);
        for (Path file : dirs) {
            Path[] list = content("*", file);
@ -146,7 +145,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
        Format format = new Format("foo-");
        DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        format.writeAndCleanup(state, dirs);
        for (Path file : dirs) {
            Path[] list = content("*", file);
            assertEquals(list.length, 1);
@ -170,7 +169,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
        Format format = new Format("foo-");
        DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        format.writeAndCleanup(state, dirs);
        for (Path file : dirs) {
            Path[] list = content("*", file);
            assertEquals(list.length, 1);
@ -193,8 +192,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
        }
    }

    public static void corruptFile(Path fileToCorrupt, Logger logger) throws IOException {
        try (SimpleFSDirectory dir = new SimpleFSDirectory(fileToCorrupt.getParent())) {
            long checksumBeforeCorruption;
            try (IndexInput input = dir.openInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
@ -248,7 +246,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
            dirs[i] = createTempDir();
            Files.createDirectories(dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME));
            for (int j = 0; j < numStates; j++) {
                format.writeAndCleanup(meta.get(j), dirs[i]);
                if (randomBoolean() && (j < numStates - 1 || dirs.length > 0 && i != 0)) { // corrupt a file that we do not necessarily
                    // need here....
                    Path file = dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME).resolve("global-" + j + ".st");
@ -299,7 +297,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
        format.noFailures();
        DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        format.writeAndCleanup(state, paths);
        assertEquals(state, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths));
        ensureOnlyOneStateFile(paths);
        return state;
@ -324,7 +322,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
                Format.FAIL_FSYNC_TMP_FILE, Format.FAIL_RENAME_TMP_FILE);
        DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        WriteStateException ex = expectThrows(WriteStateException.class, () -> format.writeAndCleanup(newState, path));
        assertFalse(ex.isDirty());

        format.noFailures();
@ -347,7 +345,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
        DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        possibleStates.add(newState);
        WriteStateException ex = expectThrows(WriteStateException.class, () -> format.writeAndCleanup(newState, path));
        assertTrue(ex.isDirty());

        format.noFailures();
@ -370,7 +368,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
        format.failOnMethods(Format.FAIL_OPEN_STATE_FILE_WHEN_COPYING);
        DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        WriteStateException ex = expectThrows(WriteStateException.class, () -> format.writeAndCleanup(newState, paths));
assertFalse(ex.isDirty()); assertFalse(ex.isDirty());
format.noFailures(); format.noFailures();
@ -396,7 +394,7 @@ public class MetaDataStateFormatTests extends ESTestCase {
DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
randomDouble(), randomBoolean()); randomDouble(), randomBoolean());
try { try {
format.write(newState, paths); format.writeAndCleanup(newState, paths);
possibleStates.clear(); possibleStates.clear();
possibleStates.add(newState); possibleStates.add(newState);
} catch (WriteStateException e) { } catch (WriteStateException e) {
@ -481,7 +479,6 @@ public class MetaDataStateFormatTests extends ESTestCase {
this.failureMode = FailureMode.NO_FAILURES; this.failureMode = FailureMode.NO_FAILURES;
} }
@Override @Override
public void toXContent(XContentBuilder builder, DummyState state) throws IOException { public void toXContent(XContentBuilder builder, DummyState state) throws IOException {
state.toXContent(builder, null); state.toXContent(builder, null);
@ -492,7 +489,6 @@ public class MetaDataStateFormatTests extends ESTestCase {
return new DummyState().parse(parser); return new DummyState().parse(parser);
} }
public void noFailures() { public void noFailures() {
this.failureMode = FailureMode.NO_FAILURES; this.failureMode = FailureMode.NO_FAILURES;
} }
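Note: the hunks above consistently replace MetaDataStateFormat#write with writeAndCleanup in these tests, and the test at line 297 still expects exactly one remaining state file afterwards (ensureOnlyOneStateFile). As a rough, self-contained illustration of that split, a generation-based state writer might separate "write a new generation" from "delete older generations" as sketched below. This is an assumption-laden sketch, not the Elasticsearch implementation; the class name, file-naming scheme, and byte[] payload are all hypothetical.

// Sketch only: hypothetical generation-based state writer, not an Elasticsearch class.
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public final class SimpleStateFormat {
    private final String prefix;   // e.g. "global-"
    private long generation = -1;  // last successfully written generation

    public SimpleStateFormat(String prefix) {
        this.prefix = prefix;
    }

    /** Writes the state as a new generation file and returns that generation; older files are kept. */
    public long write(byte[] state, Path stateDir) throws IOException {
        long nextGeneration = generation + 1;
        Path tmp = stateDir.resolve(prefix + nextGeneration + ".st.tmp");
        Path target = stateDir.resolve(prefix + nextGeneration + ".st");
        Files.write(tmp, state, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        // Atomic rename makes the new generation visible only once it is fully written.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        generation = nextGeneration;
        return nextGeneration;
    }

    /** Writes a new generation and then removes every older generation file. */
    public long writeAndCleanup(byte[] state, Path stateDir) throws IOException {
        long written = write(state, stateDir);
        cleanupOlderGenerations(stateDir, written);
        return written;
    }

    /** Deletes state files of this format whose generation is older than {@code keep}. */
    public void cleanupOlderGenerations(Path stateDir, long keep) throws IOException {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateDir, prefix + "*.st")) {
            for (Path file : stream) {
                String name = file.getFileName().toString();
                long gen = Long.parseLong(name.substring(prefix.length(), name.length() - ".st".length()));
                if (gen < keep) {
                    Files.deleteIfExists(file);
                }
            }
        }
    }
}

Keeping plain write free of deletion would let a caller defer cleanup until some higher-level pointer (such as the manifest this change introduces) no longer references the older generations, while writeAndCleanup preserves a single-step write-then-prune behaviour for callers that do not need to defer it.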

View File

@@ -191,7 +191,7 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {

     private ImmutableOpenMap<String, IndexMetaData> getIndicesMetaDataOnNode(String nodeName) throws Exception {
         GatewayMetaState nodeMetaState = ((InternalTestCluster) cluster()).getInstance(GatewayMetaState.class, nodeName);
-        MetaData nodeMetaData = nodeMetaState.loadMetaState();
+        MetaData nodeMetaData = nodeMetaState.loadMetaData();
         return nodeMetaData.getIndices();
     }
 }

View File

@@ -20,84 +20,184 @@ package org.elasticsearch.gateway;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.test.ESTestCase;

+import java.io.IOException;
+import java.util.HashMap;
+
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.nullValue;

 public class MetaStateServiceTests extends ESTestCase {
-    private static Settings indexSettings = Settings.builder()
+    private NodeEnvironment env;
+    private MetaStateService metaStateService;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        env = newNodeEnvironment();
+        metaStateService = new MetaStateService(env, xContentRegistry());
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        env.close();
+    }
+
+    private static IndexMetaData indexMetaData(String name) {
+        return IndexMetaData.builder(name).settings(
+            Settings.builder()
+                .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
                 .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                 .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
                 .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
-            .build();
+                .build()
+        ).build();
+    }

     public void testWriteLoadIndex() throws Exception {
-        try (NodeEnvironment env = newNodeEnvironment()) {
-            MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
-            IndexMetaData index = IndexMetaData.builder("test1").settings(indexSettings).build();
+        IndexMetaData index = indexMetaData("test1");
         metaStateService.writeIndex("test_write", index);
         assertThat(metaStateService.loadIndexState(index.getIndex()), equalTo(index));
     }
-    }

     public void testLoadMissingIndex() throws Exception {
-        try (NodeEnvironment env = newNodeEnvironment()) {
-            MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
         assertThat(metaStateService.loadIndexState(new Index("test1", "test1UUID")), nullValue());
     }
-    }

     public void testWriteLoadGlobal() throws Exception {
-        try (NodeEnvironment env = newNodeEnvironment()) {
-            MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
         MetaData metaData = MetaData.builder()
             .persistentSettings(Settings.builder().put("test1", "value1").build())
             .build();
         metaStateService.writeGlobalState("test_write", metaData);
         assertThat(metaStateService.loadGlobalState().persistentSettings(), equalTo(metaData.persistentSettings()));
     }
-    }

     public void testWriteGlobalStateWithIndexAndNoIndexIsLoaded() throws Exception {
-        try (NodeEnvironment env = newNodeEnvironment()) {
-            MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
         MetaData metaData = MetaData.builder()
             .persistentSettings(Settings.builder().put("test1", "value1").build())
             .build();
-            IndexMetaData index = IndexMetaData.builder("test1").settings(indexSettings).build();
+        IndexMetaData index = indexMetaData("test1");
         MetaData metaDataWithIndex = MetaData.builder(metaData).put(index, true).build();
         metaStateService.writeGlobalState("test_write", metaDataWithIndex);
         assertThat(metaStateService.loadGlobalState().persistentSettings(), equalTo(metaData.persistentSettings()));
         assertThat(metaStateService.loadGlobalState().hasIndex("test1"), equalTo(false));
     }

+    public void testLoadFullStateBWC() throws Exception {
+        IndexMetaData indexMetaData = indexMetaData("test1");
+        MetaData metaData = MetaData.builder()
+            .persistentSettings(Settings.builder().put("test1", "value1").build())
+            .put(indexMetaData, true)
+            .build();
+
+        long globalGeneration = metaStateService.writeGlobalState("test_write", metaData);
+        long indexGeneration = metaStateService.writeIndex("test_write", indexMetaData);
+
+        Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
+        Manifest manifest = manifestAndMetaData.v1();
+        assertThat(manifest.getGlobalGeneration(), equalTo(globalGeneration));
+        assertThat(manifest.getIndexGenerations(), hasKey(indexMetaData.getIndex()));
+        assertThat(manifest.getIndexGenerations().get(indexMetaData.getIndex()), equalTo(indexGeneration));
+
+        MetaData loadedMetaData = manifestAndMetaData.v2();
+        assertThat(loadedMetaData.persistentSettings(), equalTo(metaData.persistentSettings()));
+        assertThat(loadedMetaData.hasIndex("test1"), equalTo(true));
+        assertThat(loadedMetaData.index("test1"), equalTo(indexMetaData));
     }

-    public void testLoadGlobal() throws Exception {
-        try (NodeEnvironment env = newNodeEnvironment()) {
-            MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
-            IndexMetaData index = IndexMetaData.builder("test1").settings(indexSettings).build();
+    public void testLoadEmptyStateNoManifest() throws IOException {
+        Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
+
+        Manifest manifest = manifestAndMetaData.v1();
+        assertTrue(manifest.isEmpty());
+
+        MetaData metaData = manifestAndMetaData.v2();
+        assertTrue(MetaData.isGlobalStateEquals(metaData, MetaData.EMPTY_META_DATA));
+    }
+
+    public void testLoadEmptyStateWithManifest() throws IOException {
+        Manifest manifest = Manifest.empty();
+        metaStateService.writeManifestAndCleanup("test", manifest);
+
+        Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
+        assertTrue(manifestAndMetaData.v1().isEmpty());
+        MetaData metaData = manifestAndMetaData.v2();
+        assertTrue(MetaData.isGlobalStateEquals(metaData, MetaData.EMPTY_META_DATA));
+    }
+
+    public void testLoadFullStateMissingGlobalMetaData() throws IOException {
+        IndexMetaData index = indexMetaData("test1");
+        long indexGeneration = metaStateService.writeIndex("test", index);
+        Manifest manifest = new Manifest(Manifest.empty().getGlobalGeneration(), new HashMap<Index, Long>() {{
+            put(index.getIndex(), indexGeneration);
+        }});
+        assertTrue(manifest.isGlobalGenerationMissing());
+        metaStateService.writeManifestAndCleanup("test", manifest);
+
+        Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
+        assertThat(manifestAndMetaData.v1(), equalTo(manifest));
+        MetaData loadedMetaData = manifestAndMetaData.v2();
+        assertTrue(MetaData.isGlobalStateEquals(loadedMetaData, MetaData.EMPTY_META_DATA));
+        assertThat(loadedMetaData.hasIndex("test1"), equalTo(true));
+        assertThat(loadedMetaData.index("test1"), equalTo(index));
+    }
+
+    public void testLoadFullStateAndUpdate() throws IOException {
+        IndexMetaData index = indexMetaData("test1");
         MetaData metaData = MetaData.builder()
             .persistentSettings(Settings.builder().put("test1", "value1").build())
             .put(index, true)
             .build();
-            metaStateService.writeGlobalState("test_write", metaData);
-            metaStateService.writeIndex("test_write", index);
-            MetaData loadedState = metaStateService.loadFullState();
-            assertThat(loadedState.persistentSettings(), equalTo(metaData.persistentSettings()));
-            assertThat(loadedState.hasIndex("test1"), equalTo(true));
-            assertThat(loadedState.index("test1"), equalTo(index));
-        }
+        long globalGeneration = metaStateService.writeGlobalState("first global state write", metaData);
+        long indexGeneration = metaStateService.writeIndex("first index state write", index);
+        Manifest manifest = new Manifest(globalGeneration, new HashMap<Index, Long>() {{
+            put(index.getIndex(), indexGeneration);
+        }});
+        metaStateService.writeManifestAndCleanup("first manifest write", manifest);
+
+        MetaData newMetaData = MetaData.builder()
+            .persistentSettings(Settings.builder().put("test1", "value2").build())
+            .put(index, true)
+            .build();
+        globalGeneration = metaStateService.writeGlobalState("second global state write", newMetaData);
+
+        Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
+        assertThat(manifestAndMetaData.v1(), equalTo(manifest));
+        MetaData loadedMetaData = manifestAndMetaData.v2();
+        assertThat(loadedMetaData.persistentSettings(), equalTo(metaData.persistentSettings()));
+        assertThat(loadedMetaData.hasIndex("test1"), equalTo(true));
+        assertThat(loadedMetaData.index("test1"), equalTo(index));
+
+        manifest = new Manifest(globalGeneration, new HashMap<Index, Long>() {{
+            put(index.getIndex(), indexGeneration);
+        }});
+        metaStateService.writeManifestAndCleanup("second manifest write", manifest);
+        metaStateService.cleanupGlobalState(globalGeneration);
+        metaStateService.cleanupIndex(index.getIndex(), indexGeneration);
+
+        manifestAndMetaData = metaStateService.loadFullState();
+        assertThat(manifestAndMetaData.v1(), equalTo(manifest));
+        loadedMetaData = manifestAndMetaData.v2();
+        assertThat(loadedMetaData.persistentSettings(), equalTo(newMetaData.persistentSettings()));
+        assertThat(loadedMetaData.hasIndex("test1"), equalTo(true));
+        assertThat(loadedMetaData.index("test1"), equalTo(index));
     }
 }
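The new MetaStateServiceTests above drive a specific write sequence: writeGlobalState and writeIndex return generations, a Manifest built from those generations is committed via writeManifestAndCleanup, and cleanupGlobalState/cleanupIndex then drop the superseded files; testLoadFullStateAndUpdate additionally checks that a global-state write not yet referenced by a manifest stays invisible to loadFullState. A condensed sketch of that sequence follows, using hypothetical stand-in types (StateWriter, ManifestRecord, byte[] payloads) rather than the real Elasticsearch classes.

// Sketch only: hypothetical model of the manifest-driven write sequence exercised above.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

final class ManifestRecord {
    final long globalGeneration;
    final Map<String, Long> indexGenerations; // index name -> generation of its state file

    ManifestRecord(long globalGeneration, Map<String, Long> indexGenerations) {
        this.globalGeneration = globalGeneration;
        this.indexGenerations = indexGenerations;
    }
}

interface StateWriter {
    // byte[] stands in for serialized global/index metadata in this sketch.
    long writeGlobalState(String reason, byte[] globalMetaData) throws IOException;
    long writeIndex(String reason, String indexName, byte[] indexMetaData) throws IOException;
    void writeManifestAndCleanup(String reason, ManifestRecord manifest) throws IOException;
    void cleanupGlobalState(long currentGeneration) throws IOException;
    void cleanupIndex(String indexName, long currentGeneration) throws IOException;
}

final class MetaStateUpdater {
    /**
     * Persists a new global state plus one index, commits the change by writing a
     * manifest that points at the freshly written generations, and finally drops
     * the older, now-unreferenced generations.
     */
    static ManifestRecord persist(StateWriter writer, byte[] globalMetaData,
                                  String indexName, byte[] indexMetaData) throws IOException {
        long globalGeneration = writer.writeGlobalState("global state write", globalMetaData);
        long indexGeneration = writer.writeIndex("index state write", indexName, indexMetaData);

        Map<String, Long> indexGenerations = new HashMap<>();
        indexGenerations.put(indexName, indexGeneration);
        ManifestRecord manifest = new ManifestRecord(globalGeneration, indexGenerations);

        // The manifest write is the commit point in this model; until it succeeds,
        // readers keep resolving whatever the previous manifest referenced.
        writer.writeManifestAndCleanup("manifest write", manifest);

        // Older global/index generations are no longer referenced by any manifest.
        writer.cleanupGlobalState(globalGeneration);
        writer.cleanupIndex(indexName, indexGeneration);
        return manifest;
    }
}

In this model the manifest write acts as the commit point, which matches what testLoadFullStateAndUpdate asserts between the second global-state write and the second manifest write: loadFullState still returns the metadata referenced by the first manifest.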

View File

@@ -194,7 +194,7 @@ public class IndexShardTests extends IndexShardTestCase {

     public static void write(ShardStateMetaData shardStateMetaData,
                              Path... shardPaths) throws IOException {
-        ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths);
+        ShardStateMetaData.FORMAT.writeAndCleanup(shardStateMetaData, shardPaths);
     }

     public static Engine getEngineFromShard(IndexShard shard) {

View File

@@ -401,7 +401,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
         // create _state of IndexMetaData
         try(NodeEnvironment nodeEnvironment = new NodeEnvironment(environment.settings(), environment)) {
             final Path[] paths = nodeEnvironment.indexPaths(indexMetaData.getIndex());
-            IndexMetaData.FORMAT.write(indexMetaData, paths);
+            IndexMetaData.FORMAT.writeAndCleanup(indexMetaData, paths);
             logger.info("--> index metadata persisted to {} ", Arrays.toString(paths));
         }
     }

View File

@@ -43,7 +43,8 @@ public class ShardPathTests extends ESTestCase {
             ShardId shardId = new ShardId("foo", "0xDEADBEEF", 0);
             Path[] paths = env.availableShardPaths(shardId);
             Path path = randomFrom(paths);
-            ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path);
+            ShardStateMetaData.FORMAT.writeAndCleanup(
+                new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path);
             ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings));
             assertEquals(path, shardPath.getDataPath());
             assertEquals("0xDEADBEEF", shardPath.getShardId().getIndex().getUUID());
@@ -62,7 +63,8 @@ public class ShardPathTests extends ESTestCase {
             ShardId shardId = new ShardId("foo", indexUUID, 0);
             Path[] paths = env.availableShardPaths(shardId);
             assumeTrue("This test tests multi data.path but we only got one", paths.length > 1);
-            ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths);
+            ShardStateMetaData.FORMAT.writeAndCleanup(
+                new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths);
             Exception e = expectThrows(IllegalStateException.class, () ->
                 ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings)));
             assertThat(e.getMessage(), containsString("more than one shard state found"));
@@ -77,7 +79,8 @@ public class ShardPathTests extends ESTestCase {
             ShardId shardId = new ShardId("foo", "foobar", 0);
             Path[] paths = env.availableShardPaths(shardId);
             Path path = randomFrom(paths);
-            ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path);
+            ShardStateMetaData.FORMAT.writeAndCleanup(
+                new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path);
             Exception e = expectThrows(IllegalStateException.class, () ->
                 ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings)));
             assertThat(e.getMessage(), containsString("expected: foobar on shard path"));
@@ -124,7 +127,8 @@ public class ShardPathTests extends ESTestCase {
             ShardId shardId = new ShardId("foo", indexUUID, 0);
             Path[] paths = env.availableShardPaths(shardId);
             Path path = randomFrom(paths);
-            ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path);
+            ShardStateMetaData.FORMAT.writeAndCleanup(
+                new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path);
             ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId,
                 IndexSettingsModule.newIndexSettings(shardId.getIndex(), indexSettings, nodeSettings));
             boolean found = false;

View File

@@ -232,12 +232,12 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
         }
         GatewayMetaState gwMetaState = getInstanceFromNode(GatewayMetaState.class);
-        MetaData meta = gwMetaState.loadMetaState();
+        MetaData meta = gwMetaState.loadMetaData();
         assertNotNull(meta);
         assertNotNull(meta.index("test"));
         assertAcked(client().admin().indices().prepareDelete("test"));
-        meta = gwMetaState.loadMetaState();
+        meta = gwMetaState.loadMetaData();
         assertNotNull(meta);
         assertNull(meta.index("test"));