Fix IndexMetaData loads after rollover (#33394)
When we roll over an index we write the conditions of the rollover that the old index met into the old index. Loading this index metadata requires a working `NamedXContentRegistry` that has been populated with parsers from the rollover infrastructure. We had a few loads that didn't use a working `NamedXContentRegistry` and so would fail if they ever encountered an index that had been rolled over. Here are the locations of the loads and how I fixed them:

* IndexFolderUpgrader - removed entirely. It existed to support opening indices made in Elasticsearch 2.x. Since we only need this change as far back as 6.4.1, which supports reading from indices created as far back as 5.0.0, we should be good here.
* TransportNodesListGatewayStartedShards - wired the `NamedXContentRegistry` into place.
* TransportNodesListShardStoreMetaData - wired the `NamedXContentRegistry` into place.
* OldIndexUtils - removed entirely. It existed to support the zip-based index backwards compatibility tests, which we've since replaced with code that actually runs old versions of Elasticsearch.

In addition to fixing the actual problem, I added full cluster restart integration tests for rollover, which would have caught this problem, and I added an extra assertion to IndexMetaData's deserialization code which will trip if we try to deserialize an index's metadata without a fully formed `NamedXContentRegistry`. It won't catch use of the *wrong* `NamedXContentRegistry`, but it is better than nothing.

Closes #33316
commit 0d45752e50 (parent 6a3adbd935)
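For context, here is a minimal sketch — not code from this commit — of how rollover condition parsers end up in a `NamedXContentRegistry`. The condition classes, their `NAME` constants, and their `fromXContent` methods are recalled from the 6.x rollover infrastructure and should be treated as assumptions:

import java.util.Arrays;

import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;

public class RolloverRegistrySketch {
    public static void main(String[] args) {
        // Each rollover condition contributes a parser keyed by its name. When
        // the metadata of a rolled-over index is deserialized, the stored
        // conditions are looked up by name in this registry; the EMPTY registry
        // has no entries, so the same load fails.
        NamedXContentRegistry registry = new NamedXContentRegistry(Arrays.asList(
            new NamedXContentRegistry.Entry(Condition.class,
                new ParseField(MaxAgeCondition.NAME), MaxAgeCondition::fromXContent),
            new NamedXContentRegistry.Entry(Condition.class,
                new ParseField(MaxDocsCondition.NAME), MaxDocsCondition::fromXContent)));
        System.out.println("registered rollover condition parsers: " + registry);
    }
}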
@@ -477,6 +477,62 @@ public class FullClusterRestartIT extends ESRestTestCase {
         }
     }
 
+    /**
+     * Test upgrading after a rollover. Specifically:
+     * <ol>
+     *  <li>Create an index with a write alias
+     *  <li>Write some documents to the write alias
+     *  <li>Roll over the index
+     *  <li>Make sure the document count is correct
+     *  <li>Upgrade
+     *  <li>Write some more documents to the write alias
+     *  <li>Make sure the document count is correct
+     * </ol>
+     */
+    public void testRollover() throws IOException {
+        if (runningAgainstOldCluster) {
+            Request createIndex = new Request("PUT", "/" + index + "-000001");
+            createIndex.setJsonEntity("{"
+                + " \"aliases\": {"
+                + "   \"" + index + "_write\": {}"
+                + " }"
+                + "}");
+            client().performRequest(createIndex);
+        }
+
+        int bulkCount = 10;
+        StringBuilder bulk = new StringBuilder();
+        for (int i = 0; i < bulkCount; i++) {
+            bulk.append("{\"index\":{}}\n");
+            bulk.append("{\"test\":\"test\"}\n");
+        }
+        Request bulkRequest = new Request("POST", "/" + index + "_write/doc/_bulk");
+        bulkRequest.setJsonEntity(bulk.toString());
+        bulkRequest.addParameter("refresh", "");
+        assertThat(EntityUtils.toString(client().performRequest(bulkRequest).getEntity()), containsString("\"errors\":false"));
+
+        if (runningAgainstOldCluster) {
+            Request rolloverRequest = new Request("POST", "/" + index + "_write/_rollover");
+            rolloverRequest.setJsonEntity("{"
+                + " \"conditions\": {"
+                + "   \"max_docs\": 5"
+                + " }"
+                + "}");
+            client().performRequest(rolloverRequest);
+
+            assertThat(EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices?v")).getEntity()),
+                containsString("testrollover-000002"));
+        }
+
+        Request countRequest = new Request("POST", "/" + index + "-*/_search");
+        countRequest.addParameter("size", "0");
+        Map<String, Object> count = entityAsMap(client().performRequest(countRequest));
+        assertNoFailures(count);
+
+        int expectedCount = bulkCount + (runningAgainstOldCluster ? 0 : bulkCount);
+        assertEquals(expectedCount, (int) XContentMapValues.extractValue("hits.total", count));
+    }
+
     void assertBasicSearchWorks(int count) throws IOException {
         logger.info("--> testing basic search");
         {
@@ -947,7 +1003,7 @@ public class FullClusterRestartIT extends ESRestTestCase {
         Request writeToRestoredRequest = new Request("POST", "/restored_" + index + "/doc/_bulk");
         writeToRestoredRequest.addParameter("refresh", "true");
         writeToRestoredRequest.setJsonEntity(bulk.toString());
-        client().performRequest(writeToRestoredRequest);
+        assertThat(EntityUtils.toString(client().performRequest(writeToRestoredRequest).getEntity()), containsString("\"errors\":false"));
 
-        // And count to make sure the add worked
+        // Make sure search finds all documents
@@ -45,6 +45,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -1346,6 +1347,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContentFragment {
 
         @Override
         public IndexMetaData fromXContent(XContentParser parser) throws IOException {
+            assert parser.getXContentRegistry() != NamedXContentRegistry.EMPTY
+                : "loading index metadata requires a working named xcontent registry";
             return Builder.fromXContent(parser);
         }
     };
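To illustrate the new assertion, here is a standalone sketch (not part of the diff): building a parser against `NamedXContentRegistry.EMPTY` is exactly the state the assert rejects. The `createParser` overload and `LoggingDeprecationHandler` are from the 6.x xcontent API; treat the exact signatures as assumptions.

import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;

public class RegistryGuardSketch {
    public static void main(String[] args) throws Exception {
        // Run with -ea: the assert below trips, which is the fail-fast behavior
        // the new check in IndexMetaData.fromXContent provides for loads that
        // were never wired to a real registry.
        try (XContentParser parser = JsonXContent.jsonXContent.createParser(
                NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, "{}")) {
            assert parser.getXContentRegistry() != NamedXContentRegistry.EMPTY
                : "loading index metadata requires a working named xcontent registry";
        }
    }
}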
@@ -1,134 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.common.util;
-
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexSettings;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-
-/**
- * Renames index folders from {index.name} to {index.uuid}
- */
-public class IndexFolderUpgrader {
-    private final NodeEnvironment nodeEnv;
-    private final Settings settings;
-    private final Logger logger = Loggers.getLogger(IndexFolderUpgrader.class);
-
-    /**
-     * Creates a new upgrader instance
-     * @param settings node settings
-     * @param nodeEnv the node env to operate on
-     */
-    IndexFolderUpgrader(Settings settings, NodeEnvironment nodeEnv) {
-        this.settings = settings;
-        this.nodeEnv = nodeEnv;
-    }
-
-    /**
-     * Moves the index folder found in <code>source</code> to <code>target</code>
-     */
-    void upgrade(final Index index, final Path source, final Path target) throws IOException {
-        boolean success = false;
-        try {
-            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
-            success = true;
-        } catch (NoSuchFileException | FileNotFoundException exception) {
-            // thrown when the source is non-existent because the folder was renamed
-            // by another node (shared FS) after we checked if the target exists
-            logger.error(() -> new ParameterizedMessage("multiple nodes trying to upgrade [{}] in parallel, retry " +
-                "upgrading with single node", target), exception);
-            throw exception;
-        } finally {
-            if (success) {
-                logger.info("{} moved from [{}] to [{}]", index, source, target);
-                logger.trace("{} syncing directory [{}]", index, target);
-                IOUtils.fsync(target, true);
-            }
-        }
-    }
-
-    /**
-     * Renames <code>indexFolderName</code> index folders found in node paths and custom path
-     * iff {@link #needsUpgrade(Index, String)} is true.
-     * Index folder in custom paths are renamed first followed by index folders in each node path.
-     */
-    void upgrade(final String indexFolderName) throws IOException {
-        for (NodeEnvironment.NodePath nodePath : nodeEnv.nodePaths()) {
-            final Path indexFolderPath = nodePath.indicesPath.resolve(indexFolderName);
-            final IndexMetaData indexMetaData = IndexMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, indexFolderPath);
-            if (indexMetaData != null) {
-                final Index index = indexMetaData.getIndex();
-                if (needsUpgrade(index, indexFolderName)) {
-                    logger.info("{} upgrading [{}] to new naming convention", index, indexFolderPath);
-                    final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
-                    if (indexSettings.hasCustomDataPath()) {
-                        // we rename index folder in custom path before renaming them in any node path
-                        // to have the index state under a not-yet-upgraded index folder, which we use to
-                        // continue renaming after a incomplete upgrade.
-                        final Path customLocationSource = nodeEnv.resolveBaseCustomLocation(indexSettings)
-                            .resolve(indexFolderName);
-                        final Path customLocationTarget = customLocationSource.resolveSibling(index.getUUID());
-                        // we rename the folder in custom path only the first time we encounter a state
-                        // in a node path, which needs upgrading, it is a no-op for subsequent node paths
-                        if (Files.exists(customLocationSource) // might not exist if no data was written for this index
-                            && Files.exists(customLocationTarget) == false) {
-                            upgrade(index, customLocationSource, customLocationTarget);
-                        } else {
-                            logger.info("[{}] no upgrade needed - already upgraded", customLocationTarget);
-                        }
-                    }
-                    upgrade(index, indexFolderPath, indexFolderPath.resolveSibling(index.getUUID()));
-                } else {
-                    logger.debug("[{}] no upgrade needed - already upgraded", indexFolderPath);
-                }
-            } else {
-                logger.warn("[{}] no index state found - ignoring", indexFolderPath);
-            }
-        }
-    }
-
-    /**
-     * Upgrades all indices found under <code>nodeEnv</code>. Already upgraded indices are ignored.
-     */
-    public static void upgradeIndicesIfNeeded(final Settings settings, final NodeEnvironment nodeEnv) throws IOException {
-        final IndexFolderUpgrader upgrader = new IndexFolderUpgrader(settings, nodeEnv);
-        for (String indexFolderName : nodeEnv.availableIndexFolders()) {
-            upgrader.upgrade(indexFolderName);
-        }
-    }
-
-    static boolean needsUpgrade(Index index, String indexFolderName) {
-        return indexFolderName.equals(index.getUUID()) == false;
-    }
-}
@@ -35,7 +35,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.IndexFolderUpgrader;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.plugins.MetaDataUpgrader;
@@ -84,7 +83,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateApplier {
         if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
             try {
                 ensureNoPre019State();
-                IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv);
                 final MetaData metaData = metaStateService.loadFullState();
                 final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);
                 // We finished global state validation and successfully checked all indices for backward compatibility
@@ -69,15 +69,18 @@ public class TransportNodesListGatewayStartedShards extends
     public static final String ACTION_NAME = "internal:gateway/local/started_shards";
     private final NodeEnvironment nodeEnv;
     private final IndicesService indicesService;
+    private final NamedXContentRegistry namedXContentRegistry;
 
     @Inject
     public TransportNodesListGatewayStartedShards(Settings settings, ThreadPool threadPool, ClusterService clusterService,
                                                   TransportService transportService, ActionFilters actionFilters,
-                                                  NodeEnvironment env, IndicesService indicesService) {
+                                                  NodeEnvironment env, IndicesService indicesService,
+                                                  NamedXContentRegistry namedXContentRegistry) {
         super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
             Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED, NodeGatewayStartedShards.class);
         this.nodeEnv = env;
         this.indicesService = indicesService;
+        this.namedXContentRegistry = namedXContentRegistry;
     }
 
     @Override
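The two Transport* fixes lean on the node binding one fully populated registry instance that `@Inject` constructors receive. A self-contained sketch of that wiring using plain Guice stand-ins (these are not Elasticsearch classes):

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;

public class RegistryWiringSketch {
    static class NamedXContentRegistry {} // stand-in for the real registry

    static class ListShardsAction {
        final NamedXContentRegistry namedXContentRegistry;

        @Inject
        ListShardsAction(NamedXContentRegistry namedXContentRegistry) {
            // keep the injected registry for later metadata loads
            this.namedXContentRegistry = namedXContentRegistry;
        }
    }

    public static void main(String[] args) {
        final NamedXContentRegistry registry = new NamedXContentRegistry();
        ListShardsAction action = Guice.createInjector(new AbstractModule() {
            @Override
            protected void configure() {
                // the node binds its one populated registry instance, so every
                // injected component sees the same parsers
                bind(NamedXContentRegistry.class).toInstance(registry);
            }
        }).getInstance(ListShardsAction.class);
        System.out.println(action.namedXContentRegistry == registry); // true
    }
}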
@@ -112,7 +115,7 @@ public class TransportNodesListGatewayStartedShards extends
         try {
             final ShardId shardId = request.getShardId();
             logger.trace("{} loading local shard state info", shardId);
-            ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
+            ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry,
                 nodeEnv.availableShardPaths(request.shardId));
             if (shardStateMetaData != null) {
                 IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex());
@@ -120,7 +123,7 @@ public class TransportNodesListGatewayStartedShards extends
                 // we may send this requests while processing the cluster state that recovered the index
                 // sometimes the request comes in before the local node processed that cluster state
                 // in such cases we can load it from disk
-                metaData = IndexMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
+                metaData = IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry,
                     nodeEnv.indexPaths(shardId.getIndex()));
             }
             if (metaData == null) {
@@ -67,17 +67,19 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
     public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store";
 
     private final IndicesService indicesService;
 
     private final NodeEnvironment nodeEnv;
+    private final NamedXContentRegistry namedXContentRegistry;
 
     @Inject
     public TransportNodesListShardStoreMetaData(Settings settings, ThreadPool threadPool,
                                                 ClusterService clusterService, TransportService transportService,
-                                                IndicesService indicesService, NodeEnvironment nodeEnv, ActionFilters actionFilters) {
+                                                IndicesService indicesService, NodeEnvironment nodeEnv,
+                                                ActionFilters actionFilters, NamedXContentRegistry namedXContentRegistry) {
         super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
             Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STORE, NodeStoreFilesMetaData.class);
         this.indicesService = indicesService;
         this.nodeEnv = nodeEnv;
+        this.namedXContentRegistry = namedXContentRegistry;
     }
 
     @Override
@@ -129,7 +131,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
             // we may send this requests while processing the cluster state that recovered the index
             // sometimes the request comes in before the local node processed that cluster state
             // in such cases we can load it from disk
-            metaData = IndexMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
+            metaData = IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry,
                 nodeEnv.indexPaths(shardId.getIndex()));
         }
         if (metaData == null) {
@@ -1,270 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.common.util;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.elasticsearch.Version;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.routing.AllocationId;
-import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.io.FileSystemUtils;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.ShardPath;
-import org.elasticsearch.index.shard.ShardStateMetaData;
-import org.elasticsearch.test.ESTestCase;
-
-import java.io.BufferedWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-@LuceneTestCase.SuppressFileSystems("ExtrasFS")
-public class IndexFolderUpgraderTests extends ESTestCase {
-
-    /**
-     * tests custom data paths are upgraded
-     */
-    public void testUpgradeCustomDataPath() throws IOException {
-        Path customPath = createTempDir();
-        final Settings nodeSettings = Settings.builder()
-            .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), customPath.toAbsolutePath().toString()).build();
-        try (NodeEnvironment nodeEnv = newNodeEnvironment(nodeSettings)) {
-            final Index index = new Index(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
-            Settings settings = Settings.builder()
-                .put(nodeSettings)
-                .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID())
-                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0)
-                .put(IndexMetaData.SETTING_DATA_PATH, customPath.toAbsolutePath().toString())
-                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                .build();
-            IndexMetaData indexState = IndexMetaData.builder(index.getName()).settings(settings).build();
-            int numIdxFiles = randomIntBetween(1, 5);
-            int numTranslogFiles = randomIntBetween(1, 5);
-            IndexSettings indexSettings = new IndexSettings(indexState, nodeSettings);
-            writeIndex(nodeEnv, indexSettings, numIdxFiles, numTranslogFiles);
-            IndexFolderUpgrader helper = new IndexFolderUpgrader(settings, nodeEnv);
-            helper.upgrade(indexSettings.getIndex().getName());
-            checkIndex(nodeEnv, indexSettings, numIdxFiles, numTranslogFiles);
-        }
-    }
-
-    /**
-     * tests upgrade on partially upgraded index, when we crash while upgrading
-     */
-    public void testPartialUpgradeCustomDataPath() throws IOException {
-        Path customPath = createTempDir();
-        final Settings nodeSettings = Settings.builder()
-            .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), customPath.toAbsolutePath().toString()).build();
-        try (NodeEnvironment nodeEnv = newNodeEnvironment(nodeSettings)) {
-            final Index index = new Index(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
-            Settings settings = Settings.builder()
-                .put(nodeSettings)
-                .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID())
-                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0)
-                .put(IndexMetaData.SETTING_DATA_PATH, customPath.toAbsolutePath().toString())
-                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                .build();
-            IndexMetaData indexState = IndexMetaData.builder(index.getName()).settings(settings).build();
-            int numIdxFiles = randomIntBetween(1, 5);
-            int numTranslogFiles = randomIntBetween(1, 5);
-            IndexSettings indexSettings = new IndexSettings(indexState, nodeSettings);
-            writeIndex(nodeEnv, indexSettings, numIdxFiles, numTranslogFiles);
-            IndexFolderUpgrader helper = new IndexFolderUpgrader(settings, nodeEnv) {
-                @Override
-                void upgrade(Index index, Path source, Path target) throws IOException {
-                    if(randomBoolean()) {
-                        throw new FileNotFoundException("simulated");
-                    }
-                }
-            };
-            // only upgrade some paths
-            try {
-                helper.upgrade(index.getName());
-            } catch (IOException e) {
-                assertTrue(e instanceof FileNotFoundException);
-            }
-            helper = new IndexFolderUpgrader(settings, nodeEnv);
-            // try to upgrade again
-            helper.upgrade(indexSettings.getIndex().getName());
-            checkIndex(nodeEnv, indexSettings, numIdxFiles, numTranslogFiles);
-        }
-    }
-
-    public void testUpgrade() throws IOException {
-        final Settings nodeSettings = Settings.EMPTY;
-        try (NodeEnvironment nodeEnv = newNodeEnvironment(nodeSettings)) {
-            final Index index = new Index(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
-            Settings settings = Settings.builder()
-                .put(nodeSettings)
-                .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID())
-                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0)
-                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                .build();
-            IndexMetaData indexState = IndexMetaData.builder(index.getName()).settings(settings).build();
-            int numIdxFiles = randomIntBetween(1, 5);
-            int numTranslogFiles = randomIntBetween(1, 5);
-            IndexSettings indexSettings = new IndexSettings(indexState, nodeSettings);
-            writeIndex(nodeEnv, indexSettings, numIdxFiles, numTranslogFiles);
-            IndexFolderUpgrader helper = new IndexFolderUpgrader(settings, nodeEnv);
-            helper.upgrade(indexSettings.getIndex().getName());
-            checkIndex(nodeEnv, indexSettings, numIdxFiles, numTranslogFiles);
-        }
-    }
-
-    public void testUpgradeIndices() throws IOException {
-        final Settings nodeSettings = Settings.EMPTY;
-        try (NodeEnvironment nodeEnv = newNodeEnvironment(nodeSettings)) {
-            Map<IndexSettings, Tuple<Integer, Integer>> indexSettingsMap = new HashMap<>();
-            for (int i = 0; i < randomIntBetween(2, 5); i++) {
-                final Index index = new Index(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
-                Settings settings = Settings.builder()
-                    .put(nodeSettings)
-                    .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID())
-                    .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0)
-                    .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
-                    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                    .build();
-                IndexMetaData indexState = IndexMetaData.builder(index.getName()).settings(settings).build();
-                Tuple<Integer, Integer> fileCounts = new Tuple<>(randomIntBetween(1, 5), randomIntBetween(1, 5));
-                IndexSettings indexSettings = new IndexSettings(indexState, nodeSettings);
-                indexSettingsMap.put(indexSettings, fileCounts);
-                writeIndex(nodeEnv, indexSettings, fileCounts.v1(), fileCounts.v2());
-            }
-            IndexFolderUpgrader.upgradeIndicesIfNeeded(nodeSettings, nodeEnv);
-            for (Map.Entry<IndexSettings, Tuple<Integer, Integer>> entry : indexSettingsMap.entrySet()) {
-                checkIndex(nodeEnv, entry.getKey(), entry.getValue().v1(), entry.getValue().v2());
-            }
-        }
-    }
-
-    public void testNeedsUpgrade() throws IOException {
-        final Index index = new Index("foo", UUIDs.randomBase64UUID());
-        IndexMetaData indexState = IndexMetaData.builder(index.getName())
-            .settings(Settings.builder()
-                .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID())
-                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
-            .numberOfShards(1)
-            .numberOfReplicas(0)
-            .build();
-        try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
-            IndexMetaData.FORMAT.write(indexState, nodeEnvironment.indexPaths(index));
-            assertFalse(IndexFolderUpgrader.needsUpgrade(index, index.getUUID()));
-        }
-    }
-
-    private void checkIndex(NodeEnvironment nodeEnv, IndexSettings indexSettings,
-                            int numIdxFiles, int numTranslogFiles) throws IOException {
-        final Index index = indexSettings.getIndex();
-        // ensure index state can be loaded
-        IndexMetaData loadLatestState = IndexMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
-            nodeEnv.indexPaths(index));
-        assertNotNull(loadLatestState);
-        assertEquals(loadLatestState.getIndex(), index);
-        for (int shardId = 0; shardId < indexSettings.getNumberOfShards(); shardId++) {
-            // ensure shard path can be loaded
-            ShardPath targetShardPath = ShardPath.loadShardPath(logger, nodeEnv, new ShardId(index, shardId), indexSettings);
-            assertNotNull(targetShardPath);
-            // ensure shard contents are copied over
-            final Path translog = targetShardPath.resolveTranslog();
-            final Path idx = targetShardPath.resolveIndex();
-
-            // ensure index and translog files are copied over
-            assertEquals(numTranslogFiles, FileSystemUtils.files(translog).length);
-            assertEquals(numIdxFiles, FileSystemUtils.files(idx).length);
-            Path[] files = FileSystemUtils.files(translog);
-            final HashSet<Path> translogFiles = new HashSet<>(Arrays.asList(files));
-            for (int i = 0; i < numTranslogFiles; i++) {
-                final String name = Integer.toString(i);
-                translogFiles.contains(translog.resolve(name + ".translog"));
-                byte[] content = Files.readAllBytes(translog.resolve(name + ".translog"));
-                assertEquals(name , new String(content, StandardCharsets.UTF_8));
-            }
-            Path[] indexFileList = FileSystemUtils.files(idx);
-            final HashSet<Path> idxFiles = new HashSet<>(Arrays.asList(indexFileList));
-            for (int i = 0; i < numIdxFiles; i++) {
-                final String name = Integer.toString(i);
-                idxFiles.contains(idx.resolve(name + ".tst"));
-                byte[] content = Files.readAllBytes(idx.resolve(name + ".tst"));
-                assertEquals(name, new String(content, StandardCharsets.UTF_8));
-            }
-        }
-    }
-
-    private void writeIndex(NodeEnvironment nodeEnv, IndexSettings indexSettings,
-                            int numIdxFiles, int numTranslogFiles) throws IOException {
-        NodeEnvironment.NodePath[] nodePaths = nodeEnv.nodePaths();
-        Path[] oldIndexPaths = new Path[nodePaths.length];
-        for (int i = 0; i < nodePaths.length; i++) {
-            oldIndexPaths[i] = nodePaths[i].indicesPath.resolve(indexSettings.getIndex().getName());
-        }
-        IndexMetaData.FORMAT.write(indexSettings.getIndexMetaData(), oldIndexPaths);
-        for (int id = 0; id < indexSettings.getNumberOfShards(); id++) {
-            Path oldIndexPath = randomFrom(oldIndexPaths);
-            ShardId shardId = new ShardId(indexSettings.getIndex(), id);
-            if (indexSettings.hasCustomDataPath()) {
-                Path customIndexPath = nodeEnv.resolveBaseCustomLocation(indexSettings).resolve(indexSettings.getIndex().getName());
-                writeShard(shardId, customIndexPath, numIdxFiles, numTranslogFiles);
-            } else {
-                writeShard(shardId, oldIndexPath, numIdxFiles, numTranslogFiles);
-            }
-            ShardStateMetaData state = new ShardStateMetaData(true, indexSettings.getUUID(), AllocationId.newInitializing());
-            ShardStateMetaData.FORMAT.write(state, oldIndexPath.resolve(String.valueOf(shardId.getId())));
-        }
-    }
-
-    private void writeShard(ShardId shardId, Path indexLocation,
-                            final int numIdxFiles, final int numTranslogFiles) throws IOException {
-        Path oldShardDataPath = indexLocation.resolve(String.valueOf(shardId.getId()));
-        final Path translogPath = oldShardDataPath.resolve(ShardPath.TRANSLOG_FOLDER_NAME);
-        final Path idxPath = oldShardDataPath.resolve(ShardPath.INDEX_FOLDER_NAME);
-        Files.createDirectories(translogPath);
-        Files.createDirectories(idxPath);
-        for (int i = 0; i < numIdxFiles; i++) {
-            String filename = Integer.toString(i);
-            try (BufferedWriter w = Files.newBufferedWriter(idxPath.resolve(filename + ".tst"),
-                StandardCharsets.UTF_8)) {
-                w.write(filename);
-            }
-        }
-        for (int i = 0; i < numTranslogFiles; i++) {
-            String filename = Integer.toString(i);
-            try (BufferedWriter w = Files.newBufferedWriter(translogPath.resolve(filename + ".translog"),
-                StandardCharsets.UTF_8)) {
-                w.write(filename);
-            }
-        }
-    }
-}
@@ -1,131 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.test;
-
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.index.IndexWriter;
-import org.elasticsearch.Version;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.index.MergePolicyConfig;
-
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static junit.framework.TestCase.assertFalse;
-import static junit.framework.TestCase.assertTrue;
-import static org.elasticsearch.test.ESTestCase.randomInt;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertNotNull;
-
-
-public class OldIndexUtils {
-
-    public static List<String> loadDataFilesList(String prefix, Path bwcIndicesPath) throws IOException {
-        List<String> indexes = new ArrayList<>();
-        try (DirectoryStream<Path> stream = Files.newDirectoryStream(bwcIndicesPath, prefix + "-*.zip")) {
-            for (Path path : stream) {
-                indexes.add(path.getFileName().toString());
-            }
-        }
-        Collections.sort(indexes);
-        return indexes;
-    }
-
-    public static Settings getSettings() {
-        return Settings.builder()
-            .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) // disable merging so no segments will be upgraded
-            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 30) //
-            // speed up recoveries
-            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 30)
-            .build();
-    }
-
-    public static Path getIndexDir(
-        final Logger logger,
-        final String indexName,
-        final String indexFile,
-        final Path dataDir) throws IOException {
-        final Version version = Version.fromString(indexName.substring("index-".length()));
-        final List<Path> indexFolders = new ArrayList<>();
-        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dataDir.resolve("0/indices"),
-            (p) -> p.getFileName().toString().startsWith("extra") == false)) { // extra FS can break this...
-            for (final Path path : stream) {
-                indexFolders.add(path);
-            }
-        }
-        assertThat(indexFolders.toString(), indexFolders.size(), equalTo(1));
-        final IndexMetaData indexMetaData = IndexMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
-            indexFolders.get(0));
-        assertNotNull(indexMetaData);
-        assertThat(indexFolders.get(0).getFileName().toString(), equalTo(indexMetaData.getIndexUUID()));
-        assertThat(indexMetaData.getCreationVersion(), equalTo(version));
-        return indexFolders.get(0);
-    }
-
-    // randomly distribute the files from src over dests paths
-    public static void copyIndex(final Logger logger, final Path src, final String folderName, final Path... dests) throws IOException {
-        Path destinationDataPath = dests[randomInt(dests.length - 1)];
-        for (Path dest : dests) {
-            Path indexDir = dest.resolve(folderName);
-            assertFalse(Files.exists(indexDir));
-            Files.createDirectories(indexDir);
-        }
-        Files.walkFileTree(src, new SimpleFileVisitor<Path>() {
-            @Override
-            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
-                Path relativeDir = src.relativize(dir);
-                for (Path dest : dests) {
-                    Path destDir = dest.resolve(folderName).resolve(relativeDir);
-                    Files.createDirectories(destDir);
-                }
-                return FileVisitResult.CONTINUE;
-            }
-
-            @Override
-            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                if (file.getFileName().toString().equals(IndexWriter.WRITE_LOCK_NAME)) {
-                    // skip lock file, we don't need it
-                    logger.trace("Skipping lock file: {}", file);
-                    return FileVisitResult.CONTINUE;
-                }
-
-                Path relativeFile = src.relativize(file);
-                Path destFile = destinationDataPath.resolve(folderName).resolve(relativeFile);
-                logger.trace("--> Moving {} to {}", relativeFile, destFile);
-                Files.move(file, destFile);
-                assertFalse(Files.exists(file));
-                assertTrue(Files.exists(destFile));
-                return FileVisitResult.CONTINUE;
-            }
-        });
-    }
-}