[STATE] Refactor state format to use incremental state IDs
Today there is a chance that the state version for shard, index or cluster state goes backwards or is reset on a full restart etc. depending on several factors not related to the state. To prevent any collisions with already existing state files and to maintain write-once properties this change introductes an incremental state ID instead of using the plain state version. This also fixes a bug when the previous legacy state had a greater version than the current state which causes an exception on node startup or if left-over files are present. Closes #10316
This commit is contained in:
parent
ff6b605778
commit
78d86bcf41
|
@ -56,17 +56,19 @@ public abstract class MetaDataStateFormat<T> {
|
|||
private static final int STATE_FILE_VERSION = 0;
|
||||
private static final int BUFFER_SIZE = 4096;
|
||||
private final XContentType format;
|
||||
private final boolean deleteOldFiles;
|
||||
private final String prefix;
|
||||
private final Pattern stateFilePattern;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new {@link MetaDataStateFormat} instance
|
||||
* @param format the format of the x-content
|
||||
* @param deleteOldFiles if <code>true</code> write operations will
|
||||
* clean up old files written with this format.
|
||||
*/
|
||||
protected MetaDataStateFormat(XContentType format, boolean deleteOldFiles) {
|
||||
protected MetaDataStateFormat(XContentType format, String prefix) {
|
||||
this.format = format;
|
||||
this.deleteOldFiles = deleteOldFiles;
|
||||
this.prefix = prefix;
|
||||
this.stateFilePattern = Pattern.compile(Pattern.quote(prefix) + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -83,15 +85,16 @@ public abstract class MetaDataStateFormat<T> {
|
|||
* it's target filename of the pattern <tt>{prefix}{version}.st</tt>.
|
||||
*
|
||||
* @param state the state object to write
|
||||
* @param prefix the state names prefix used to compose the file name.
|
||||
* @param version the version of the state
|
||||
* @param locations the locations where the state should be written to.
|
||||
* @throws IOException if an IOException occurs
|
||||
*/
|
||||
public final void write(final T state, final String prefix, final long version, final Path... locations) throws IOException {
|
||||
public final void write(final T state, final long version, final Path... locations) throws IOException {
|
||||
Preconditions.checkArgument(locations != null, "Locations must not be null");
|
||||
Preconditions.checkArgument(locations.length > 0, "One or more locations required");
|
||||
String fileName = prefix + version + STATE_FILE_EXTENSION;
|
||||
final long maxStateId = findMaxStateId(prefix, locations)+1;
|
||||
assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]";
|
||||
final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION;
|
||||
Path stateLocation = locations[0].resolve(STATE_DIR_NAME);
|
||||
Files.createDirectories(stateLocation);
|
||||
final Path tmpStatePath = stateLocation.resolve(fileName + ".tmp");
|
||||
|
@ -136,9 +139,7 @@ public abstract class MetaDataStateFormat<T> {
|
|||
} finally {
|
||||
Files.deleteIfExists(tmpStatePath);
|
||||
}
|
||||
if (deleteOldFiles) {
|
||||
cleanupOldFiles(prefix, fileName, locations);
|
||||
}
|
||||
cleanupOldFiles(prefix, fileName, locations);
|
||||
}
|
||||
|
||||
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException {
|
||||
|
@ -161,17 +162,14 @@ public abstract class MetaDataStateFormat<T> {
|
|||
* Reads the state from a given file and compares the expected version against the actual version of
|
||||
* the state.
|
||||
*/
|
||||
public final T read(Path file, long expectedVersion) throws IOException {
|
||||
public final T read(Path file) throws IOException {
|
||||
try (Directory dir = newDirectory(file.getParent())) {
|
||||
try (final IndexInput indexInput = dir.openInput(file.getFileName().toString(), IOContext.DEFAULT)) {
|
||||
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
|
||||
CodecUtil.checksumEntireFile(indexInput);
|
||||
CodecUtil.checkHeader(indexInput, STATE_FILE_CODEC, STATE_FILE_VERSION, STATE_FILE_VERSION);
|
||||
final XContentType xContentType = XContentType.values()[indexInput.readInt()];
|
||||
final long version = indexInput.readLong();
|
||||
if (version != expectedVersion) {
|
||||
throw new CorruptStateException("State version mismatch expected: " + expectedVersion + " but was: " + version);
|
||||
}
|
||||
indexInput.readLong(); // version currently unused
|
||||
long filePointer = indexInput.getFilePointer();
|
||||
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
|
||||
try (IndexInput slice = indexInput.slice("state_xcontent", filePointer, contentSize)) {
|
||||
|
@ -210,25 +208,38 @@ public abstract class MetaDataStateFormat<T> {
|
|||
}
|
||||
}
|
||||
|
||||
long findMaxStateId(final String prefix, Path... locations) throws IOException {
|
||||
long maxId = -1;
|
||||
for (Path dataLocation : locations) {
|
||||
final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
|
||||
if (Files.exists(resolve)) {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(resolve, prefix + "*")) {
|
||||
for (Path stateFile : stream) {
|
||||
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
|
||||
if (matcher.matches()) {
|
||||
final long id = Long.parseLong(matcher.group(1));
|
||||
maxId = Math.max(maxId, id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return maxId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to load the latest state from the given data-locations. It tries to load the latest state determined by
|
||||
* the states version from one or more data directories and if none of the latest states can be loaded an exception
|
||||
* is thrown to prevent accidentally loading a previous state and silently omitting the latest state.
|
||||
*
|
||||
* @param logger an elasticsearch logger instance
|
||||
* @param format the actual metastate format to use
|
||||
* @param pattern the file name pattern to identify files belonging to this pattern and to read the version from.
|
||||
* The first capture group should return the version of the file. If the second capture group is has a
|
||||
* null value the files is considered a legacy file and will be treated as if the file contains a plain
|
||||
* x-content payload.
|
||||
* @param stateType the state type we are loading. used for logging contenxt only.
|
||||
* @param dataLocations the data-locations to try.
|
||||
* @return the latest state or <code>null</code> if no state was found.
|
||||
*/
|
||||
public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> format, Pattern pattern, String stateType, Path... dataLocations) throws IOException {
|
||||
List<PathAndVersion> files = new ArrayList<>();
|
||||
long maxVersion = -1;
|
||||
boolean maxVersionIsLegacy = true;
|
||||
public T loadLatestState(ESLogger logger, Path... dataLocations) throws IOException {
|
||||
List<PathAndStateId> files = new ArrayList<>();
|
||||
long maxStateId = -1;
|
||||
boolean maxStateIdIsLegacy = true;
|
||||
if (dataLocations != null) { // select all eligable files first
|
||||
for (Path dataLocation : dataLocations) {
|
||||
final Path stateDir = dataLocation.resolve(STATE_DIR_NAME);
|
||||
|
@ -238,13 +249,13 @@ public abstract class MetaDataStateFormat<T> {
|
|||
// now, iterate over the current versions, and find latest one
|
||||
try (DirectoryStream<Path> paths = Files.newDirectoryStream(stateDir)) { // we don't pass a glob since we need the group part for parsing
|
||||
for (Path stateFile : paths) {
|
||||
final Matcher matcher = pattern.matcher(stateFile.getFileName().toString());
|
||||
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
|
||||
if (matcher.matches()) {
|
||||
final long version = Long.parseLong(matcher.group(1));
|
||||
maxVersion = Math.max(maxVersion, version);
|
||||
final long stateId = Long.parseLong(matcher.group(1));
|
||||
maxStateId = Math.max(maxStateId, stateId);
|
||||
final boolean legacy = MetaDataStateFormat.STATE_FILE_EXTENSION.equals(matcher.group(2)) == false;
|
||||
maxVersionIsLegacy &= legacy; // on purpose, see NOTE below
|
||||
PathAndVersion pav = new PathAndVersion(stateFile, version, legacy);
|
||||
maxStateIdIsLegacy &= legacy; // on purpose, see NOTE below
|
||||
PathAndStateId pav = new PathAndStateId(stateFile, stateId, legacy);
|
||||
logger.trace("found state file: {}", pav);
|
||||
files.add(pav);
|
||||
}
|
||||
|
@ -259,30 +270,30 @@ public abstract class MetaDataStateFormat<T> {
|
|||
// new format (ie. legacy == false) then we know that the latest version state ought to use this new format.
|
||||
// In case the state file with the latest version does not use the new format while older state files do,
|
||||
// the list below will be empty and loading the state will fail
|
||||
for (PathAndVersion pathAndVersion : Collections2.filter(files, new VersionAndLegacyPredicate(maxVersion, maxVersionIsLegacy))) {
|
||||
for (PathAndStateId pathAndStateId : Collections2.filter(files, new StateIdAndLegacyPredicate(maxStateId, maxStateIdIsLegacy))) {
|
||||
try {
|
||||
final Path stateFile = pathAndVersion.file;
|
||||
final long version = pathAndVersion.version;
|
||||
final Path stateFile = pathAndStateId.file;
|
||||
final long id = pathAndStateId.id;
|
||||
final XContentParser parser;
|
||||
if (pathAndVersion.legacy) { // read the legacy format -- plain XContent
|
||||
if (pathAndStateId.legacy) { // read the legacy format -- plain XContent
|
||||
final byte[] data = Files.readAllBytes(stateFile);
|
||||
if (data.length == 0) {
|
||||
logger.debug("{}: no data for [{}], ignoring...", stateType, stateFile.toAbsolutePath());
|
||||
logger.debug("{}: no data for [{}], ignoring...", prefix, stateFile.toAbsolutePath());
|
||||
continue;
|
||||
}
|
||||
parser = XContentHelper.createParser(data, 0, data.length);
|
||||
state = format.fromXContent(parser);
|
||||
state = fromXContent(parser);
|
||||
if (state == null) {
|
||||
logger.debug("{}: no data for [{}], ignoring...", stateType, stateFile.toAbsolutePath());
|
||||
logger.debug("{}: no data for [{}], ignoring...", prefix, stateFile.toAbsolutePath());
|
||||
}
|
||||
} else {
|
||||
state = format.read(stateFile, version);
|
||||
logger.trace("state version [{}] read from [{}]", version, stateFile.getFileName());
|
||||
state = read(stateFile);
|
||||
logger.trace("state id [{}] read from [{}]", id, stateFile.getFileName());
|
||||
}
|
||||
return state;
|
||||
} catch (Throwable e) {
|
||||
exceptions.add(e);
|
||||
logger.debug("{}: failed to read [{}], ignoring...", e, pathAndVersion.file.toAbsolutePath(), stateType);
|
||||
logger.debug("{}: failed to read [{}], ignoring...", e, pathAndStateId.file.toAbsolutePath(), prefix);
|
||||
}
|
||||
}
|
||||
// if we reach this something went wrong
|
||||
|
@ -295,42 +306,42 @@ public abstract class MetaDataStateFormat<T> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Filters out all {@link MetaDataStateFormat.PathAndVersion} instances with a different version than
|
||||
* Filters out all {@link org.elasticsearch.gateway.MetaDataStateFormat.PathAndStateId} instances with a different id than
|
||||
* the given one.
|
||||
*/
|
||||
private static final class VersionAndLegacyPredicate implements Predicate<PathAndVersion> {
|
||||
private final long version;
|
||||
private static final class StateIdAndLegacyPredicate implements Predicate<PathAndStateId> {
|
||||
private final long id;
|
||||
private final boolean legacy;
|
||||
|
||||
VersionAndLegacyPredicate(long version, boolean legacy) {
|
||||
this.version = version;
|
||||
StateIdAndLegacyPredicate(long id, boolean legacy) {
|
||||
this.id = id;
|
||||
this.legacy = legacy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(PathAndVersion input) {
|
||||
return input.version == version && input.legacy == legacy;
|
||||
public boolean apply(PathAndStateId input) {
|
||||
return input.id == id && input.legacy == legacy;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal struct-like class that holds the parsed state version, the file
|
||||
* Internal struct-like class that holds the parsed state id, the file
|
||||
* and a flag if the file is a legacy state ie. pre 1.5
|
||||
*/
|
||||
private static class PathAndVersion {
|
||||
private static class PathAndStateId {
|
||||
final Path file;
|
||||
final long version;
|
||||
final long id;
|
||||
final boolean legacy;
|
||||
|
||||
private PathAndVersion(Path file, long version, boolean legacy) {
|
||||
private PathAndStateId(Path file, long id, boolean legacy) {
|
||||
this.file = file;
|
||||
this.version = version;
|
||||
this.id = id;
|
||||
this.legacy = legacy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[version:" + version + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]";
|
||||
return "[id:" + id + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]";
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.elasticsearch.index.Index;
|
|||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Handles writing and loading both {@link MetaData} and {@link IndexMetaData}
|
||||
|
@ -47,22 +46,20 @@ public class MetaStateService extends AbstractComponent {
|
|||
|
||||
static final String GLOBAL_STATE_FILE_PREFIX = "global-";
|
||||
private static final String INDEX_STATE_FILE_PREFIX = "state-";
|
||||
static final Pattern GLOBAL_STATE_FILE_PATTERN = Pattern.compile(GLOBAL_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
|
||||
static final Pattern INDEX_STATE_FILE_PATTERN = Pattern.compile(INDEX_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
|
||||
private static final String GLOBAL_STATE_LOG_TYPE = "[_global]";
|
||||
|
||||
private final NodeEnvironment nodeEnv;
|
||||
|
||||
private final XContentType format;
|
||||
private final ToXContent.Params formatParams;
|
||||
private final ToXContent.Params gatewayModeFormatParams;
|
||||
private final MetaDataStateFormat<IndexMetaData> indexStateFormat;
|
||||
private final MetaDataStateFormat<MetaData> globalStateFormat;
|
||||
|
||||
@Inject
|
||||
public MetaStateService(Settings settings, NodeEnvironment nodeEnv) {
|
||||
super(settings);
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.format = XContentType.fromRestContentType(settings.get(FORMAT_SETTING, "smile"));
|
||||
|
||||
if (this.format == XContentType.SMILE) {
|
||||
Map<String, String> params = Maps.newHashMap();
|
||||
params.put("binary", "true");
|
||||
|
@ -77,6 +74,9 @@ public class MetaStateService extends AbstractComponent {
|
|||
gatewayModeParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_GATEWAY);
|
||||
gatewayModeFormatParams = new ToXContent.MapParams(gatewayModeParams);
|
||||
}
|
||||
indexStateFormat = indexStateFormat(format, formatParams);
|
||||
globalStateFormat = globalStateFormat(format, gatewayModeFormatParams);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -109,15 +109,14 @@ public class MetaStateService extends AbstractComponent {
|
|||
*/
|
||||
@Nullable
|
||||
IndexMetaData loadIndexState(String index) throws IOException {
|
||||
return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true),
|
||||
INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index)));
|
||||
return indexStateFormat.loadLatestState(logger, nodeEnv.indexPaths(new Index(index)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the global state, *without* index state, see {@link #loadFullState()} for that.
|
||||
*/
|
||||
MetaData loadGlobalState() throws IOException {
|
||||
return MetaDataStateFormat.loadLatestState(logger, globalStateFormat(format, gatewayModeFormatParams, true), GLOBAL_STATE_FILE_PATTERN, GLOBAL_STATE_LOG_TYPE, nodeEnv.nodeDataPaths());
|
||||
return globalStateFormat.loadLatestState(logger, nodeEnv.nodeDataPaths());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -125,10 +124,8 @@ public class MetaStateService extends AbstractComponent {
|
|||
*/
|
||||
void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaData previousIndexMetaData) throws Exception {
|
||||
logger.trace("[{}] writing state, reason [{}]", indexMetaData.index(), reason);
|
||||
final boolean deleteOldFiles = previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version();
|
||||
final MetaDataStateFormat<IndexMetaData> writer = indexStateFormat(format, formatParams, deleteOldFiles);
|
||||
try {
|
||||
writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(),
|
||||
indexStateFormat.write(indexMetaData, indexMetaData.version(),
|
||||
nodeEnv.indexPaths(new Index(indexMetaData.index())));
|
||||
} catch (Throwable ex) {
|
||||
logger.warn("[{}]: failed to write index state", ex, indexMetaData.index());
|
||||
|
@ -140,12 +137,11 @@ public class MetaStateService extends AbstractComponent {
|
|||
* Writes the global state, *without* the indices states.
|
||||
*/
|
||||
void writeGlobalState(String reason, MetaData metaData) throws Exception {
|
||||
logger.trace("{} writing state, reason [{}]", GLOBAL_STATE_LOG_TYPE, reason);
|
||||
final MetaDataStateFormat<MetaData> writer = globalStateFormat(format, gatewayModeFormatParams, true);
|
||||
logger.trace("[_global] writing state, reason [{}]", reason);
|
||||
try {
|
||||
writer.write(metaData, GLOBAL_STATE_FILE_PREFIX, metaData.version(), nodeEnv.nodeDataPaths());
|
||||
globalStateFormat.write(metaData, metaData.version(), nodeEnv.nodeDataPaths());
|
||||
} catch (Throwable ex) {
|
||||
logger.warn("{}: failed to write global state", ex, GLOBAL_STATE_LOG_TYPE);
|
||||
logger.warn("[_global]: failed to write global state", ex);
|
||||
throw new IOException("failed to write global state", ex);
|
||||
}
|
||||
}
|
||||
|
@ -153,8 +149,8 @@ public class MetaStateService extends AbstractComponent {
|
|||
/**
|
||||
* Returns a StateFormat that can read and write {@link MetaData}
|
||||
*/
|
||||
static MetaDataStateFormat<MetaData> globalStateFormat(XContentType format, final ToXContent.Params formatParams, final boolean deleteOldFiles) {
|
||||
return new MetaDataStateFormat<MetaData>(format, deleteOldFiles) {
|
||||
static MetaDataStateFormat<MetaData> globalStateFormat(XContentType format, final ToXContent.Params formatParams) {
|
||||
return new MetaDataStateFormat<MetaData>(format, GLOBAL_STATE_FILE_PREFIX) {
|
||||
|
||||
@Override
|
||||
public void toXContent(XContentBuilder builder, MetaData state) throws IOException {
|
||||
|
@ -171,8 +167,8 @@ public class MetaStateService extends AbstractComponent {
|
|||
/**
|
||||
* Returns a StateFormat that can read and write {@link IndexMetaData}
|
||||
*/
|
||||
static MetaDataStateFormat<IndexMetaData> indexStateFormat(XContentType format, final ToXContent.Params formatParams, boolean deleteOldFiles) {
|
||||
return new MetaDataStateFormat<IndexMetaData>(format, deleteOldFiles) {
|
||||
static MetaDataStateFormat<IndexMetaData> indexStateFormat(XContentType format, final ToXContent.Params formatParams) {
|
||||
return new MetaDataStateFormat<IndexMetaData>(format, INDEX_STATE_FILE_PREFIX) {
|
||||
|
||||
@Override
|
||||
public void toXContent(XContentBuilder builder, IndexMetaData state) throws IOException {
|
||||
|
|
|
@ -42,10 +42,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
|
@ -120,7 +117,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
|
|||
final ShardId shardId = request.getShardId();
|
||||
final String indexUUID = request.getIndexUUID();
|
||||
logger.trace("{} loading local shard state info", shardId);
|
||||
ShardStateMetaData shardStateMetaData = ShardStateMetaData.load(logger, request.shardId, nodeEnv.shardPaths(request.shardId));
|
||||
final ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.shardPaths(request.shardId));
|
||||
if (shardStateMetaData != null) {
|
||||
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
|
||||
// is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
|
||||
|
|
|
@ -1274,7 +1274,8 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
return;
|
||||
}
|
||||
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.version(), newRouting.primary(), getIndexUUID());
|
||||
ShardStateMetaData.write(logger, writeReason, shardId, newShardStateMetadata, currentRouting != null, nodeEnv.shardPaths(shardId));
|
||||
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
|
||||
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.version, nodeEnv.shardPaths(shardId));
|
||||
} catch (IOException e) { // this is how we used to handle it.... :(
|
||||
logger.warn("failed to write shard state", e);
|
||||
// we failed to write the shard state, we will try and write
|
||||
|
|
|
@ -38,7 +38,6 @@ import java.util.regex.Pattern;
|
|||
public final class ShardStateMetaData {
|
||||
|
||||
private static final String SHARD_STATE_FILE_PREFIX = "state-";
|
||||
private static final Pattern SHARD_STATE_FILE_PATTERN = Pattern.compile(SHARD_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
|
||||
private static final String PRIMARY_KEY = "primary";
|
||||
private static final String VERSION_KEY = "version";
|
||||
private static final String INDEX_UUID_KEY = "index_uuid" ;
|
||||
|
@ -91,70 +90,57 @@ public final class ShardStateMetaData {
|
|||
return "version [" + version + "], primary [" + primary + "]";
|
||||
}
|
||||
|
||||
private static MetaDataStateFormat<ShardStateMetaData> newShardStateInfoFormat(boolean deleteOldFiles) {
|
||||
return new MetaDataStateFormat<ShardStateMetaData>(XContentType.JSON, deleteOldFiles) {
|
||||
public static final MetaDataStateFormat<ShardStateMetaData> FORMAT = new MetaDataStateFormat<ShardStateMetaData>(XContentType.JSON, SHARD_STATE_FILE_PREFIX) {
|
||||
|
||||
@Override
|
||||
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream) throws IOException {
|
||||
XContentBuilder xContentBuilder = super.newXContentBuilder(type, stream);
|
||||
xContentBuilder.prettyPrint();
|
||||
return xContentBuilder;
|
||||
@Override
|
||||
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream) throws IOException {
|
||||
XContentBuilder xContentBuilder = super.newXContentBuilder(type, stream);
|
||||
xContentBuilder.prettyPrint();
|
||||
return xContentBuilder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toXContent(XContentBuilder builder, ShardStateMetaData shardStateMetaData) throws IOException {
|
||||
builder.field(VERSION_KEY, shardStateMetaData.version);
|
||||
builder.field(PRIMARY_KEY, shardStateMetaData.primary);
|
||||
builder.field(INDEX_UUID_KEY, shardStateMetaData.indexUUID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardStateMetaData fromXContent(XContentParser parser) throws IOException {
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toXContent(XContentBuilder builder, ShardStateMetaData shardStateMetaData) throws IOException {
|
||||
builder.field(VERSION_KEY, shardStateMetaData.version);
|
||||
builder.field(PRIMARY_KEY, shardStateMetaData.primary);
|
||||
builder.field(INDEX_UUID_KEY, shardStateMetaData.indexUUID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardStateMetaData fromXContent(XContentParser parser) throws IOException {
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token == null) {
|
||||
return null;
|
||||
}
|
||||
long version = -1;
|
||||
Boolean primary = null;
|
||||
String currentFieldName = null;
|
||||
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if (VERSION_KEY.equals(currentFieldName)) {
|
||||
version = parser.longValue();
|
||||
} else if (PRIMARY_KEY.equals(currentFieldName)) {
|
||||
primary = parser.booleanValue();
|
||||
} else if (INDEX_UUID_KEY.equals(currentFieldName)) {
|
||||
indexUUID = parser.text();
|
||||
} else {
|
||||
throw new CorruptStateException("unexpected field in shard state [" + currentFieldName + "]");
|
||||
}
|
||||
long version = -1;
|
||||
Boolean primary = null;
|
||||
String currentFieldName = null;
|
||||
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if (VERSION_KEY.equals(currentFieldName)) {
|
||||
version = parser.longValue();
|
||||
} else if (PRIMARY_KEY.equals(currentFieldName)) {
|
||||
primary = parser.booleanValue();
|
||||
} else if (INDEX_UUID_KEY.equals(currentFieldName)) {
|
||||
indexUUID = parser.text();
|
||||
} else {
|
||||
throw new CorruptStateException("unexpected token in shard state [" + token.name() + "]");
|
||||
throw new CorruptStateException("unexpected field in shard state [" + currentFieldName + "]");
|
||||
}
|
||||
} else {
|
||||
throw new CorruptStateException("unexpected token in shard state [" + token.name() + "]");
|
||||
}
|
||||
if (primary == null) {
|
||||
throw new CorruptStateException("missing value for [primary] in shard state");
|
||||
}
|
||||
if (version == -1) {
|
||||
throw new CorruptStateException("missing value for [version] in shard state");
|
||||
}
|
||||
return new ShardStateMetaData(version, primary, indexUUID);
|
||||
}
|
||||
};
|
||||
}
|
||||
if (primary == null) {
|
||||
throw new CorruptStateException("missing value for [primary] in shard state");
|
||||
}
|
||||
if (version == -1) {
|
||||
throw new CorruptStateException("missing value for [version] in shard state");
|
||||
}
|
||||
return new ShardStateMetaData(version, primary, indexUUID);
|
||||
}
|
||||
};
|
||||
|
||||
public static ShardStateMetaData load(ESLogger logger, ShardId shardId, Path... shardPaths) throws IOException {
|
||||
return MetaDataStateFormat.loadLatestState(logger, newShardStateInfoFormat(false), SHARD_STATE_FILE_PATTERN,
|
||||
shardId.toString(), shardPaths);
|
||||
}
|
||||
|
||||
public static void write(ESLogger logger, String reason, ShardId shardId, ShardStateMetaData shardStateMetaData,
|
||||
boolean deletePreviousState, Path... shardPaths) throws IOException {
|
||||
logger.trace("{} writing shard state, reason [{}]", shardId, reason);
|
||||
MetaDataStateFormat<ShardStateMetaData> stateFormat = newShardStateInfoFormat(deletePreviousState);
|
||||
stateFormat.write(shardStateMetaData, SHARD_STATE_FILE_PREFIX, shardStateMetaData.version, shardPaths);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -49,6 +50,7 @@ import org.junit.Test;
|
|||
import java.io.Closeable;
|
||||
import java.io.InputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
@ -63,7 +65,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
@ -78,7 +79,7 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
* Ensure we can read a pre-generated cluster state.
|
||||
*/
|
||||
public void testReadClusterState() throws URISyntaxException, IOException {
|
||||
final MetaDataStateFormat<MetaData> format = new MetaDataStateFormat<MetaData>(randomFrom(XContentType.values()), false) {
|
||||
final MetaDataStateFormat<MetaData> format = new MetaDataStateFormat<MetaData>(randomFrom(XContentType.values()), "global-") {
|
||||
|
||||
@Override
|
||||
public void toXContent(XContentBuilder builder, MetaData state) throws IOException {
|
||||
|
@ -95,7 +96,7 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
assertThat(resource, notNullValue());
|
||||
Path dst = tmp.resolve("global-3.st");
|
||||
Files.copy(resource, dst);
|
||||
MetaData read = format.read(dst, 3);
|
||||
MetaData read = format.read(dst);
|
||||
assertThat(read, notNullValue());
|
||||
assertThat(read.uuid(), equalTo("3O1tDF1IRB6fSJ-GrTMUtg"));
|
||||
// indices are empty since they are serialized separately
|
||||
|
@ -106,47 +107,38 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
for (int i = 0; i < dirs.length; i++) {
|
||||
dirs[i] = newTempDirPath(LifecycleScope.TEST);
|
||||
}
|
||||
final boolean deleteOldFiles = randomBoolean();
|
||||
Format format = new Format(randomFrom(XContentType.values()), deleteOldFiles);
|
||||
final long id = addDummyFiles("foo-", dirs);
|
||||
Format format = new Format(randomFrom(XContentType.values()), "foo-");
|
||||
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean());
|
||||
int version = between(0, Integer.MAX_VALUE/2);
|
||||
format.write(state, "foo-", version, dirs);
|
||||
format.write(state, version, dirs);
|
||||
for (Path file : dirs) {
|
||||
Path[] list = content(file);
|
||||
Path[] list = content("*", file);
|
||||
assertEquals(list.length, 1);
|
||||
assertThat(list[0].getFileName().toString(), equalTo(MetaDataStateFormat.STATE_DIR_NAME));
|
||||
Path stateDir = list[0];
|
||||
assertThat(Files.isDirectory(stateDir), is(true));
|
||||
list = content(stateDir);
|
||||
list = content("foo-*", stateDir);
|
||||
assertEquals(list.length, 1);
|
||||
assertThat(list[0].getFileName().toString(), equalTo("foo-" + version + ".st"));
|
||||
DummyState read = format.read(list[0], version);
|
||||
assertThat(list[0].getFileName().toString(), equalTo("foo-" + id + ".st"));
|
||||
DummyState read = format.read(list[0]);
|
||||
assertThat(read, equalTo(state));
|
||||
}
|
||||
final int version2 = between(version, Integer.MAX_VALUE);
|
||||
DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean());
|
||||
format.write(state2, "foo-", version2, dirs);
|
||||
format.write(state2, version2, dirs);
|
||||
|
||||
for (Path file : dirs) {
|
||||
Path[] list = content(file);
|
||||
Path[] list = content("*", file);
|
||||
assertEquals(list.length, 1);
|
||||
assertThat(list[0].getFileName().toString(), equalTo(MetaDataStateFormat.STATE_DIR_NAME));
|
||||
Path stateDir = list[0];
|
||||
assertThat(Files.isDirectory(stateDir), is(true));
|
||||
list = content(stateDir);
|
||||
assertEquals(list.length, deleteOldFiles ? 1 : 2);
|
||||
if (deleteOldFiles) {
|
||||
assertThat(list[0].getFileName().toString(), equalTo("foo-" + version2 + ".st"));
|
||||
DummyState read = format.read(list[0], version2);
|
||||
assertThat(read, equalTo(state2));
|
||||
} else {
|
||||
assertThat(list[0].getFileName().toString(), anyOf(equalTo("foo-" + version + ".st"), equalTo("foo-" + version2 + ".st")));
|
||||
assertThat(list[1].getFileName().toString(), anyOf(equalTo("foo-" + version + ".st"), equalTo("foo-" + version2 + ".st")));
|
||||
DummyState read = format.read(stateDir.resolve("foo-" + version2 + ".st"), version2);
|
||||
assertThat(read, equalTo(state2));
|
||||
read = format.read(stateDir.resolve("foo-" + version + ".st"), version);
|
||||
assertThat(read, equalTo(state));
|
||||
}
|
||||
list = content("foo-*", stateDir);
|
||||
assertEquals(list.length,1);
|
||||
assertThat(list[0].getFileName().toString(), equalTo("foo-"+ (id+1) + ".st"));
|
||||
DummyState read = format.read(list[0]);
|
||||
assertThat(read, equalTo(state2));
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -157,27 +149,22 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
for (int i = 0; i < dirs.length; i++) {
|
||||
dirs[i] = newTempDirPath(LifecycleScope.TEST);
|
||||
}
|
||||
final boolean deleteOldFiles = randomBoolean();
|
||||
Format format = new Format(randomFrom(XContentType.values()), deleteOldFiles);
|
||||
final long id = addDummyFiles("foo-", dirs);
|
||||
|
||||
Format format = new Format(randomFrom(XContentType.values()), "foo-");
|
||||
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean());
|
||||
int version = between(0, Integer.MAX_VALUE/2);
|
||||
format.write(state, "foo-", version, dirs);
|
||||
format.write(state, version, dirs);
|
||||
for (Path file : dirs) {
|
||||
Path[] list = content(file);
|
||||
Path[] list = content("*", file);
|
||||
assertEquals(list.length, 1);
|
||||
assertThat(list[0].getFileName().toString(), equalTo(MetaDataStateFormat.STATE_DIR_NAME));
|
||||
Path stateDir = list[0];
|
||||
assertThat(Files.isDirectory(stateDir), is(true));
|
||||
list = content(stateDir);
|
||||
list = content("foo-*", stateDir);
|
||||
assertEquals(list.length, 1);
|
||||
assertThat(list[0].getFileName().toString(), equalTo("foo-" + version + ".st"));
|
||||
try {
|
||||
format.read(list[0], between(version+1, Integer.MAX_VALUE));
|
||||
fail("corruption expected");
|
||||
} catch (CorruptStateException ex) {
|
||||
// success
|
||||
}
|
||||
DummyState read = format.read(list[0], version);
|
||||
assertThat(list[0].getFileName().toString(), equalTo("foo-" + id + ".st"));
|
||||
DummyState read = format.read(list[0]);
|
||||
assertThat(read, equalTo(state));
|
||||
}
|
||||
}
|
||||
|
@ -187,26 +174,26 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
for (int i = 0; i < dirs.length; i++) {
|
||||
dirs[i] = newTempDirPath(LifecycleScope.TEST);
|
||||
}
|
||||
final boolean deleteOldFiles = randomBoolean();
|
||||
Format format = new Format(randomFrom(XContentType.values()), deleteOldFiles);
|
||||
final long id = addDummyFiles("foo-", dirs);
|
||||
Format format = new Format(randomFrom(XContentType.values()), "foo-");
|
||||
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean());
|
||||
int version = between(0, Integer.MAX_VALUE/2);
|
||||
format.write(state, "foo-", version, dirs);
|
||||
format.write(state, version, dirs);
|
||||
for (Path file : dirs) {
|
||||
Path[] list = content(file);
|
||||
Path[] list = content("*", file);
|
||||
assertEquals(list.length, 1);
|
||||
assertThat(list[0].getFileName().toString(), equalTo(MetaDataStateFormat.STATE_DIR_NAME));
|
||||
Path stateDir = list[0];
|
||||
assertThat(Files.isDirectory(stateDir), is(true));
|
||||
list = content(stateDir);
|
||||
list = content("foo-*", stateDir);
|
||||
assertEquals(list.length, 1);
|
||||
assertThat(list[0].getFileName().toString(), equalTo("foo-" + version + ".st"));
|
||||
DummyState read = format.read(list[0], version);
|
||||
assertThat(list[0].getFileName().toString(), equalTo("foo-" + id + ".st"));
|
||||
DummyState read = format.read(list[0]);
|
||||
assertThat(read, equalTo(state));
|
||||
// now corrupt it
|
||||
corruptFile(list[0], logger);
|
||||
try {
|
||||
format.read(list[0], version);
|
||||
format.read(list[0]);
|
||||
fail("corrupted file");
|
||||
} catch (CorruptStateException ex) {
|
||||
// expected
|
||||
|
@ -230,9 +217,8 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
bb.flip();
|
||||
byte oldValue = bb.get(0);
|
||||
byte newValue = (byte) ~oldValue;
|
||||
|
||||
raf.position(filePointer);
|
||||
raf.write(bb);
|
||||
bb.put(0, newValue);
|
||||
raf.write(bb, filePointer);
|
||||
logger.debug("Corrupting file {} -- flipping at position {} from {} to {} ", fileToCorrupt.getFileName().toString(), filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue));
|
||||
}
|
||||
long checksumAfterCorruption;
|
||||
|
@ -258,7 +244,7 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
// If the latest version doesn't use the legacy format while previous versions do, then fail hard
|
||||
public void testLatestVersionDoesNotUseLegacy() throws IOException {
|
||||
final ToXContent.Params params = ToXContent.EMPTY_PARAMS;
|
||||
MetaDataStateFormat<MetaData> format = MetaStateService.globalStateFormat(randomFrom(XContentType.values()), params, randomBoolean());
|
||||
MetaDataStateFormat<MetaData> format = MetaStateService.globalStateFormat(randomFrom(XContentType.values()), params);
|
||||
final Path[] dirs = new Path[2];
|
||||
dirs[0] = newTempDirPath(LifecycleScope.TEST);
|
||||
dirs[1] = newTempDirPath(LifecycleScope.TEST);
|
||||
|
@ -268,7 +254,7 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
final Path dir1 = randomFrom(dirs);
|
||||
final int v1 = randomInt(10);
|
||||
// write a first state file in the new format
|
||||
format.write(randomMeta(), MetaStateService.GLOBAL_STATE_FILE_PREFIX, v1, dir1);
|
||||
format.write(randomMeta(), v1, dir1);
|
||||
|
||||
// write older state files in the old format but with a newer version
|
||||
final int numLegacyFiles = randomIntBetween(1, 5);
|
||||
|
@ -283,24 +269,33 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
}
|
||||
|
||||
try {
|
||||
MetaDataStateFormat.loadLatestState(logger, format, MetaStateService.GLOBAL_STATE_FILE_PATTERN, "foobar", dirs);
|
||||
format.loadLatestState(logger, dirs);
|
||||
fail("latest version can not be read");
|
||||
} catch (ElasticsearchIllegalStateException ex) {
|
||||
assertThat(ex.getMessage(), startsWith("Could not find a state file to recover from among "));
|
||||
}
|
||||
// write the next state file in the new format and ensure it get's a higher ID
|
||||
final MetaData meta = randomMeta();
|
||||
format.write(meta, v1, dirs);
|
||||
final MetaData metaData = format.loadLatestState(logger, dirs);
|
||||
assertEquals(meta.uuid(), metaData.uuid());
|
||||
final Path path = randomFrom(dirs);
|
||||
final Path[] files = FileSystemUtils.files(path.resolve("_state"));
|
||||
assertEquals(1, files.length);
|
||||
assertEquals("global-" + format.findMaxStateId("global-", dirs) + ".st", files[0].getFileName().toString());
|
||||
|
||||
}
|
||||
|
||||
// If both the legacy and the new format are available for the latest version, prefer the new format
|
||||
public void testPrefersNewerFormat() throws IOException {
|
||||
final ToXContent.Params params = ToXContent.EMPTY_PARAMS;
|
||||
MetaDataStateFormat<MetaData> format = MetaStateService.globalStateFormat(randomFrom(XContentType.values()), params, randomBoolean());
|
||||
MetaDataStateFormat<MetaData> format = MetaStateService.globalStateFormat(randomFrom(XContentType.values()), params);
|
||||
final Path[] dirs = new Path[2];
|
||||
dirs[0] = newTempDirPath(LifecycleScope.TEST);
|
||||
dirs[1] = newTempDirPath(LifecycleScope.TEST);
|
||||
for (Path dir : dirs) {
|
||||
Files.createDirectories(dir.resolve(MetaDataStateFormat.STATE_DIR_NAME));
|
||||
}
|
||||
final Path dir1 = randomFrom(dirs);
|
||||
final long v = randomInt(10);
|
||||
|
||||
MetaData meta = randomMeta();
|
||||
|
@ -317,10 +312,12 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
}
|
||||
|
||||
// write a second state file in the new format but with the same version
|
||||
format.write(meta, MetaStateService.GLOBAL_STATE_FILE_PREFIX, v, dir1);
|
||||
format.write(meta, v, dirs);
|
||||
|
||||
MetaData state = MetaDataStateFormat.loadLatestState(logger, format, MetaStateService.GLOBAL_STATE_FILE_PATTERN, "foobar", dirs);
|
||||
assertThat(state.uuid(), equalTo(uuid));
|
||||
MetaData state = format.loadLatestState(logger, dirs);
|
||||
final Path path = randomFrom(dirs);
|
||||
assertTrue(Files.exists(path.resolve(MetaDataStateFormat.STATE_DIR_NAME).resolve("global-" + (v+1) + ".st")));
|
||||
assertEquals(state.uuid(), uuid);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -334,7 +331,7 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
meta.add(randomMeta());
|
||||
}
|
||||
Set<Path> corruptedFiles = new HashSet<>();
|
||||
MetaDataStateFormat<MetaData> format = MetaStateService.globalStateFormat(randomFrom(XContentType.values()), params, randomBoolean());
|
||||
MetaDataStateFormat<MetaData> format = MetaStateService.globalStateFormat(randomFrom(XContentType.values()), params);
|
||||
for (int i = 0; i < dirs.length; i++) {
|
||||
dirs[i] = newTempDirPath(LifecycleScope.TEST);
|
||||
Files.createDirectories(dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME));
|
||||
|
@ -352,7 +349,7 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
}
|
||||
}
|
||||
for (int j = numLegacy; j < numStates; j++) {
|
||||
format.write(meta.get(j), MetaStateService.GLOBAL_STATE_FILE_PREFIX, j, dirs[i]);
|
||||
format.write(meta.get(j), j, dirs[i]);
|
||||
if (randomBoolean() && (j < numStates - 1 || dirs.length > 0 && i != 0)) { // corrupt a file that we do not necessarily need here....
|
||||
Path file = dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME).resolve("global-" + j + ".st");
|
||||
corruptedFiles.add(file);
|
||||
|
@ -363,7 +360,7 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
}
|
||||
List<Path> dirList = Arrays.asList(dirs);
|
||||
Collections.shuffle(dirList, getRandom());
|
||||
MetaData loadedMetaData = MetaDataStateFormat.loadLatestState(logger, format, MetaStateService.GLOBAL_STATE_FILE_PATTERN, "foobar", dirList.toArray(new Path[0]));
|
||||
MetaData loadedMetaData = format.loadLatestState(logger, dirList.toArray(new Path[0]));
|
||||
MetaData latestMetaData = meta.get(numStates-1);
|
||||
assertThat(loadedMetaData.uuid(), not(equalTo("_na_")));
|
||||
assertThat(loadedMetaData.uuid(), equalTo(latestMetaData.uuid()));
|
||||
|
@ -387,7 +384,7 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
MetaDataStateFormatTest.corruptFile(file, logger);
|
||||
}
|
||||
try {
|
||||
MetaDataStateFormat.loadLatestState(logger, format, MetaStateService.GLOBAL_STATE_FILE_PATTERN, "foobar", dirList.toArray(new Path[0]));
|
||||
format.loadLatestState(logger, dirList.toArray(new Path[0]));
|
||||
fail("latest version can not be read");
|
||||
} catch (ElasticsearchException ex) {
|
||||
assertThat(ex.getCause(), instanceOf(CorruptStateException.class));
|
||||
|
@ -414,8 +411,8 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
|
||||
private class Format extends MetaDataStateFormat<DummyState> {
|
||||
|
||||
Format(XContentType format, boolean deleteOldFiles) {
|
||||
super(format, deleteOldFiles);
|
||||
Format(XContentType format, String prefix) {
|
||||
super(format, prefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -567,9 +564,30 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public Path[] content(Path dir) throws IOException {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
|
||||
public Path[] content(String glob, Path dir) throws IOException {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
|
||||
return Iterators.toArray(stream.iterator(), Path.class);
|
||||
}
|
||||
}
|
||||
|
||||
public long addDummyFiles(String prefix, Path... paths) throws IOException {
|
||||
int realId = -1;
|
||||
for (Path path : paths) {
|
||||
if (randomBoolean()) {
|
||||
Path stateDir = path.resolve(MetaDataStateFormat.STATE_DIR_NAME);
|
||||
Files.createDirectories(stateDir);
|
||||
String actualPrefix = prefix;
|
||||
int id = randomIntBetween(0, 10);
|
||||
if (randomBoolean()) {
|
||||
actualPrefix = "dummy-";
|
||||
} else {
|
||||
realId = Math.max(realId, id);
|
||||
}
|
||||
try (OutputStream stream = Files.newOutputStream(stateDir.resolve(actualPrefix + id + MetaDataStateFormat.STATE_FILE_EXTENSION))) {
|
||||
stream.write(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
return realId + 1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,24 +20,23 @@ package org.elasticsearch.index.shard;
|
|||
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
|
||||
/**
|
||||
* Simple unit-test IndexShard related operations.
|
||||
|
@ -72,18 +71,18 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
|
|||
long version = between(1, Integer.MAX_VALUE / 2);
|
||||
boolean primary = randomBoolean();
|
||||
ShardStateMetaData state1 = new ShardStateMetaData(version, primary, "foo");
|
||||
ShardStateMetaData.write(logger, "foo", id, state1, randomBoolean(), env.shardPaths(id));
|
||||
ShardStateMetaData shardStateMetaData = ShardStateMetaData.load(logger, id, env.shardPaths(id));
|
||||
write(state1, env.shardPaths(id));
|
||||
ShardStateMetaData shardStateMetaData = load(logger, env.shardPaths(id));
|
||||
assertEquals(shardStateMetaData, state1);
|
||||
|
||||
ShardStateMetaData state2 = new ShardStateMetaData(version, primary, "foo");
|
||||
ShardStateMetaData.write(logger, "foo", id, state2, randomBoolean(), env.shardPaths(id));
|
||||
shardStateMetaData = ShardStateMetaData.load(logger, id, env.shardPaths(id));
|
||||
write(state2, env.shardPaths(id));
|
||||
shardStateMetaData = load(logger, env.shardPaths(id));
|
||||
assertEquals(shardStateMetaData, state1);
|
||||
|
||||
ShardStateMetaData state3 = new ShardStateMetaData(version + 1, primary, "foo");
|
||||
ShardStateMetaData.write(logger, "foo", id, state3, randomBoolean(), env.shardPaths(id));
|
||||
shardStateMetaData = ShardStateMetaData.load(logger, id, env.shardPaths(id));
|
||||
write(state3, env.shardPaths(id));
|
||||
shardStateMetaData = load(logger, env.shardPaths(id));
|
||||
assertEquals(shardStateMetaData, state3);
|
||||
assertEquals("foo", state3.indexUUID);
|
||||
}
|
||||
|
@ -96,44 +95,44 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
|
|||
NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
|
||||
IndexService test = indicesService.indexService("test");
|
||||
IndexShard shard = test.shard(0);
|
||||
ShardStateMetaData shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
|
||||
ShardStateMetaData shardStateMetaData = load(logger, env.shardPaths(shard.shardId));
|
||||
assertEquals(getShardStateMetadata(shard), shardStateMetaData);
|
||||
ShardRouting routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1);
|
||||
shard.updateRoutingEntry(routing, true);
|
||||
|
||||
shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
|
||||
shardStateMetaData = load(logger, env.shardPaths(shard.shardId));
|
||||
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
|
||||
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
|
||||
|
||||
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1);
|
||||
shard.updateRoutingEntry(routing, true);
|
||||
shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
|
||||
shardStateMetaData = load(logger, env.shardPaths(shard.shardId));
|
||||
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
|
||||
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
|
||||
|
||||
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1);
|
||||
shard.updateRoutingEntry(routing, true);
|
||||
shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
|
||||
shardStateMetaData = load(logger, env.shardPaths(shard.shardId));
|
||||
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
|
||||
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
|
||||
|
||||
// test if we still write it even if the shard is not active
|
||||
MutableShardRouting inactiveRouting = new MutableShardRouting(shard.shardRouting.index(), shard.shardRouting.shardId().id(), shard.shardRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1);
|
||||
shard.persistMetadata(inactiveRouting, shard.shardRouting);
|
||||
shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
|
||||
shardStateMetaData = load(logger, env.shardPaths(shard.shardId));
|
||||
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, getShardStateMetadata(shard));
|
||||
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
|
||||
|
||||
|
||||
shard.updateRoutingEntry(new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1), false);
|
||||
shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
|
||||
shardStateMetaData = load(logger, env.shardPaths(shard.shardId));
|
||||
assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard)));
|
||||
assertEquals("shard state persisted despite of persist=false", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
|
||||
|
||||
|
||||
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1);
|
||||
shard.updateRoutingEntry(routing, true);
|
||||
shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
|
||||
shardStateMetaData = load(logger, env.shardPaths(shard.shardId));
|
||||
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
|
||||
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
|
||||
}
|
||||
|
@ -153,14 +152,14 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
|
|||
}
|
||||
|
||||
ShardRouting routing = shard.routingEntry();
|
||||
ShardStateMetaData shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
|
||||
ShardStateMetaData shardStateMetaData = load(logger, env.shardPaths(shard.shardId));
|
||||
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
|
||||
|
||||
routing = new MutableShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version()+1);
|
||||
shard.updateRoutingEntry(routing, true);
|
||||
shard.deleteShardState();
|
||||
|
||||
assertNull("no shard state expected after delete on initializing", ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId)));
|
||||
assertNull("no shard state expected after delete on initializing", load(logger, env.shardPaths(shard.shardId)));
|
||||
|
||||
|
||||
|
||||
|
@ -193,4 +192,13 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
|
|||
assertTrue("more than one unique hashcode expected but got: " + hashCodes.size(), hashCodes.size() > 1);
|
||||
|
||||
}
|
||||
|
||||
public static ShardStateMetaData load(ESLogger logger, Path... shardPaths) throws IOException {
|
||||
return ShardStateMetaData.FORMAT.loadLatestState(logger, shardPaths);
|
||||
}
|
||||
|
||||
public static void write(ShardStateMetaData shardStateMetaData,
|
||||
Path... shardPaths) throws IOException {
|
||||
ShardStateMetaData.FORMAT.write(shardStateMetaData, shardStateMetaData.version, shardPaths);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue