[CORE] Drop support for state written pre 0.90

Today we have several upgrade methods that can read state written
pre 0.90 or even pre 0.19. Version 2.0 should not support these state
formats. Users of these version should upgrade to a 1.x or 0.90.x version
first.

Closes #8850
This commit is contained in:
Simon Willnauer 2014-12-09 15:59:30 +01:00
parent e5a7eaff22
commit 905dc90eec
2 changed files with 20 additions and 205 deletions

View File

@ -27,7 +27,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
@ -39,7 +38,6 @@ import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -51,13 +49,10 @@ import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.file.DirectoryStream; import java.nio.file.DirectoryStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -167,7 +162,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
} }
if (DiscoveryNode.masterNode(settings)) { if (DiscoveryNode.masterNode(settings)) {
try { try {
pre019Upgrade(); ensureNoPre019State();
pre20Upgrade(); pre20Upgrade();
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
loadState(); loadState();
@ -479,11 +474,10 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
} }
private void pre019Upgrade() throws Exception { /**
long index = -1; * Throws an IAE if a pre 0.19 state is detected
Path metaDataFile = null; */
MetaData metaData = null; private void ensureNoPre019State() throws Exception {
long version = -1;
for (Path dataLocation : nodeEnv.nodeDataPaths()) { for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME); final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (!Files.exists(stateLocation)) { if (!Files.exists(stateLocation)) {
@ -494,86 +488,15 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[upgrade]: processing [" + stateFile.getFileName() + "]"); logger.trace("[upgrade]: processing [" + stateFile.getFileName() + "]");
} }
String name = stateFile.getFileName().toString(); final String name = stateFile.getFileName().toString();
if (!name.startsWith("metadata-")) { if (name.startsWith("metadata-")) {
continue; throw new ElasticsearchIllegalStateException("Detected pre 0.19 metadata file please upgrade to a version before "
} + Version.CURRENT.minimumCompatibilityVersion()
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1)); + " first to upgrade state structures - metadata found: [" + stateFile.getParent().toAbsolutePath());
if (fileIndex >= index) {
// try and read the meta data
try {
byte[] data = Files.readAllBytes(stateFile);
if (data.length == 0) {
continue;
}
try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) {
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
if (token != null) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("meta-data".equals(currentFieldName)) {
metaData = MetaData.Builder.fromXContent(parser);
}
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
version = parser.longValue();
}
}
}
}
}
index = fileIndex;
metaDataFile = stateFile;
} catch (IOException e) {
logger.warn("failed to read pre 0.19 state from [" + name + "], ignoring...", e);
}
} }
} }
} }
} }
if (metaData == null) {
return;
}
logger.info("found old metadata state, loading metadata from [{}] and converting to new metadata location and structure...", metaDataFile.toAbsolutePath());
writeGlobalState("upgrade", MetaData.builder(metaData).version(version).build());
for (IndexMetaData indexMetaData : metaData) {
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(indexMetaData).version(version);
// set the created version to 0.18
indexMetaDataBuilder.settings(ImmutableSettings.settingsBuilder().put(indexMetaData.settings()).put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_0_18_0));
writeIndex("upgrade", indexMetaDataBuilder.build(), null);
}
// rename shards state to backup state
Path backupFile = metaDataFile.resolveSibling("backup-" + metaDataFile.getFileName());
Files.move(metaDataFile, backupFile, StandardCopyOption.ATOMIC_MOVE);
// delete all other shards state files
for (Path dataLocation : nodeEnv.nodeDataPaths()) {
Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (!Files.exists(stateLocation)) {
continue;
}
try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateLocation)) {
for (Path stateFile : stream) {
String name = stateFile.getFileName().toString();
if (!name.startsWith("metadata-")) {
continue;
}
try {
Files.delete(stateFile);
} catch (Exception ex) {
logger.debug("failed to delete file " + stateFile, ex);
}
}
}
}
logger.info("conversion to new metadata location and format done, backup create at [{}]", backupFile.toAbsolutePath());
} }
/** /**

View File

@ -20,6 +20,8 @@
package org.elasticsearch.gateway.local.state.shards; package org.elasticsearch.gateway.local.state.shards;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -27,7 +29,6 @@ import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.*;
@ -63,7 +64,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
listGatewayStartedShards.initGateway(this); listGatewayStartedShards.initGateway(this);
if (DiscoveryNode.dataNode(settings)) { if (DiscoveryNode.dataNode(settings)) {
try { try {
pre019Upgrade(); ensureNoPre019State();
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
currentState = loadShardsStateInfo(); currentState = loadShardsStateInfo();
logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start)); logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start));
@ -151,19 +152,6 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
it.remove(); it.remove();
} }
} }
// REMOVED: don't delete shard state, rely on IndicesStore to delete the shard location
// only once all shards are allocated on another node
// now, go over the current ones and delete ones that are not in the new one
// for (Map.Entry<ShardId, ShardStateInfo> entry : currentState.entrySet()) {
// ShardId shardId = entry.getKey();
// if (!newState.containsKey(shardId)) {
// if (!metaState.isDangling(shardId.index().name())) {
// deleteShardState(shardId);
// }
// }
// }
this.currentState = newState; this.currentState = newState;
} }
@ -249,114 +237,18 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
}; };
} }
private void pre019Upgrade() throws Exception { private void ensureNoPre019State() throws Exception {
long index = -1;
Path latest = null;
for (Path dataLocation : nodeEnv.nodeDataPaths()) { for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME); final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (!Files.exists(stateLocation)) { if (Files.exists(stateLocation)) {
continue; try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateLocation, "shards-*")) {
} for (Path stateFile : stream) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateLocation, "shards-*")) { throw new ElasticsearchIllegalStateException("Detected pre 0.19 shard state file please upgrade to a version before "
for (Path stateFile : stream) { + Version.CURRENT.minimumCompatibilityVersion()
if (logger.isTraceEnabled()) { + " first to upgrade state structures - shard state found: [" + stateFile.getParent().toAbsolutePath());
logger.trace("[find_latest_state]: processing [" + stateFile.getFileName() + "]");
}
String name = stateFile.getFileName().toString();
assert name.startsWith("shards-");
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
if (fileIndex >= index) {
// try and read the meta data
try {
byte[] data = Files.readAllBytes(stateFile);
if (data.length == 0) {
logger.debug("[upgrade]: not data for [" + name + "], ignoring...");
}
pre09ReadState(data);
index = fileIndex;
latest = stateFile;
} catch (IOException e) {
logger.warn("[upgrade]: failed to read state from [" + name + "], ignoring...", e);
}
} }
} }
} }
} }
if (latest == null) {
return;
}
logger.info("found old shards state, loading started shards from [{}] and converting to new shards state locations...", latest.toAbsolutePath());
Map<ShardId, ShardStateInfo> shardsState = pre09ReadState(Files.readAllBytes(latest));
for (Map.Entry<ShardId, ShardStateInfo> entry : shardsState.entrySet()) {
writeShardState("upgrade", entry.getKey(), entry.getValue(), null);
}
// rename shards state to backup state
Path backupFile = latest.resolveSibling("backup-" + latest.getFileName());
Files.move(latest, backupFile, StandardCopyOption.ATOMIC_MOVE);
// delete all other shards state files
for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (!Files.exists(stateLocation)) {
continue;
}
try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateLocation, "shards-*")) {
for (Path stateFile : stream) {
try {
Files.delete(stateFile);
} catch (Exception ex) {
logger.debug("Failed to delete state file {}", ex, stateFile);
}
}
}
}
logger.info("conversion to new shards state location and format done, backup create at [{}]", backupFile.toAbsolutePath());
}
private Map<ShardId, ShardStateInfo> pre09ReadState(byte[] data) throws IOException {
final Map<ShardId, ShardStateInfo> shardsState = Maps.newHashMap();
try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) {
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
if (token == null) {
// no data...
return shardsState;
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
if ("shards".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
String shardIndex = null;
int shardId = -1;
long version = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
shardIndex = parser.text();
} else if ("id".equals(currentFieldName)) {
shardId = parser.intValue();
} else if (VERSION_KEY.equals(currentFieldName)) {
version = parser.longValue();
}
}
}
shardsState.put(new ShardId(shardIndex, shardId), new ShardStateInfo(version, null));
}
}
}
}
}
return shardsState;
}
} }
} }