Core: Don't allow indices containing too-old segments to be opened

When index is introduced into the cluster via cluster upgrade, restore or as a dangled index the MetaDataIndexUpgradeService checks if this index can be upgraded to the current version. If upgrade is not possible, the newly upgraded cluster startup and restore process are aborted, the dangled index is imported as a closed index that cannot be open.

Closes #10215
This commit is contained in:
Igor Motov 2015-05-07 16:14:59 -04:00
parent eaa8576bba
commit 21ed6bb90c
14 changed files with 446 additions and 75 deletions

View File

@ -182,7 +182,7 @@ def generate_index(client, version, index_name):
}
}
# completion type was added in 0.90.3
if version not in ['0.90.0.Beta1', '0.90.0.RC1', '0.90.0.RC2', '0.90.0', '0.90.1', '0.90.2']:
if not version.startswith('0.20') and version not in ['0.90.0.Beta1', '0.90.0.RC1', '0.90.0.RC2', '0.90.0', '0.90.1', '0.90.2']:
mappings['analyzer_type1']['properties']['completion_with_index_analyzer'] = {
'type': 'completion',
'index_analyzer': 'standard'
@ -257,7 +257,7 @@ def generate_index(client, version, index_name):
logging.info('Running basic asserts on the data added')
run_basic_asserts(client, index_name, 'doc', num_docs)
def snapshot_index(client, cfg, version, repo_dir):
def snapshot_index(client, version, repo_dir):
# Add bogus persistent settings to make sure they can be restored
client.cluster.put_settings(body={
'persistent': {
@ -361,7 +361,7 @@ def create_bwc_index(cfg, version):
index_name = 'index-%s' % version.lower()
generate_index(client, version, index_name)
if snapshot_supported:
snapshot_index(client, cfg, version, repo_dir)
snapshot_index(client, version, repo_dir)
# 10067: get a delete-by-query into the translog on upgrade. We must do
# this after the snapshot, because it calls flush. Otherwise the index

View File

@ -0,0 +1,76 @@
import create_bwc_index
import logging
import os
import shutil
import subprocess
import sys
import tempfile
def fetch_version(version):
logging.info('fetching ES version %s' % version)
if subprocess.call([sys.executable, os.path.join(os.path.split(sys.argv[0])[0], 'get-bwc-version.py'), version]) != 0:
raise RuntimeError('failed to download ES version %s' % version)
def main():
'''
Creates a back compat index (.zip) using v0.20 and then creates a snapshot of it using v1.1
'''
logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
datefmt='%Y-%m-%d %I:%M:%S %p')
logging.getLogger('elasticsearch').setLevel(logging.ERROR)
logging.getLogger('urllib3').setLevel(logging.WARN)
tmp_dir = tempfile.mkdtemp()
try:
data_dir = os.path.join(tmp_dir, 'data')
logging.info('Temp data dir: %s' % data_dir)
first_version = '0.20.6'
second_version = '1.1.2'
index_name = 'index-%s-and-%s' % (first_version, second_version)
# Download old ES releases if necessary:
release_dir = os.path.join('backwards', 'elasticsearch-%s' % first_version)
if not os.path.exists(release_dir):
fetch_version(first_version)
node = create_bwc_index.start_node(first_version, release_dir, data_dir, cluster_name=index_name)
client = create_bwc_index.create_client()
# Creates the index & indexes docs w/ first_version:
create_bwc_index.generate_index(client, first_version, index_name)
# Make sure we write segments:
flush_result = client.indices.flush(index=index_name)
if not flush_result['ok']:
raise RuntimeError('flush failed: %s' % str(flush_result))
create_bwc_index.shutdown_node(node)
print('%s server output:\n%s' % (first_version, node.stdout.read().decode('utf-8')))
node = None
release_dir = os.path.join('backwards', 'elasticsearch-%s' % second_version)
if not os.path.exists(release_dir):
fetch_version(second_version)
# Now use second_version to snapshot the index:
node = create_bwc_index.start_node(second_version, release_dir, data_dir, cluster_name=index_name)
client = create_bwc_index.create_client()
repo_dir = os.path.join(tmp_dir, 'repo')
create_bwc_index.snapshot_index(client, second_version, repo_dir)
create_bwc_index.shutdown_node(node)
print('%s server output:\n%s' % (second_version, node.stdout.read().decode('utf-8')))
create_bwc_index.compress(tmp_dir, "src/test/resources/org/elasticsearch/bwcompat", 'unsupportedrepo-%s.zip' % first_version, 'repo')
node = None
finally:
if node is not None:
create_bwc_index.shutdown_node(node)
shutil.rmtree(tmp_dir)
if __name__ == '__main__':
main()

View File

@ -160,6 +160,7 @@ public class IndexMetaData implements Diffable<IndexMetaData> {
public static final String SETTING_BLOCKS_WRITE = "index.blocks.write";
public static final String SETTING_BLOCKS_METADATA = "index.blocks.metadata";
public static final String SETTING_VERSION_CREATED = "index.version.created";
public static final String SETTING_VERSION_MINIMUM_COMPATIBLE = "index.version.minimum_compatible";
public static final String SETTING_CREATION_DATE = "index.creation_date";
public static final String SETTING_UUID = "index.uuid";
public static final String SETTING_LEGACY_ROUTING_HASH_FUNCTION = "index.legacy.routing.hash.type";
@ -192,6 +193,7 @@ public class IndexMetaData implements Diffable<IndexMetaData> {
private final DiscoveryNodeFilters excludeFilters;
private final Version indexCreatedVersion;
private final Version indexMinimumCompatibleVersion;
private final HashFunction routingHashFunction;
private final boolean useTypeForRouting;
@ -226,6 +228,7 @@ public class IndexMetaData implements Diffable<IndexMetaData> {
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
}
indexCreatedVersion = Version.indexCreated(settings);
indexMinimumCompatibleVersion = settings.getAsVersion(SETTING_VERSION_MINIMUM_COMPATIBLE, indexCreatedVersion);
final Class<? extends HashFunction> hashFunctionClass = settings.getAsClass(SETTING_LEGACY_ROUTING_HASH_FUNCTION, null);
if (hashFunctionClass == null) {
routingHashFunction = MURMUR3_HASH_FUNCTION;
@ -278,6 +281,8 @@ public class IndexMetaData implements Diffable<IndexMetaData> {
/**
* Return the {@link Version} on which this index has been created. This
* information is typically useful for backward compatibility.
*
* Returns null if the index was created before 0.19.0.RC1.
*/
public Version creationVersion() {
return indexCreatedVersion;
@ -287,6 +292,20 @@ public class IndexMetaData implements Diffable<IndexMetaData> {
return creationVersion();
}
/**
* Return the {@link Version} of that created the oldest segment in the index.
*
* If the index was created before v1.6 and didn't go through upgrade API the creation verion is returned.
* Returns null if the index was created before 0.19.0.RC1.
*/
public Version minimumCompatibleVersion() {
return indexMinimumCompatibleVersion;
}
public Version getMinimumCompatibleVersion() {
return minimumCompatibleVersion();
}
/**
* Return the {@link HashFunction} that should be used for routing.
*/

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
import org.elasticsearch.rest.RestStatus;
@ -58,11 +59,14 @@ public class MetaDataIndexStateService extends AbstractComponent {
private final AllocationService allocationService;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
@Inject
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService, MetaDataIndexUpgradeService metaDataIndexUpgradeService) {
super(settings);
this.clusterService = clusterService;
this.allocationService = allocationService;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
}
public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
@ -160,7 +164,15 @@ public class MetaDataIndexStateService extends AbstractComponent {
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks());
for (String index : indicesToOpen) {
mdBuilder.put(IndexMetaData.builder(currentState.metaData().index(index)).state(IndexMetaData.State.OPEN));
IndexMetaData indexMetaData = IndexMetaData.builder(currentState.metaData().index(index)).state(IndexMetaData.State.OPEN).build();
// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
try {
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
} catch (Exception ex) {
throw new IndexException(new Index(index), "cannot open the index due to upgrade failure", ex);
}
mdBuilder.put(indexMetaData, true);
blocksBuilder.removeIndexBlock(index, INDEX_CLOSED_BLOCK);
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.SimpleHashFunction;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
/**
* This service is responsible for upgrading legacy index metadata to the current version
*
* Every time an existing index is introduced into cluster this service should be used
* to upgrade the existing index metadata to the latest version of the cluster. It typically
* occurs during cluster upgrade, when dangling indices are imported into the cluster or indices
* are restored from a repository.
*/
public class MetaDataIndexUpgradeService extends AbstractComponent {
private static final String DEPRECATED_SETTING_ROUTING_HASH_FUNCTION = "cluster.routing.operation.hash.type";
private static final String DEPRECATED_SETTING_ROUTING_USE_TYPE = "cluster.routing.operation.use_type";
private final Class<? extends HashFunction> pre20HashFunction;
private final Boolean pre20UseType;
@Inject
public MetaDataIndexUpgradeService(Settings settings) {
super(settings);
final String pre20HashFunctionName = settings.get(DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, null);
final boolean hasCustomPre20HashFunction = pre20HashFunctionName != null;
// the hash function package has changed we replace the two hash functions if their fully qualified name is used.
if (hasCustomPre20HashFunction) {
switch (pre20HashFunctionName) {
case "org.elasticsearch.cluster.routing.operation.hash.simple.SimpleHashFunction":
pre20HashFunction = SimpleHashFunction.class;
break;
case "org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction":
pre20HashFunction = DjbHashFunction.class;
break;
default:
pre20HashFunction = settings.getAsClass(DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DjbHashFunction.class, "org.elasticsearch.cluster.routing.", "HashFunction");
}
} else {
pre20HashFunction = DjbHashFunction.class;
}
pre20UseType = settings.getAsBoolean(DEPRECATED_SETTING_ROUTING_USE_TYPE, null);
if (hasCustomPre20HashFunction|| pre20UseType != null) {
logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they "
+ "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE);
}
}
/**
* Checks that the index can be upgraded to the current version of the master node.
*
* If the index does need upgrade it returns the index metadata unchanged, otherwise it returns a modified index metadata. If index cannot be
* updated the method throws an exception.
*/
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) throws Exception {
IndexMetaData newMetaData = indexMetaData;
newMetaData = checkSupportedVersion(newMetaData);
newMetaData = upgradeLegacyRoutingSettings(newMetaData);
return newMetaData;
}
/**
* Elasticsearch 2.0 deprecated no longer supports indices with pre Lucene v4.0 segments. All indices
* that were created before Elasticsearch v0.90.0 should be upgraded using upgrade plugin before they can
* be open by this version of elasticsearch.
*/
private IndexMetaData checkSupportedVersion(IndexMetaData indexMetaData) throws Exception {
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v0.90.0 and wasn't upgraded."
+ " This index should be open using a version before " + Version.CURRENT.minimumCompatibilityVersion()
+ " and upgraded using the upgrade API.");
}
return indexMetaData;
}
/*
* Returns true if this index can be supported by the current version of elasticsearch
*/
private static boolean isSupportedVersion(IndexMetaData indexMetaData) {
return indexMetaData.minimumCompatibleVersion() != null &&
indexMetaData.minimumCompatibleVersion().luceneVersion.onOrAfter(Version.V_0_90_0_Beta1.luceneVersion);
}
/**
* Elasticsearch 2.0 deprecated custom routing hash functions. So what we do here is that for old indices, we
* move this old and deprecated node setting to an index setting so that we can keep things backward compatible.
*/
private IndexMetaData upgradeLegacyRoutingSettings(IndexMetaData indexMetaData) throws Exception {
if (indexMetaData.settings().get(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION) == null
&& indexMetaData.getCreationVersion().before(Version.V_2_0_0)) {
// these settings need an upgrade
Settings indexSettings = ImmutableSettings.builder().put(indexMetaData.settings())
.put(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION, pre20HashFunction)
.put(IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE, pre20UseType == null ? false : pre20UseType)
.build();
return IndexMetaData.builder(indexMetaData)
.version(indexMetaData.version())
.settings(indexSettings)
.build();
} else if (indexMetaData.getCreationVersion().onOrAfter(Version.V_2_0_0)) {
if (indexMetaData.getSettings().get(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION) != null
|| indexMetaData.getSettings().get(IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE) != null) {
throw new IllegalStateException("Indices created on or after 2.0 should NOT contain [" + IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION
+ "] + or [" + IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE + "] in their index settings");
}
}
return indexMetaData;
}
}

View File

@ -20,41 +20,39 @@
package org.elasticsearch.gateway;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MultiDataPathUpgrader;
import org.elasticsearch.env.NodeEnvironment;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import static com.google.common.collect.Lists.newArrayList;
/**
*
*/
public class GatewayMetaState extends AbstractComponent implements ClusterStateListener {
private static final String DEPRECATED_SETTING_ROUTING_HASH_FUNCTION = "cluster.routing.operation.hash.type";
private static final String DEPRECATED_SETTING_ROUTING_USE_TYPE = "cluster.routing.operation.use_type";
private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final DanglingIndicesState danglingIndicesState;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
@Nullable
private volatile MetaData previousMetaData;
@ -63,11 +61,13 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
@Inject
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
DanglingIndicesState danglingIndicesState, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception {
DanglingIndicesState danglingIndicesState, TransportNodesListGatewayMetaState nodesListGatewayMetaState,
MetaDataIndexUpgradeService metaDataIndexUpgradeService) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.danglingIndicesState = danglingIndicesState;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
nodesListGatewayMetaState.init(this);
if (DiscoveryNode.dataNode(settings)) {
@ -212,58 +212,27 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
}
/**
* Elasticsearch 2.0 deprecated custom routing hash functions. So what we do here is that for old indices, we
* move this old & deprecated node setting to an index setting so that we can keep things backward compatible.
* Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls
* {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The
* MetaDataIndexUpgradeService might also update updates obsolete settings if needed. When this happens we rewrite
* index metadata with new settings.
*/
private void pre20Upgrade() throws Exception {
final Class<? extends HashFunction> pre20HashFunction;
final String pre20HashFunctionName = settings.get(DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, null);
final boolean hasCustomPre20HashFunction = pre20HashFunctionName != null;
// the hash function package has changed we replace the two hash functions if their fully qualified name is used.
if (hasCustomPre20HashFunction) {
switch (pre20HashFunctionName) {
case "org.elasticsearch.cluster.routing.operation.hash.simple.SimpleHashFunction":
pre20HashFunction = SimpleHashFunction.class;
break;
case "org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction":
pre20HashFunction = DjbHashFunction.class;
break;
default:
pre20HashFunction = settings.getAsClass(DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DjbHashFunction.class, "org.elasticsearch.cluster.routing.", "HashFunction");
}
} else {
pre20HashFunction = DjbHashFunction.class;
}
final Boolean pre20UseType = settings.getAsBoolean(DEPRECATED_SETTING_ROUTING_USE_TYPE, null);
MetaData metaData = loadMetaState();
List<IndexMetaData> updateIndexMetaData = newArrayList();
for (IndexMetaData indexMetaData : metaData) {
if (indexMetaData.settings().get(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION) == null
&& indexMetaData.getCreationVersion().before(Version.V_2_0_0)) {
// these settings need an upgrade
Settings indexSettings = ImmutableSettings.builder().put(indexMetaData.settings())
.put(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION, pre20HashFunction)
.put(IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE, pre20UseType == null ? false : pre20UseType)
.build();
IndexMetaData newMetaData = IndexMetaData.builder(indexMetaData)
.version(indexMetaData.version())
.settings(indexSettings)
.build();
metaStateService.writeIndex("upgrade", newMetaData, null);
} else if (indexMetaData.getCreationVersion().onOrAfter(Version.V_2_0_0)) {
if (indexMetaData.getSettings().get(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION) != null
|| indexMetaData.getSettings().get(IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE) != null) {
throw new IllegalStateException("Indices created on or after 2.0 should NOT contain [" + IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION
+ "] + or [" + IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE + "] in their index settings");
}
IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
if (indexMetaData != newMetaData) {
updateIndexMetaData.add(newMetaData);
}
}
if (hasCustomPre20HashFunction || pre20UseType != null) {
logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they "
+ "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE);
// We successfully checked all indices for backward compatibility and found no non-upgradable indices, which
// means the upgrade can continue. Now it's safe to overwrite index metadata with the new version.
for (IndexMetaData indexMetaData : updateIndexMetaData) {
metaStateService.writeIndex("upgrade", indexMetaData, null);
}
}
// shard state BWC
private void ensureNoPre019ShardState(NodeEnvironment nodeEnv) throws Exception {
for (Path dataLocation : nodeEnv.nodeDataPaths()) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -54,12 +55,16 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
private final AllocationService allocationService;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
@Inject
public LocalAllocateDangledIndices(Settings settings, TransportService transportService, ClusterService clusterService, AllocationService allocationService) {
public LocalAllocateDangledIndices(Settings settings, TransportService transportService, ClusterService clusterService,
AllocationService allocationService, MetaDataIndexUpgradeService metaDataIndexUpgradeService) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
transportService.registerRequestHandler(ACTION_NAME, AllocateDangledRequest.class, ThreadPool.Names.SAME, new AllocateDangledRequestHandler());
}
@ -129,10 +134,24 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
continue;
}
importNeeded = true;
metaData.put(indexMetaData, false);
blocks.addBlocks(indexMetaData);
routingTableBuilder.addAsRecovery(indexMetaData);
sb.append("[").append(indexMetaData.index()).append("/").append(indexMetaData.state()).append("]");
IndexMetaData upgradedIndexMetaData;
try {
// The dangled index might be from an older version, we need to make sure it's compatible
// with the current version and upgrade it if needed.
upgradedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
} catch (Exception ex) {
// upgrade failed - adding index as closed
logger.warn("found dangled index [{}] on node [{}]. This index cannot be upgraded to the latest version, adding as closed", ex,
indexMetaData.index(), request.fromNode);
upgradedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE).version(indexMetaData.version() + 1).build();
}
metaData.put(upgradedIndexMetaData, false);
blocks.addBlocks(upgradedIndexMetaData);
if (upgradedIndexMetaData.getState() == IndexMetaData.State.OPEN) {
routingTableBuilder.addAsRecovery(upgradedIndexMetaData);
}
sb.append("[").append(upgradedIndexMetaData.index()).append("/").append(upgradedIndexMetaData.state()).append("]");
}
if (!importNeeded) {
return currentState;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
@ -76,6 +77,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();
}

View File

@ -116,11 +116,14 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
private final DynamicSettings dynamicSettings;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
private final CopyOnWriteArrayList<ActionListener<RestoreCompletionResponse>> listeners = new CopyOnWriteArrayList<>();
@Inject
public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService,
AllocationService allocationService, MetaDataCreateIndexService createIndexService, @ClusterDynamicSettings DynamicSettings dynamicSettings) {
AllocationService allocationService, MetaDataCreateIndexService createIndexService, @ClusterDynamicSettings DynamicSettings dynamicSettings,
MetaDataIndexUpgradeService metaDataIndexUpgradeService) {
super(settings);
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
@ -128,6 +131,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
this.allocationService = allocationService;
this.createIndexService = createIndexService;
this.dynamicSettings = dynamicSettings;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
transportService.registerRequestHandler(UPDATE_RESTORE_ACTION_NAME, UpdateIndexShardRestoreStatusRequest.class, ThreadPool.Names.SAME, new UpdateRestoreStateRequestHandler());
clusterService.add(this);
}
@ -183,6 +187,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
String renamedIndex = indexEntry.getKey();
IndexMetaData snapshotIndexMetaData = metaData.index(index);
snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings);
try {
snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData);
} catch (Exception ex) {
throw new SnapshotRestoreException(snapshotId, "cannot restore index [" + index + "] because it cannot be upgraded", ex);
}
// Check that the index is closed or doesn't exist
IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndex);
IntSet ignoreShards = new IntHashSet();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.bwcompat;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.LuceneTestCase;
@ -28,6 +29,8 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -36,10 +39,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MultiDataPathUpgrader;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexException;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.action.admin.indices.upgrade.UpgradeTest;
@ -56,6 +59,7 @@ import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
@ -65,6 +69,7 @@ import java.util.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.matchers.JUnitMatchers.containsString;
// needs at least 2 nodes since it bumps replicas to 1
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
@ -75,19 +80,26 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio
// We have a 0.20.6.zip etc for this.
List<String> indexes;
List<String> unsupportedIndexes;
static Path singleDataPath;
static Path[] multiDataPath;
@Before
public void initIndexesList() throws Exception {
indexes = new ArrayList<>();
indexes = loadIndexesList("index");
unsupportedIndexes = loadIndexesList("unsupported");
}
private List<String> loadIndexesList(String prefix) throws IOException {
List<String> indexes = new ArrayList<>();
Path dir = getDataPath(".");
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "index-*.zip")) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, prefix + "-*.zip")) {
for (Path path : stream) {
indexes.add(path.getFileName().toString());
}
}
Collections.sort(indexes);
return indexes;
}
@AfterClass
@ -146,7 +158,7 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio
String loadIndex(String indexFile) throws Exception {
Path unzipDir = createTempDir();
Path unzipDataDir = unzipDir.resolve("data");
String indexName = indexFile.replace(".zip", "").toLowerCase(Locale.ROOT);
String indexName = indexFile.replace(".zip", "").toLowerCase(Locale.ROOT).replace("unsupported-", "index-");
// decompress the index
Path backwardsIndex = getDataPath(indexFile);
@ -172,6 +184,10 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio
logger.info("--> injecting index [{}] into multi data path", indexName);
copyIndex(logger, src, indexName, multiDataPath);
}
return indexName;
}
void importIndex(String indexName) throws IOException {
final Iterable<NodeEnvironment> instances = internalCluster().getInstances(NodeEnvironment.class);
for (NodeEnvironment nodeEnv : instances) { // upgrade multidata path
MultiDataPathUpgrader.upgradeMultiDataPath(nodeEnv, logger);
@ -179,7 +195,6 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio
// force reloading dangling indices with a cluster state republish
client().admin().cluster().prepareReroute().get();
ensureGreen(indexName);
return indexName;
}
// randomly distribute the files from src over dests paths
@ -220,7 +235,7 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio
}
void unloadIndex(String indexName) throws Exception {
ElasticsearchAssertions.assertAcked(client().admin().indices().prepareDelete(indexName).get());
assertAcked(client().admin().indices().prepareDelete(indexName).get());
}
public void testAllVersionsTested() throws Exception {
@ -258,9 +273,52 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio
}
}
@Test
public void testHandlingOfUnsupportedDanglingIndexes() throws Exception {
setupCluster();
Collections.shuffle(unsupportedIndexes, getRandom());
for (String index : unsupportedIndexes) {
assertUnsupportedIndexHandling(index);
}
}
/**
* Waits for the index to show up in the cluster state in closed state
*/
void ensureClosed(final String index) throws InterruptedException {
assertTrue(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
return state.metaData().hasIndex(index) && state.metaData().index(index).getState() == IndexMetaData.State.CLOSE;
}
}));
}
/**
* Checks that the given index cannot be opened due to incompatible version
*/
void assertUnsupportedIndexHandling(String index) throws Exception {
long startTime = System.currentTimeMillis();
logger.info("--> Testing old index " + index);
String indexName = loadIndex(index);
// force reloading dangling indices with a cluster state republish
client().admin().cluster().prepareReroute().get();
ensureClosed(indexName);
try {
client().admin().indices().prepareOpen(indexName).get();
fail("Shouldn't be able to open an old index");
} catch (IndexException ex) {
assertThat(ex.getMessage(), containsString("cannot open the index due to upgrade failure"));
}
unloadIndex(indexName);
logger.info("--> Done testing " + index + ", took " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds");
}
void assertOldIndexWorks(String index) throws Exception {
Version version = extractVersion(index);
String indexName = loadIndex(index);
importIndex(indexName);
assertIndexSanity(indexName);
assertBasicSearchWorks(indexName);
assertBasicAggregationWorks(indexName);

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.bwcompat;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
public class RecoveryWithUnsupportedIndicesTests extends StaticIndexBackwardCompatibilityTest {
@Test
public void testUpgradeStartClusterOn_0_20_6() throws Exception {
String indexName = "unsupported-0.20.6";
logger.info("Checking static index " + indexName);
Settings nodeSettings = prepareBackwardsDataDir(getDataPath(indexName + ".zip"), Node.HTTP_ENABLED, true);
try {
internalCluster().startNode(nodeSettings);
fail();
} catch (Exception ex) {
assertThat(ex.getMessage(), containsString(" was created before v0.90.0 and wasn't upgraded"));
}
}
}

View File

@ -26,11 +26,11 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.AbstractSnapshotTests;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.junit.Test;
@ -41,7 +41,6 @@ import java.net.URI;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.SortedSet;
@ -55,7 +54,6 @@ import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST)
public class RestoreBackwardsCompatTests extends AbstractSnapshotTests {
@Test
public void restoreOldSnapshots() throws Exception {
String repo = "test_repo";
@ -63,7 +61,7 @@ public class RestoreBackwardsCompatTests extends AbstractSnapshotTests {
List<String> repoVersions = repoVersions();
assertThat(repoVersions.size(), greaterThan(0));
for (String version : repoVersions) {
createRepo(version, repo);
createRepo("repo", version, repo);
testOldSnapshot(version, repo, snapshot);
}
@ -93,13 +91,33 @@ public class RestoreBackwardsCompatTests extends AbstractSnapshotTests {
}
}
@Test
public void testRestoreUnsupportedSnapshots() throws Exception {
String repo = "test_repo";
String snapshot = "test_1";
List<String> repoVersions = unsupportedRepoVersions();
assertThat(repoVersions.size(), greaterThan(0));
for (String version : repoVersions) {
createRepo("unsupportedrepo", version, repo);
assertUnsupportedIndexFailsToRestore(repo, snapshot);
}
}
private List<String> repoVersions() throws Exception {
return listRepoVersions("repo");
}
private List<String> unsupportedRepoVersions() throws Exception {
return listRepoVersions("unsupportedrepo");
}
private List<String> listRepoVersions(String prefix) throws Exception {
List<String> repoVersions = newArrayList();
Path repoFiles = getDataPath(".");
try (DirectoryStream<Path> stream = Files.newDirectoryStream(repoFiles, "repo-*.zip")) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(repoFiles, prefix + "-*.zip")) {
for (Path entry : stream) {
String fileName = entry.getFileName().toString();
String version = fileName.substring("repo-".length());
String version = fileName.substring(prefix.length() + 1);
version = version.substring(0, version.length() - ".zip".length());
repoVersions.add(version);
}
@ -107,8 +125,8 @@ public class RestoreBackwardsCompatTests extends AbstractSnapshotTests {
return repoVersions;
}
private void createRepo(String version, String repo) throws Exception {
String repoFile = "repo-" + version + ".zip";
private void createRepo(String prefix, String version, String repo) throws Exception {
String repoFile = prefix + "-" + version + ".zip";
URI repoFileUri = getClass().getResource(repoFile).toURI();
URI repoJarUri = new URI("jar:" + repoFileUri.toString() + "!/repo/");
logger.info("--> creating repository [{}] for version [{}]", repo, version);
@ -156,5 +174,16 @@ public class RestoreBackwardsCompatTests extends AbstractSnapshotTests {
cluster().wipeTemplates();
}
private void assertUnsupportedIndexFailsToRestore(String repo, String snapshot) throws IOException {
logger.info("--> restoring unsupported snapshot");
try {
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot).setRestoreGlobalState(true).setWaitForCompletion(true).get();
fail("should have failed to restore");
} catch (SnapshotRestoreException ex) {
assertThat(ex.getMessage(), containsString("cannot restore index"));
assertThat(ex.getMessage(), containsString("because it cannot be upgraded"));
}
}
}