Check for incompatible mappings while upgrading old indices
Conflicting mappings that were allowed before v2.0 can cause runaway shard failures on upgrade. This commit adds a check that prevents a cluster from starting if it contains such indices, and that prevents such indices from being restored from a snapshot into an already running cluster.

Closes #11857
parent 984b94e8b7
commit f71c9a25a1
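For context on what "conflicting mappings" means here: before 2.0, two mapping types in the same index could map one field name to different field types, even though all types share a single Lucene field space. The sketch below illustrates the failure mode the new check detects. It is a toy model in plain Java, not the MapperService-based validation added in this commit; the type names x/y and field foo mirror the fixture script at the end of the diff, and the exception message mirrors the one asserted in UpgradeReallyOldIndexIT.

import java.util.HashMap;
import java.util.Map;

public class MappingConflictSketch {

    // Merge per-type field mappings into one index-wide view, rejecting any
    // field that two types map to different field types.
    static void mergeTypes(Map<String, Map<String, String>> typeMappings) {
        Map<String, String> fieldTypes = new HashMap<>(); // field name -> field type
        for (Map.Entry<String, Map<String, String>> type : typeMappings.entrySet()) {
            for (Map.Entry<String, String> field : type.getValue().entrySet()) {
                String previous = fieldTypes.put(field.getKey(), field.getValue());
                if (previous != null && !previous.equals(field.getValue())) {
                    throw new IllegalStateException("mapper [" + field.getKey()
                            + "] conflicts with existing mapping in other types");
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> mappings = new HashMap<>();
        mappings.put("x", Map.of("foo", "string")); // type x: foo is a string
        mappings.put("y", Map.of("foo", "date"));   // type y: foo is a date
        mergeTypes(mappings);                       // throws: foo conflicts
    }
}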
org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java:

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.admin.indices.upgrade.post;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -32,7 +33,9 @@ import java.text.ParseException;
  */
 class ShardUpgradeResponse extends BroadcastShardResponse {
 
-    private org.apache.lucene.util.Version version;
+    private org.apache.lucene.util.Version oldestLuceneSegment;
+
+    private Version upgradeVersion;
 
     private boolean primary;
 
@@ -40,14 +43,19 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
     ShardUpgradeResponse() {
     }
 
-    ShardUpgradeResponse(ShardId shardId, boolean primary, org.apache.lucene.util.Version version) {
+    ShardUpgradeResponse(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) {
         super(shardId);
         this.primary = primary;
-        this.version = version;
+        this.upgradeVersion = upgradeVersion;
+        this.oldestLuceneSegment = oldestLuceneSegment;
     }
 
-    public org.apache.lucene.util.Version version() {
-        return this.version;
+    public org.apache.lucene.util.Version oldestLuceneSegment() {
+        return this.oldestLuceneSegment;
+    }
+
+    public Version upgradeVersion() {
+        return this.upgradeVersion;
     }
 
     public boolean primary() {
@@ -59,18 +67,21 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         primary = in.readBoolean();
+        upgradeVersion = Version.readVersion(in);
         try {
-            version = org.apache.lucene.util.Version.parse(in.readString());
+            oldestLuceneSegment = org.apache.lucene.util.Version.parse(in.readString());
         } catch (ParseException ex) {
-            throw new IOException("failed to parse lucene version [" + version + "]", ex);
+            throw new IOException("failed to parse lucene version [" + oldestLuceneSegment + "]", ex);
        }
 
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeBoolean(primary);
-        out.writeString(version.toString());
+        Version.writeVersion(upgradeVersion, out);
+        out.writeString(oldestLuceneSegment.toString());
     }
 
 }
org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java:

@@ -19,7 +19,7 @@
 
 package org.elasticsearch.action.admin.indices.upgrade.post;
 
-import org.apache.lucene.util.Version;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.PrimaryMissingActionException;
 import org.elasticsearch.action.ShardOperationFailedException;
@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.shard.IndexShard;
@@ -75,7 +76,7 @@ public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequ
         int failedShards = 0;
         List<ShardOperationFailedException> shardFailures = null;
         Map<String, Integer> successfulPrimaryShards = newHashMap();
-        Map<String, Version> versions = newHashMap();
+        Map<String, Tuple<Version, org.apache.lucene.util.Version>> versions = newHashMap();
         for (int i = 0; i < shardsResponses.length(); i++) {
             Object shardResponse = shardsResponses.get(i);
             if (shardResponse == null) {
@@ -94,20 +95,35 @@ public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequ
                     Integer count = successfulPrimaryShards.get(index);
                     successfulPrimaryShards.put(index, count == null ? 1 : count + 1);
                 }
-                Version version = versions.get(index);
-                if (version == null || shardUpgradeResponse.version().onOrAfter(version) == false) {
-                    versions.put(index, shardUpgradeResponse.version());
+                Tuple<Version, org.apache.lucene.util.Version> versionTuple = versions.get(index);
+                if (versionTuple == null) {
+                    versions.put(index, new Tuple<>(shardUpgradeResponse.upgradeVersion(), shardUpgradeResponse.oldestLuceneSegment()));
+                } else {
+                    // We already have versions for this index - let's see if we need to update them based on the current shard
+                    Version version = versionTuple.v1();
+                    org.apache.lucene.util.Version luceneVersion = versionTuple.v2();
+                    // For the metadata we are interested in the _latest_ elasticsearch version that was processing the metadata
+                    // Since we rewrite the mapping during upgrade the metadata is always rewritten by the latest version
+                    if (shardUpgradeResponse.upgradeVersion().after(versionTuple.v1())) {
+                        version = shardUpgradeResponse.upgradeVersion();
+                    }
+                    // For the lucene version we are interested in the _oldest_ lucene version since it determines the
+                    // oldest version that we need to support
+                    if (shardUpgradeResponse.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) {
+                        luceneVersion = shardUpgradeResponse.oldestLuceneSegment();
+                    }
+                    versions.put(index, new Tuple<>(version, luceneVersion));
                 }
             }
         }
-        Map<String, String> updatedVersions = newHashMap();
+        Map<String, Tuple<org.elasticsearch.Version, String>> updatedVersions = newHashMap();
         MetaData metaData = clusterState.metaData();
-        for (Map.Entry<String, Version> versionEntry : versions.entrySet()) {
+        for (Map.Entry<String, Tuple<Version, org.apache.lucene.util.Version>> versionEntry : versions.entrySet()) {
             String index = versionEntry.getKey();
             Integer primaryCount = successfulPrimaryShards.get(index);
             int expectedPrimaryCount = metaData.index(index).getNumberOfShards();
             if (primaryCount == metaData.index(index).getNumberOfShards()) {
-                updatedVersions.put(index, versionEntry.getValue().toString());
+                updatedVersions.put(index, new Tuple<>(versionEntry.getValue().v1(), versionEntry.getValue().v2().toString()));
             } else {
                 logger.warn("Not updating settings for the index [{}] because upgrade of some primary shards failed - expected[{}], received[{}]", index,
                         expectedPrimaryCount, primaryCount == null ? 0 : primaryCount);
@@ -130,8 +146,9 @@ public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequ
     @Override
     protected ShardUpgradeResponse shardOperation(ShardUpgradeRequest request) {
         IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
-        org.apache.lucene.util.Version version = indexShard.upgrade(request.upgradeRequest());
-        return new ShardUpgradeResponse(request.shardId(), indexShard.routingEntry().primary(), version);
+        org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request.upgradeRequest());
+        // We are using the current version of elasticsearch as upgrade version since we update mapping to match the current version
+        return new ShardUpgradeResponse(request.shardId(), indexShard.routingEntry().primary(), Version.CURRENT, oldestLuceneSegment);
     }
 
     /**
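The reduction above collapses the per-shard responses into one tuple per index: the latest Elasticsearch version that rewrote the metadata and the oldest Lucene segment version still on disk. A standalone sketch of the same max/min fold, using plain ints as stand-ins for the two Version classes (the shard data is invented):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VersionReductionSketch {

    record ShardResult(String index, int upgradeVersion, int oldestSegment) {}

    public static void main(String[] args) {
        List<ShardResult> shards = List.of(            // hypothetical shard responses
                new ShardResult("test", 2000001, 4100),
                new ShardResult("test", 2000001, 3060),
                new ShardResult("test", 1070000, 4100));

        // index -> {max upgrade version, min oldest-segment version}
        Map<String, int[]> reduced = new HashMap<>();
        for (ShardResult shard : shards) {
            reduced.merge(shard.index(),
                    new int[]{shard.upgradeVersion(), shard.oldestSegment()},
                    (a, b) -> new int[]{Math.max(a[0], b[0]), Math.min(a[1], b[1])});
        }

        int[] result = reduced.get("test");
        System.out.println(result[0] + " / " + result[1]); // 2000001 / 3060
    }
}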
org/elasticsearch/action/admin/indices/upgrade/post/UpgradeResponse.java:

@@ -19,8 +19,10 @@
 
 package org.elasticsearch.action.admin.indices.upgrade.post;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -37,13 +39,13 @@ import static com.google.common.collect.Maps.newHashMap;
  */
 public class UpgradeResponse extends BroadcastResponse {
 
-    private Map<String, String> versions;
+    private Map<String, Tuple<Version, String>> versions;
 
     UpgradeResponse() {
 
     }
 
-    UpgradeResponse(Map<String, String> versions, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+    UpgradeResponse(Map<String, Tuple<Version, String>> versions, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
         super(totalShards, successfulShards, failedShards, shardFailures);
         this.versions = versions;
     }
@@ -55,8 +57,9 @@ public class UpgradeResponse extends BroadcastResponse {
         versions = newHashMap();
         for (int i=0; i<size; i++) {
             String index = in.readString();
-            String version = in.readString();
-            versions.put(index, version);
+            Version upgradeVersion = Version.readVersion(in);
+            String oldestLuceneSegment = in.readString();
+            versions.put(index, new Tuple<>(upgradeVersion, oldestLuceneSegment));
         }
     }
@@ -64,13 +67,18 @@ public class UpgradeResponse extends BroadcastResponse {
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeVInt(versions.size());
-        for(Map.Entry<String, String> entry : versions.entrySet()) {
+        for(Map.Entry<String, Tuple<Version, String>> entry : versions.entrySet()) {
             out.writeString(entry.getKey());
-            out.writeString(entry.getValue());
+            Version.writeVersion(entry.getValue().v1(), out);
+            out.writeString(entry.getValue().v2());
         }
     }
 
-    public Map<String, String> versions() {
+    /**
+     * Returns the highest upgrade version of the node that performed metadata upgrade and the
+     * version of the oldest lucene segment for each index that was upgraded.
+     */
+    public Map<String, Tuple<Version, String>> versions() {
         return versions;
     }
 }
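The wire format for the versions map is a count followed by (index name, version, segment string) triples, and readFrom must consume fields in exactly the order writeTo produced them. A rough sketch of the same length-prefixed layout with plain java.io streams; this is not the Elasticsearch StreamInput/StreamOutput API, which among other things uses variable-length vInts:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class WireFormatSketch {

    // count, then per entry: index name, upgrade version id, oldest segment string
    static byte[] write(Map<String, Map.Entry<Integer, String>> versions) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(versions.size());
            for (Map.Entry<String, Map.Entry<Integer, String>> e : versions.entrySet()) {
                out.writeUTF(e.getKey());               // index name
                out.writeInt(e.getValue().getKey());    // upgrade version id
                out.writeUTF(e.getValue().getValue());  // oldest lucene segment
            }
        }
        return bytes.toByteArray();
    }

    // Mirror image of write(): same fields, read in the same order.
    static Map<String, Map.Entry<Integer, String>> read(byte[] data) throws IOException {
        Map<String, Map.Entry<Integer, String>> versions = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String index = in.readUTF();
                int upgradeVersion = in.readInt();
                String oldestSegment = in.readUTF();
                versions.put(index, Map.entry(upgradeVersion, oldestSegment));
            }
        }
        return versions;
    }
}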
org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsClusterStateUpdateRequest.java:

@@ -19,7 +19,9 @@
 
 package org.elasticsearch.action.admin.indices.upgrade.post;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
+import org.elasticsearch.common.collect.Tuple;
 
 import java.util.Map;
 
@@ -28,7 +30,7 @@ import java.util.Map;
  */
 public class UpgradeSettingsClusterStateUpdateRequest extends ClusterStateUpdateRequest<UpgradeSettingsClusterStateUpdateRequest> {
 
-    private Map<String, String> versions;
+    private Map<String, Tuple<Version, String>> versions;
 
     public UpgradeSettingsClusterStateUpdateRequest() {
 
@@ -37,14 +39,14 @@ public class UpgradeSettingsClusterStateUpdateRequest extends ClusterStateUpdate
     /**
      * Returns the index to version map for indices that should be updated
      */
-    public Map<String, String> versions() {
+    public Map<String, Tuple<Version, String>> versions() {
         return versions;
     }
 
     /**
      * Sets the index to version map for indices that should be updated
     */
-    public UpgradeSettingsClusterStateUpdateRequest versions(Map<String, String> versions) {
+    public UpgradeSettingsClusterStateUpdateRequest versions(Map<String, Tuple<Version, String>> versions) {
         this.versions = versions;
         return this;
     }
org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsRequest.java:

@@ -19,8 +19,10 @@
 
 package org.elasticsearch.action.admin.indices.upgrade.post;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -35,16 +37,17 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
  */
 public class UpgradeSettingsRequest extends AcknowledgedRequest<UpgradeSettingsRequest> {
 
-    private Map<String, String> versions;
+    private Map<String, Tuple<Version, String>> versions;
 
     UpgradeSettingsRequest() {
     }
 
     /**
      * Constructs a new request to update minimum compatible version settings for one or more indices
      *
+     * @param versions a map from index name to elasticsearch version, oldest lucene segment version tuple
      */
-    public UpgradeSettingsRequest(Map<String, String> versions) {
+    public UpgradeSettingsRequest(Map<String, Tuple<Version, String>> versions) {
         this.versions = versions;
     }
@@ -59,14 +62,14 @@ public class UpgradeSettingsRequest extends AcknowledgedRequest<UpgradeSettingsR
     }
 
 
-    Map<String, String> versions() {
+    Map<String, Tuple<Version, String>> versions() {
        return versions;
    }
 
     /**
      * Sets the index versions to be updated
      */
-    public UpgradeSettingsRequest versions(Map<String, String> versions) {
+    public UpgradeSettingsRequest versions(Map<String, Tuple<Version, String>> versions) {
         this.versions = versions;
         return this;
     }
@@ -79,8 +82,9 @@ public class UpgradeSettingsRequest extends AcknowledgedRequest<UpgradeSettingsR
         versions = newHashMap();
         for (int i=0; i<size; i++) {
             String index = in.readString();
-            String version = in.readString();
-            versions.put(index, version);
+            Version upgradeVersion = Version.readVersion(in);
+            String oldestLuceneSegment = in.readString();
+            versions.put(index, new Tuple<>(upgradeVersion, oldestLuceneSegment));
         }
         readTimeout(in);
     }
@@ -89,9 +93,10 @@ public class UpgradeSettingsRequest extends AcknowledgedRequest<UpgradeSettingsR
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeVInt(versions.size());
-        for(Map.Entry<String, String> entry : versions.entrySet()) {
+        for(Map.Entry<String, Tuple<Version, String>> entry : versions.entrySet()) {
             out.writeString(entry.getKey());
-            out.writeString(entry.getValue());
+            Version.writeVersion(entry.getValue().v1(), out);
+            out.writeString(entry.getValue().v2());
         }
         writeTimeout(out);
     }
org/elasticsearch/action/admin/indices/upgrade/post/UpgradeSettingsRequestBuilder.java:

@@ -19,8 +19,10 @@
 
 package org.elasticsearch.action.admin.indices.upgrade.post;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.common.collect.Tuple;
 
 import java.util.Map;
 
@@ -36,7 +38,7 @@ public class UpgradeSettingsRequestBuilder extends AcknowledgedRequestBuilder<Up
     /**
      * Sets the index versions to be updated
      */
-    public UpgradeSettingsRequestBuilder setVersions(Map<String, String> versions) {
+    public UpgradeSettingsRequestBuilder setVersions(Map<String, Tuple<Version, String>> versions) {
         request.versions(versions);
         return this;
     }
org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java:

@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import org.apache.lucene.analysis.Analyzer;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.routing.DjbHashFunction;
 import org.elasticsearch.cluster.routing.HashFunction;
@@ -27,6 +29,12 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import com.google.common.collect.ImmutableSet;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.analysis.AnalysisService;
+import org.elasticsearch.index.analysis.NamedAnalyzer;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.similarity.SimilarityLookupService;
+import org.elasticsearch.script.ScriptService;
 
 import java.util.Set;
 
@@ -45,11 +53,12 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
 
     private final Class<? extends HashFunction> pre20HashFunction;
     private final Boolean pre20UseType;
+    private final ScriptService scriptService;
 
     @Inject
-    public MetaDataIndexUpgradeService(Settings settings) {
+    public MetaDataIndexUpgradeService(Settings settings, ScriptService scriptService) {
         super(settings);
+        this.scriptService = scriptService;
         final String pre20HashFunctionName = settings.get(DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, null);
         final boolean hasCustomPre20HashFunction = pre20HashFunctionName != null;
         // the hash function package has changed we replace the two hash functions if their fully qualified name is used.
@@ -83,12 +92,24 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
      */
     public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) {
         // Throws an exception if there are too-old segments:
+        if (isUpgraded(indexMetaData)) {
+            return indexMetaData;
+        }
         checkSupportedVersion(indexMetaData);
         IndexMetaData newMetaData = upgradeLegacyRoutingSettings(indexMetaData);
         newMetaData = addDefaultUnitsIfNeeded(newMetaData);
+        checkMappingsCompatibility(newMetaData);
+        newMetaData = markAsUpgraded(newMetaData);
         return newMetaData;
     }
 
+    /**
+     * Checks if the index was already opened by this version of Elasticsearch and doesn't require any additional checks.
+     */
+    private boolean isUpgraded(IndexMetaData indexMetaData) {
+        return indexMetaData.upgradeVersion().onOrAfter(Version.V_2_0_0_beta1);
+    }
+
     /**
      * Elasticsearch 2.0 no longer supports indices with pre Lucene v4.0 (Elasticsearch v 0.90.0) segments. All indices
      * that were created before Elasticsearch v0.90.0 should be upgraded using upgrade plugin before they can
@@ -239,4 +260,66 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
         // No changes:
         return indexMetaData;
     }
+
+
+    /**
+     * Checks the mappings for compatibility with the current version
+     */
+    private void checkMappingsCompatibility(IndexMetaData indexMetaData) {
+        Index index = new Index(indexMetaData.getIndex());
+        Settings settings = indexMetaData.settings();
+        try {
+            SimilarityLookupService similarityLookupService = new SimilarityLookupService(index, settings);
+            // We cannot instantiate real analysis server at this point because the node might not have
+            // been started yet. However, we don't really need real analyzers at this stage - so we can fake it
+            try (AnalysisService analysisService = new FakeAnalysisService(index, settings)) {
+                try (MapperService mapperService = new MapperService(index, settings, analysisService, similarityLookupService, scriptService)) {
+                    for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
+                        MappingMetaData mappingMetaData = cursor.value;
+                        mapperService.merge(mappingMetaData.type(), mappingMetaData.source(), false, false);
+                    }
+                }
+            }
+        } catch (Exception ex) {
+            // Wrap the inner exception so we have the index name in the exception message
+            throw new IllegalStateException("unable to upgrade the mappings for the index [" + indexMetaData.getIndex() + "], reason: [" + ex.getMessage() + "]", ex);
+        }
+    }
+
+    /**
+     * Marks index as upgraded so we don't have to test it again
+     */
+    private IndexMetaData markAsUpgraded(IndexMetaData indexMetaData) {
+        Settings settings = Settings.builder().put(indexMetaData.settings()).put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.CURRENT).build();
+        return IndexMetaData.builder(indexMetaData).settings(settings).build();
+    }
+
+    /**
+     * A fake analysis server that returns the same keyword analyzer for all requests
+     */
+    private static class FakeAnalysisService extends AnalysisService {
+
+        private Analyzer fakeAnalyzer = new Analyzer() {
+            @Override
+            protected TokenStreamComponents createComponents(String fieldName) {
+                throw new UnsupportedOperationException("shouldn't be here");
+            }
+        };
+
+        public FakeAnalysisService(Index index, Settings indexSettings) {
+            super(index, indexSettings);
+        }
+
+        @Override
+        public NamedAnalyzer analyzer(String name) {
+            return new NamedAnalyzer(name, fakeAnalyzer);
+        }
+
+        @Override
+        public void close() {
+            fakeAnalyzer.close();
+            super.close();
+        }
+    }
+
 }
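A note on FakeAnalysisService above: checkMappingsCompatibility runs before the node has fully started, so a real AnalysisService cannot be built, and mapping validation only needs the mapping structure, never tokenization. The stub therefore hands out an analyzer that throws if it is ever actually used. The same fail-fast stub pattern in isolation (hypothetical interface and method names, not the ES or Lucene classes):

import java.util.Map;

public class FailFastStubSketch {

    interface Analyzer {
        String[] tokenize(String text);
    }

    // Satisfies the dependency graph, but fails loudly if any code path
    // actually tries to analyze text - mirroring FakeAnalysisService.
    static final Analyzer FAKE = text -> {
        throw new UnsupportedOperationException("shouldn't be here");
    };

    // Structural check only: the analyzer is required by the signature but
    // never invoked, so passing FAKE is safe.
    static boolean fieldsHaveTypes(Map<String, String> fieldTypes, Analyzer analyzer) {
        return fieldTypes.values().stream().noneMatch(String::isEmpty);
    }

    public static void main(String[] args) {
        System.out.println(fieldsHaveTypes(Map.of("foo", "string"), FAKE)); // true
    }
}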
org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java:

@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.settings.DynamicSettings;
 import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -334,7 +335,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
             @Override
             public ClusterState execute(ClusterState currentState) {
                 MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
-                for (Map.Entry<String, String> entry : request.versions().entrySet()) {
+                for (Map.Entry<String, Tuple<Version, String>> entry : request.versions().entrySet()) {
                     String index = entry.getKey();
                     IndexMetaData indexMetaData = metaDataBuilder.get(index);
                     if (indexMetaData != null) {
@@ -342,8 +343,8 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
                             // No reason to pollute the settings, we didn't really upgrade anything
                             metaDataBuilder.put(IndexMetaData.builder(indexMetaData)
                                     .settings(settingsBuilder().put(indexMetaData.settings())
-                                            .put(IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE, entry.getValue())
-                                            .put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.CURRENT)
+                                            .put(IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE, entry.getValue().v2())
+                                            .put(IndexMetaData.SETTING_VERSION_UPGRADED, entry.getValue().v1())
                                     )
                             );
                         }
org/elasticsearch/index/mapper/MapperService.java:

@@ -63,6 +63,7 @@ import org.elasticsearch.indices.TypeMissingException;
 import org.elasticsearch.percolator.PercolatorService;
 import org.elasticsearch.script.ScriptService;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -78,7 +79,7 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
 /**
  *
  */
-public class MapperService extends AbstractIndexComponent {
+public class MapperService extends AbstractIndexComponent implements Closeable {
 
     public static final String DEFAULT_MAPPING = "_default_";
     private static ObjectHashSet<String> META_FIELDS = ObjectHashSet.from(
org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java:

@@ -19,11 +19,13 @@
 
 package org.elasticsearch.rest.action.admin.indices.upgrade;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse;
 import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
 import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -86,8 +88,11 @@ public class RestUpgradeAction extends BaseRestHandler {
                 builder.startObject();
                 buildBroadcastShardsHeader(builder, request, response);
                 builder.startObject("upgraded_indices");
-                for (Map.Entry<String, String> entry : response.versions().entrySet()) {
-                    builder.field(entry.getKey(), entry.getValue(), XContentBuilder.FieldCaseConversion.NONE);
+                for (Map.Entry<String, Tuple<Version, String>> entry : response.versions().entrySet()) {
+                    builder.startObject(entry.getKey(), XContentBuilder.FieldCaseConversion.NONE);
+                    builder.field("upgrade_version", entry.getValue().v1());
+                    builder.field("oldest_lucene_segment_version", entry.getValue().v2());
+                    builder.endObject();
                 }
                 builder.endObject();
                 builder.endObject();
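Net effect on the REST response: each entry under upgraded_indices is now an object rather than a bare version string. Judging from the fields written above and the YAML assertions at the end of this diff, a response would look roughly like {"upgraded_indices": {"test_index": {"upgrade_version": "2.0.0", "oldest_lucene_segment_version": "5.2.1"}}} (the two version values here are only illustrative).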
UpgradeIT.java (package move, inferred from the import updates elsewhere in this diff):

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.rest.action.admin.indices.upgrade;
+package org.elasticsearch.action.admin.indices.upgrade;
 
 import com.google.common.base.Predicate;
 import org.elasticsearch.ExceptionsHelper;
org/elasticsearch/action/admin/indices/upgrade/UpgradeReallyOldIndexIT.java (moved from org.elasticsearch.rest.action.admin.indices.upgrade):

@@ -17,15 +17,17 @@
  * under the License.
  */
 
-package org.elasticsearch.rest.action.admin.indices.upgrade;
+package org.elasticsearch.action.admin.indices.upgrade;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.bwcompat.StaticIndexBackwardCompatibilityIT;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.indices.IndicesService;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.hamcrest.Matchers.containsString;
 
 public class UpgradeReallyOldIndexIT extends StaticIndexBackwardCompatibilityIT {
@@ -38,11 +40,25 @@ public class UpgradeReallyOldIndexIT extends StaticIndexBackwardCompatibilityIT
         assertTrue(UpgradeIT.hasAncientSegments(client(), indexName));
         assertNoFailures(client().admin().indices().prepareUpgrade(indexName).setUpgradeOnlyAncientSegments(true).get());
 
-        assertFalse(UpgradeIT.hasAncientSegments(client(), "index-0.90.6"));
+        assertFalse(UpgradeIT.hasAncientSegments(client(), indexName));
         // This index has only ancient segments, so it should now be fully upgraded:
         UpgradeIT.assertUpgraded(client(), indexName);
-        assertEquals(Version.CURRENT.luceneVersion.toString(), client().admin().indices().prepareGetSettings(indexName).get().getSetting(indexName, IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE));
+        assertMinVersion(indexName, Version.CURRENT.luceneVersion);
+
+        assertEquals(client().admin().indices().prepareGetSettings(indexName).get().getSetting(indexName, IndexMetaData.SETTING_VERSION_UPGRADED), Integer.toString(Version.CURRENT.id));
     }
 
+    public void testUpgradeConflictingMapping() throws Exception {
+        String indexName = "index-conflicting-mappings-1.7.0";
+        logger.info("Checking static index " + indexName);
+        Settings nodeSettings = prepareBackwardsDataDir(getDataPath(indexName + ".zip"));
+        try {
+            internalCluster().startNode(nodeSettings);
+            fail("Should have failed to start the node");
+        } catch (Exception ex) {
+            assertThat(ex.getMessage(), containsString("conflicts with existing mapping in other types"));
+        }
+    }
+
     private void assertMinVersion(String index, org.apache.lucene.util.Version version) {
(bwcompat test file, name not shown in this extract; it switches to the relocated UpgradeIT)

@@ -26,6 +26,7 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.upgrade.UpgradeIT;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -42,7 +43,6 @@ import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.shard.MergePolicyConfig;
 import org.elasticsearch.indices.recovery.RecoverySettings;
-import org.elasticsearch.rest.action.admin.indices.upgrade.UpgradeIT;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
New binary test fixture (binary file not shown): the index-conflicting-mappings-1.7.0.zip static index referenced by testUpgradeConflictingMapping above and produced by the script below.
New Python script (+93 lines; file name not shown in this extract) that builds the conflicting-mappings fixture with Elasticsearch 1.7.0. The only editorial change from the extracted text is the `node = None` guard before the try block, so the finally clause cannot fail with a NameError if start_node raises:

@@ -0,0 +1,93 @@
import create_bwc_index
import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile

def fetch_version(version):
  logging.info('fetching ES version %s' % version)
  if subprocess.call([sys.executable, os.path.join(os.path.split(sys.argv[0])[0], 'get-bwc-version.py'), version]) != 0:
    raise RuntimeError('failed to download ES version %s' % version)

def main():
  '''
  Creates a static back compat index (.zip) with conflicting mappings.
  '''

  logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
                      datefmt='%Y-%m-%d %I:%M:%S %p')
  logging.getLogger('elasticsearch').setLevel(logging.ERROR)
  logging.getLogger('urllib3').setLevel(logging.WARN)

  tmp_dir = tempfile.mkdtemp()
  node = None  # defined up front so the finally block can always check it
  try:
    data_dir = os.path.join(tmp_dir, 'data')
    repo_dir = os.path.join(tmp_dir, 'repo')
    logging.info('Temp data dir: %s' % data_dir)
    logging.info('Temp repo dir: %s' % repo_dir)

    version = '1.7.0'
    classifier = 'conflicting-mappings-%s' % version
    index_name = 'index-%s' % classifier

    # Download old ES releases if necessary:
    release_dir = os.path.join('backwards', 'elasticsearch-%s' % version)
    if not os.path.exists(release_dir):
      fetch_version(version)

    node = create_bwc_index.start_node(version, release_dir, data_dir, repo_dir, cluster_name=index_name)
    client = create_bwc_index.create_client()

    put_conflicting_mappings(client, index_name)
    create_bwc_index.shutdown_node(node)
    print('%s server output:\n%s' % (version, node.stdout.read().decode('utf-8')))
    node = None
    create_bwc_index.compress_index(classifier, tmp_dir, 'core/src/test/resources/org/elasticsearch/action/admin/indices/upgrade')
  finally:
    if node is not None:
      create_bwc_index.shutdown_node(node)
    shutil.rmtree(tmp_dir)

def put_conflicting_mappings(client, index_name):
  client.indices.delete(index=index_name, ignore=404)
  logging.info('Create single shard test index')

  mappings = {}
  # backwardcompat test for conflicting mappings, see #11857
  mappings['x'] = {
    'analyzer': 'standard',
    "properties": {
      "foo": {
        "type": "string"
      }
    }
  }
  mappings['y'] = {
    'analyzer': 'standard',
    "properties": {
      "foo": {
        "type": "date"
      }
    }
  }

  client.indices.create(index=index_name, body={
    'settings': {
      'number_of_shards': 1,
      'number_of_replicas': 0
    },
    'mappings': mappings
  })
  health = client.cluster.health(wait_for_status='green', wait_for_relocating_shards=0)
  assert health['timed_out'] == False, 'cluster health timed out %s' % health
  num_docs = random.randint(2000, 3000)
  create_bwc_index.index_documents(client, index_name, 'doc', num_docs)
  logging.info('Running basic asserts on the data added')
  create_bwc_index.run_basic_asserts(client, index_name, 'doc', num_docs)

if __name__ == '__main__':
  main()
Companion fixture script (file name not shown in this extract), updated to pass a repository directory through to start_node and to write the compressed index to its new test-resources location:

@@ -25,7 +25,9 @@ def main():
   tmp_dir = tempfile.mkdtemp()
   try:
     data_dir = os.path.join(tmp_dir, 'data')
+    repo_dir = os.path.join(tmp_dir, 'repo')
     logging.info('Temp data dir: %s' % data_dir)
+    logging.info('Temp repo dir: %s' % repo_dir)
 
     first_version = '0.20.6'
     second_version = '0.90.6'
@@ -36,7 +38,7 @@ def main():
     if not os.path.exists(release_dir):
       fetch_version(first_version)
 
-    node = create_bwc_index.start_node(first_version, release_dir, data_dir, cluster_name=index_name)
+    node = create_bwc_index.start_node(first_version, release_dir, data_dir, repo_dir, cluster_name=index_name)
     client = create_bwc_index.create_client()
 
     # Creates the index & indexes docs w/ first_version:
@@ -63,7 +65,7 @@ def main():
     fetch_version(second_version)
 
     # Now also index docs with second_version:
-    node = create_bwc_index.start_node(second_version, release_dir, data_dir, cluster_name=index_name)
+    node = create_bwc_index.start_node(second_version, release_dir, data_dir, repo_dir, cluster_name=index_name)
     client = create_bwc_index.create_client()
 
     # If we index too many docs, the random refresh/flush causes the ancient segments to be merged away:
@@ -102,7 +104,7 @@ def main():
     create_bwc_index.shutdown_node(node)
     print('%s server output:\n%s' % (second_version, node.stdout.read().decode('utf-8')))
     node = None
-    create_bwc_index.compress_index('%s-and-%s' % (first_version, second_version), tmp_dir, 'src/test/resources/org/elasticsearch/rest/action/admin/indices/upgrade')
+    create_bwc_index.compress_index('%s-and-%s' % (first_version, second_version), tmp_dir, 'core/src/test/resources/org/elasticsearch/action/admin/indices/upgrade')
   finally:
     if node is not None:
       create_bwc_index.shutdown_node(node)
REST API spec test for indices.upgrade (YAML), updated to assert on the new per-index object:

@@ -18,4 +18,5 @@
   indices.upgrade:
     index: test_index
 
-  - match: {upgraded_indices.test_index: '/(\d\.)+\d/'}
+  - match: {upgraded_indices.test_index.oldest_lucene_segment_version: '/(\d\.)+\d/'}
+  - is_true: upgraded_indices.test_index.upgrade_version