Add validation for supported index version on node join, restore, upgrade & open index (#21830)

Today we can easily join a cluster that holds an index we don't support since
we currently allow rolling upgrades from 5.x to 6.x. Along the same lines we don't check if we can support an index based on the nodes in the cluster when we open, restore or metadata-upgrade and index. This commit adds
additional safety that fails cluster state validation, open, restore and /or upgrade if there is an open index with an incompatible index version created in the cluster.

Realtes to #21670
This commit is contained in:
Simon Willnauer 2016-12-01 15:40:35 +01:00 committed by GitHub
parent 155de53fe3
commit 6522538033
22 changed files with 300 additions and 56 deletions

View File

@ -218,12 +218,17 @@ public class Version {
} }
/** /**
* Returns the smallest version between the 2. * Returns the minimum version between the 2.
*/ */
public static Version smallest(Version version1, Version version2) { public static Version min(Version version1, Version version2) {
return version1.id < version2.id ? version1 : version2; return version1.id < version2.id ? version1 : version2;
} }
/**
* Returns the maximum version between the 2
*/
public static Version max(Version version1, Version version2) { return version1.id > version2.id ? version1 : version2; }
/** /**
* Returns the version given its string representation, current version if the argument is null or empty * Returns the version given its string representation, current version if the argument is null or empty
*/ */
@ -326,7 +331,22 @@ public class Version {
bwcMajor = major; bwcMajor = major;
bwcMinor = 0; bwcMinor = 0;
} }
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99)); return Version.min(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
}
/**
* Returns the minimum created index version that this version supports. Indices created with lower versions
* can't be used with this version.
*/
public Version minimumIndexCompatibilityVersion() {
final int bwcMajor;
if (major == 5) {
bwcMajor = 2; // we jumped from 2 to 5
} else {
bwcMajor = major - 1;
}
final int bwcMinor = 0;
return Version.min(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
} }
/** /**
@ -414,5 +434,4 @@ public class Version {
public boolean isRelease() { public boolean isRelease() {
return build == 99; return build == 99;
} }
} }

View File

@ -316,7 +316,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) { if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
DiscoveryNodes nodes = currentState.nodes(); DiscoveryNodes nodes = currentState.nodes();
final Version createdVersion = Version.smallest(Version.CURRENT, nodes.getSmallestNonClientNodeVersion()); final Version createdVersion = Version.min(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion); indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata; package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
@ -160,12 +161,14 @@ public class MetaDataIndexStateService extends AbstractComponent {
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder() ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks()); .blocks(currentState.blocks());
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
.minimumIndexCompatibilityVersion();
for (IndexMetaData closedMetaData : indicesToOpen) { for (IndexMetaData closedMetaData : indicesToOpen) {
final String indexName = closedMetaData.getIndex().getName(); final String indexName = closedMetaData.getIndex().getName();
IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build(); IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build();
// The index might be closed because we couldn't import it due to old incompatible version // The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version // We need to check that this index can be upgraded to the current version
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData); indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, minIndexCompatibilityVersion);
try { try {
indicesService.verifyIndexMetadata(indexMetaData, indexMetaData); indicesService.verifyIndexMetadata(indexMetaData, indexMetaData);
} catch (Exception e) { } catch (Exception e) {

View File

@ -67,13 +67,13 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
* If the index does not need upgrade it returns the index metadata unchanged, otherwise it returns a modified index metadata. If index * If the index does not need upgrade it returns the index metadata unchanged, otherwise it returns a modified index metadata. If index
* cannot be updated the method throws an exception. * cannot be updated the method throws an exception.
*/ */
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) { public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
// Throws an exception if there are too-old segments: // Throws an exception if there are too-old segments:
if (isUpgraded(indexMetaData)) { if (isUpgraded(indexMetaData)) {
assert indexMetaData == archiveBrokenIndexSettings(indexMetaData) : "all settings must have been upgraded before"; assert indexMetaData == archiveBrokenIndexSettings(indexMetaData) : "all settings must have been upgraded before";
return indexMetaData; return indexMetaData;
} }
checkSupportedVersion(indexMetaData); checkSupportedVersion(indexMetaData, minimumIndexCompatibilityVersion);
IndexMetaData newMetaData = indexMetaData; IndexMetaData newMetaData = indexMetaData;
// we have to run this first otherwise in we try to create IndexSettings // we have to run this first otherwise in we try to create IndexSettings
// with broken settings and fail in checkMappingsCompatibility // with broken settings and fail in checkMappingsCompatibility
@ -92,21 +92,26 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
} }
/** /**
* Elasticsearch 5.0 no longer supports indices with pre Lucene v5.0 (Elasticsearch v2.0.0.beta1) segments. All indices * Elasticsearch v6.0 no longer supports indices created pre v5.0. All indices
* that were created before Elasticsearch v2.0.0.beta1 should be reindexed in Elasticsearch 2.x * that were created before Elasticsearch v5.0 should be re-indexed in Elasticsearch 5.x
* before they can be opened by this version of elasticsearch. */ * before they can be opened by this version of elasticsearch.
private void checkSupportedVersion(IndexMetaData indexMetaData) { */
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) { private void checkSupportedVersion(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v2.0.0.beta1." if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData,
+ " It should be reindexed in Elasticsearch 2.x before upgrading to " + Version.CURRENT + "."); minimumIndexCompatibilityVersion) == false) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created with version ["
+ indexMetaData.getCreationVersion() + "] but the minimum compatible version is ["
+ minimumIndexCompatibilityVersion + "]. It should be re-indexed in Elasticsearch " + minimumIndexCompatibilityVersion.major
+ ".x before upgrading to " + Version.CURRENT + ".");
} }
} }
/* /*
* Returns true if this index can be supported by the current version of elasticsearch * Returns true if this index can be supported by the current version of elasticsearch
*/ */
private static boolean isSupportedVersion(IndexMetaData indexMetaData) { private static boolean isSupportedVersion(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
return indexMetaData.getCreationVersion().onOrAfter(Version.V_5_0_0_beta1); return indexMetaData.getCreationVersion().onOrAfter(minimumIndexCompatibilityVersion);
} }
/** /**

View File

@ -56,10 +56,13 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
private final String masterNodeId; private final String masterNodeId;
private final String localNodeId; private final String localNodeId;
private final Version minNonClientNodeVersion; private final Version minNonClientNodeVersion;
private final Version maxNodeVersion;
private final Version minNodeVersion;
private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes, private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes,
ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes, ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes,
String masterNodeId, String localNodeId, Version minNonClientNodeVersion) { String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion,
Version minNodeVersion) {
this.nodes = nodes; this.nodes = nodes;
this.dataNodes = dataNodes; this.dataNodes = dataNodes;
this.masterNodes = masterNodes; this.masterNodes = masterNodes;
@ -67,6 +70,8 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
this.masterNodeId = masterNodeId; this.masterNodeId = masterNodeId;
this.localNodeId = localNodeId; this.localNodeId = localNodeId;
this.minNonClientNodeVersion = minNonClientNodeVersion; this.minNonClientNodeVersion = minNonClientNodeVersion;
this.minNodeVersion = minNodeVersion;
this.maxNodeVersion = maxNodeVersion;
} }
@Override @Override
@ -235,6 +240,24 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
return minNonClientNodeVersion; return minNonClientNodeVersion;
} }
/**
* Returns the version of the node with the oldest version in the cluster.
*
* @return the oldest version in the cluster
*/
public Version getMinNodeVersion() {
return minNodeVersion;
}
/**
* Returns the version of the node with the yougest version in the cluster
*
* @return the oldest version in the cluster
*/
public Version getMaxNodeVersion() {
return maxNodeVersion;
}
/** /**
* Resolve a node with a given id * Resolve a node with a given id
* *
@ -631,25 +654,27 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
ImmutableOpenMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
Version minNodeVersion = Version.CURRENT; Version minNodeVersion = Version.CURRENT;
Version maxNodeVersion = Version.CURRENT;
Version minNonClientNodeVersion = Version.CURRENT; Version minNonClientNodeVersion = Version.CURRENT;
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) { for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.isDataNode()) { if (nodeEntry.value.isDataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value); dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.getVersion()); minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
} }
if (nodeEntry.value.isMasterNode()) { if (nodeEntry.value.isMasterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value); masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.getVersion()); minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
} }
if (nodeEntry.value.isIngestNode()) { if (nodeEntry.value.isIngestNode()) {
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value); ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
} }
minNodeVersion = Version.smallest(minNodeVersion, nodeEntry.value.getVersion()); minNodeVersion = Version.min(minNodeVersion, nodeEntry.value.getVersion());
maxNodeVersion = Version.max(maxNodeVersion, nodeEntry.value.getVersion());
} }
return new DiscoveryNodes( return new DiscoveryNodes(
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(), nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
masterNodeId, localNodeId, minNonClientNodeVersion masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion
); );
} }

View File

@ -19,14 +19,16 @@
package org.elasticsearch.discovery.zen; package org.elasticsearch.discovery.zen;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
@ -37,6 +39,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class MembershipAction extends AbstractComponent { public class MembershipAction extends AbstractComponent {
@ -58,21 +61,20 @@ public class MembershipAction extends AbstractComponent {
private final TransportService transportService; private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final MembershipListener listener; private final MembershipListener listener;
public MembershipAction(Settings settings, TransportService transportService, public MembershipAction(Settings settings, TransportService transportService,
DiscoveryNodesProvider nodesProvider, MembershipListener listener) { Supplier<DiscoveryNode> localNodeSupplier, MembershipListener listener) {
super(settings); super(settings);
this.transportService = transportService; this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener; this.listener = listener;
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new, transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler()); ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new, transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler()); () -> new ValidateJoinRequest(localNodeSupplier), ThreadPool.Names.GENERIC,
new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler()); ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
} }
@ -152,20 +154,23 @@ public class MembershipAction extends AbstractComponent {
} }
} }
class ValidateJoinRequest extends TransportRequest { static class ValidateJoinRequest extends TransportRequest {
private final Supplier<DiscoveryNode> localNode;
private ClusterState state; private ClusterState state;
ValidateJoinRequest() { ValidateJoinRequest(Supplier<DiscoveryNode> localNode) {
this.localNode = localNode;
} }
ValidateJoinRequest(ClusterState state) { ValidateJoinRequest(ClusterState state) {
this.state = state; this.state = state;
this.localNode = state.nodes()::getLocalNode;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode()); this.state = ClusterState.Builder.readFrom(in, localNode.get());
} }
@Override @Override
@ -175,15 +180,31 @@ public class MembershipAction extends AbstractComponent {
} }
} }
class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> { static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
@Override @Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception { public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
ensureIndexCompatibility(Version.CURRENT.minimumIndexCompatibilityVersion(), request.state.getMetaData());
// for now, the mere fact that we can serialize the cluster state acts as validation.... // for now, the mere fact that we can serialize the cluster state acts as validation....
channel.sendResponse(TransportResponse.Empty.INSTANCE); channel.sendResponse(TransportResponse.Empty.INSTANCE);
} }
} }
/**
* Ensures that all indices are compatible with the supported index version.
* @throws IllegalStateException if any index is incompatible with the given version
*/
static void ensureIndexCompatibility(final Version supportedIndexVersion, MetaData metaData) {
// we ensure that all indices in the cluster we join are compatible with us no matter if they are
// closed or not we can't read mappings of these indices so we need to reject the join...
for (IndexMetaData idxMetaData : metaData) {
if (idxMetaData.getCreationVersion().before(supportedIndexVersion)) {
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
+ idxMetaData.getCreationVersion() + " minimum compatible index version is: " + supportedIndexVersion);
}
}
}
public static class LeaveRequest extends TransportRequest { public static class LeaveRequest extends TransportRequest {
private DiscoveryNode node; private DiscoveryNode node;

View File

@ -410,7 +410,6 @@ public class NodeJoinController extends AbstractComponent {
@Override @Override
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception { public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder(); final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();
final DiscoveryNodes currentNodes = currentState.nodes(); final DiscoveryNodes currentNodes = currentState.nodes();
boolean nodesChanged = false; boolean nodesChanged = false;
ClusterState.Builder newState; ClusterState.Builder newState;
@ -435,8 +434,10 @@ public class NodeJoinController extends AbstractComponent {
assert nodesBuilder.isLocalNodeElectedMaster(); assert nodesBuilder.isLocalNodeElectedMaster();
Version minNodeVersion = Version.CURRENT;
// processing any joins // processing any joins
for (final DiscoveryNode node : joiningNodes) { for (final DiscoveryNode node : joiningNodes) {
minNodeVersion = Version.min(minNodeVersion, node.getVersion());
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) { if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
// noop // noop
} else if (currentNodes.nodeExists(node)) { } else if (currentNodes.nodeExists(node)) {
@ -452,7 +453,9 @@ public class NodeJoinController extends AbstractComponent {
} }
results.success(node); results.success(node);
} }
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
MembershipAction.ensureIndexCompatibility(minNodeVersion.minimumIndexCompatibilityVersion(), currentState.getMetaData());
if (nodesChanged) { if (nodesChanged) {
newState.nodes(nodesBuilder); newState.nodes(nodesBuilder);
return results.build(allocationService.reroute(newState.build(), "node_join")); return results.build(allocationService.reroute(newState.build(), "node_join"));

View File

@ -183,7 +183,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
new NewPendingClusterStateListener(), new NewPendingClusterStateListener(),
discoverySettings, discoverySettings,
clusterService.getClusterName()); clusterService.getClusterName());
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); this.membership = new MembershipAction(settings, transportService, this::localNode, new MembershipListener());
this.joinThreadControl = new JoinThreadControl(threadPool); this.joinThreadControl = new JoinThreadControl(threadPool);
transportService.registerRequestHandler( transportService.registerRequestHandler(
@ -301,7 +301,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
if (!clusterChangedEvent.state().getNodes().isLocalNodeElectedMaster()) { if (!clusterChangedEvent.state().getNodes().isLocalNodeElectedMaster()) {
throw new IllegalStateException("Shouldn't publish state when not master"); throw new IllegalStateException("Shouldn't publish state when not master");
} }
try { try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) { } catch (FailedToCommitClusterStateException t) {
@ -852,6 +851,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
if (nodeJoinController == null) { if (nodeJoinController == null) {
throw new IllegalStateException("discovery module is not yet started"); throw new IllegalStateException("discovery module is not yet started");
} else { } else {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
MembershipAction.ensureIndexCompatibility(node.getVersion().minimumIndexCompatibilityVersion(), state.getMetaData());
// try and connect to the node, if it fails, we can raise an exception back to the client... // try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node); transportService.connectToNode(node);

View File

@ -245,7 +245,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
boolean changed = false; boolean changed = false;
final MetaData.Builder upgradedMetaData = MetaData.builder(metaData); final MetaData.Builder upgradedMetaData = MetaData.builder(metaData);
for (IndexMetaData indexMetaData : metaData) { for (IndexMetaData indexMetaData : metaData) {
IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData); IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData,
Version.CURRENT.minimumIndexCompatibilityVersion());
changed |= indexMetaData != newMetaData; changed |= indexMetaData != newMetaData;
upgradedMetaData.put(newMetaData, false); upgradedMetaData.put(newMetaData, false);
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.gateway;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
@ -28,6 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
@ -126,10 +128,18 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
MetaData.Builder metaData = MetaData.builder(currentState.metaData()); MetaData.Builder metaData = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
.minimumIndexCompatibilityVersion();
boolean importNeeded = false; boolean importNeeded = false;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (IndexMetaData indexMetaData : request.indices) { for (IndexMetaData indexMetaData : request.indices) {
if (indexMetaData.getCreationVersion().before(minIndexCompatibilityVersion)) {
logger.warn("ignoring dangled index [{}] on node [{}]" +
" since it's created version [{}] is not supported by at least one node in the cluster minVersion [{}]",
indexMetaData.getIndex(), request.fromNode, indexMetaData.getCreationVersion(),
minIndexCompatibilityVersion);
continue;
}
if (currentState.metaData().hasIndex(indexMetaData.getIndex().getName())) { if (currentState.metaData().hasIndex(indexMetaData.getIndex().getName())) {
continue; continue;
} }
@ -144,7 +154,8 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
try { try {
// The dangled index might be from an older version, we need to make sure it's compatible // The dangled index might be from an older version, we need to make sure it's compatible
// with the current version and upgrade it if needed. // with the current version and upgrade it if needed.
upgradedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData); upgradedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData,
minIndexCompatibilityVersion);
} catch (Exception ex) { } catch (Exception ex) {
// upgrade failed - adding index as closed // upgrade failed - adding index as closed
logger.warn((Supplier<?>) () -> new ParameterizedMessage("found dangled index [{}] on node [{}]. This index cannot be upgraded to the latest version, adding as closed", indexMetaData.getIndex(), request.fromNode), ex); logger.warn((Supplier<?>) () -> new ParameterizedMessage("found dangled index [{}] on node [{}]. This index cannot be upgraded to the latest version, adding as closed", indexMetaData.getIndex(), request.fromNode), ex);

View File

@ -83,6 +83,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Collections.min;
import static java.util.Collections.unmodifiableSet; import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
@ -225,6 +226,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
if (!renamedIndices.isEmpty()) { if (!renamedIndices.isEmpty()) {
// We have some indices to restore // We have some indices to restore
ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder();
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
.minimumIndexCompatibilityVersion();
for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) { for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) {
String index = indexEntry.getValue(); String index = indexEntry.getValue();
boolean partial = checkPartial(index); boolean partial = checkPartial(index);
@ -233,7 +236,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
IndexMetaData snapshotIndexMetaData = metaData.index(index); IndexMetaData snapshotIndexMetaData = metaData.index(index);
snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings); snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings);
try { try {
snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData); snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData,
minIndexCompatibilityVersion);
} catch (Exception ex) { } catch (Exception ex) {
throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index + "] because it cannot be upgraded", ex); throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index + "] because it cannot be upgraded", ex);
} }

View File

@ -909,7 +909,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
// we pick the smallest of the 2, to support both backward and forward compatibility // we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version // note, this is the only place we need to do this, since from here on, we use the serialized version
// as the version to use also when the node receiving this request will send the response with // as the version to use also when the node receiving this request will send the response with
Version version = Version.smallest(getCurrentVersion(), node.getVersion()); Version version = Version.min(getCurrentVersion(), node.getVersion());
stream.setVersion(version); stream.setVersion(version);
threadPool.getThreadContext().writeTo(stream); threadPool.getThreadContext().writeTo(stream);

View File

@ -64,7 +64,37 @@ public class VersionTests extends ESTestCase {
assertTrue(Version.fromString("5.0.0").onOrAfter(Version.fromString("5.0.0-beta2"))); assertTrue(Version.fromString("5.0.0").onOrAfter(Version.fromString("5.0.0-beta2")));
assertTrue(Version.fromString("5.0.0-rc1").onOrAfter(Version.fromString("5.0.0-beta24"))); assertTrue(Version.fromString("5.0.0-rc1").onOrAfter(Version.fromString("5.0.0-beta24")));
assertTrue(Version.fromString("5.0.0-alpha24").before(Version.fromString("5.0.0-beta0"))); assertTrue(Version.fromString("5.0.0-alpha24").before(Version.fromString("5.0.0-beta0")));
}
public void testMin() {
assertEquals(VersionUtils.getPreviousVersion(), Version.min(Version.CURRENT, VersionUtils.getPreviousVersion()));
assertEquals(Version.fromString("1.0.1"), Version.min(Version.fromString("1.0.1"), Version.CURRENT));
Version version = VersionUtils.randomVersion(random());
Version version1 = VersionUtils.randomVersion(random());
if (version.id <= version1.id) {
assertEquals(version, Version.min(version1, version));
} else {
assertEquals(version1, Version.min(version1, version));
}
}
public void testMax() {
assertEquals(Version.CURRENT, Version.max(Version.CURRENT, VersionUtils.getPreviousVersion()));
assertEquals(Version.CURRENT, Version.max(Version.fromString("1.0.1"), Version.CURRENT));
Version version = VersionUtils.randomVersion(random());
Version version1 = VersionUtils.randomVersion(random());
if (version.id >= version1.id) {
assertEquals(version, Version.max(version1, version));
} else {
assertEquals(version1, Version.max(version1, version));
}
}
public void testMinimumIndexCompatibilityVersion() {
assertEquals(Version.V_5_0_0, Version.V_6_0_0_alpha1_UNRELEASED.minimumIndexCompatibilityVersion());
assertEquals(Version.V_2_0_0, Version.V_5_0_0.minimumIndexCompatibilityVersion());
assertEquals(Version.V_2_0_0, Version.V_5_1_1_UNRELEASED.minimumIndexCompatibilityVersion());
assertEquals(Version.V_2_0_0, Version.V_5_0_0_alpha1.minimumIndexCompatibilityVersion());
} }
public void testVersionConstantPresent() { public void testVersionConstantPresent() {

View File

@ -189,7 +189,7 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
for (Version v : VersionUtils.allReleasedVersions()) { for (Version v : VersionUtils.allReleasedVersions()) {
if (VersionUtils.isSnapshot(v)) continue; // snapshots are unreleased, so there is no backcompat yet if (VersionUtils.isSnapshot(v)) continue; // snapshots are unreleased, so there is no backcompat yet
if (v.isRelease() == false) continue; // no guarantees for prereleases if (v.isRelease() == false) continue; // no guarantees for prereleases
if (v.before(Version.V_5_0_0)) continue; // we can only support one major version backward if (v.before(Version.CURRENT.minimumIndexCompatibilityVersion())) continue; // we can only support one major version backward
if (v.equals(Version.CURRENT)) continue; // the current version is always compatible with itself if (v.equals(Version.CURRENT)) continue; // the current version is always compatible with itself
expectedVersions.add("index-" + v.toString() + ".zip"); expectedVersions.add("index-" + v.toString() + ".zip");
} }

View File

@ -97,7 +97,7 @@ public class RestoreBackwardsCompatIT extends AbstractSnapshotIntegTestCase {
for (Version v : VersionUtils.allReleasedVersions()) { for (Version v : VersionUtils.allReleasedVersions()) {
if (VersionUtils.isSnapshot(v)) continue; // snapshots are unreleased, so there is no backcompat yet if (VersionUtils.isSnapshot(v)) continue; // snapshots are unreleased, so there is no backcompat yet
if (v.isRelease() == false) continue; // no guarantees for prereleases if (v.isRelease() == false) continue; // no guarantees for prereleases
if (v.before(Version.V_5_0_0)) continue; // we only support versions N and N-1 if (v.before(Version.CURRENT.minimumIndexCompatibilityVersion())) continue; // we only support versions N and N-1
if (v.equals(Version.CURRENT)) continue; // the current version is always compatible with itself if (v.equals(Version.CURRENT)) continue; // the current version is always compatible with itself
expectedVersions.add(v.toString()); expectedVersions.add(v.toString());
} }
@ -206,7 +206,7 @@ public class RestoreBackwardsCompatIT extends AbstractSnapshotIntegTestCase {
logger.info("--> restoring unsupported snapshot"); logger.info("--> restoring unsupported snapshot");
try { try {
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot).setRestoreGlobalState(true).setWaitForCompletion(true).get(); client().admin().cluster().prepareRestoreSnapshot(repo, snapshot).setRestoreGlobalState(true).setWaitForCompletion(true).get();
fail("should have failed to restore"); fail("should have failed to restore - " + repo);
} catch (SnapshotRestoreException ex) { } catch (SnapshotRestoreException ex) {
assertThat(ex.getMessage(), containsString("cannot restore index")); assertThat(ex.getMessage(), containsString("cannot restore index"));
assertThat(ex.getMessage(), containsString("because it cannot be upgraded")); assertThat(ex.getMessage(), containsString("because it cannot be upgraded"));

View File

@ -60,11 +60,11 @@ public class MetaDataIndexUpgradeServiceTests extends ESTestCase {
Collections.emptyMap()), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS); Collections.emptyMap()), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
IndexMetaData src = newIndexMeta("foo", Settings.builder().put("index.refresh_interval", "-200").build()); IndexMetaData src = newIndexMeta("foo", Settings.builder().put("index.refresh_interval", "-200").build());
assertFalse(service.isUpgraded(src)); assertFalse(service.isUpgraded(src));
src = service.upgradeIndexMetaData(src); src = service.upgradeIndexMetaData(src, Version.CURRENT.minimumIndexCompatibilityVersion());
assertTrue(service.isUpgraded(src)); assertTrue(service.isUpgraded(src));
assertEquals("-200", src.getSettings().get("archived.index.refresh_interval")); assertEquals("-200", src.getSettings().get("archived.index.refresh_interval"));
assertNull(src.getSettings().get("index.refresh_interval")); assertNull(src.getSettings().get("index.refresh_interval"));
assertSame(src, service.upgradeIndexMetaData(src)); // no double upgrade assertSame(src, service.upgradeIndexMetaData(src, Version.CURRENT.minimumIndexCompatibilityVersion())); // no double upgrade
} }
public void testIsUpgraded() { public void testIsUpgraded() {
@ -87,16 +87,17 @@ public class MetaDataIndexUpgradeServiceTests extends ESTestCase {
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.fromString("2.4.0")) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.fromString("2.4.0"))
.put(IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE, .put(IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE,
Version.CURRENT.luceneVersion.toString()).build()); Version.CURRENT.luceneVersion.toString()).build());
String message = expectThrows(IllegalStateException.class, () -> service.upgradeIndexMetaData(metaData)).getMessage(); String message = expectThrows(IllegalStateException.class, () -> service.upgradeIndexMetaData(metaData,
assertEquals(message, "The index [[foo/BOOM]] was created before v2.0.0.beta1. It should be reindexed in Elasticsearch 2.x " + Version.CURRENT.minimumIndexCompatibilityVersion())).getMessage();
"before upgrading to " + Version.CURRENT.toString() + "."); assertEquals(message, "The index [[foo/BOOM]] was created with version [2.4.0] but the minimum compatible version is [5.0.0]." +
" It should be re-indexed in Elasticsearch 5.x before upgrading to " + Version.CURRENT.toString() + ".");
IndexMetaData goodMeta = newIndexMeta("foo", Settings.builder() IndexMetaData goodMeta = newIndexMeta("foo", Settings.builder()
.put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.V_5_0_0_beta1) .put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.V_5_0_0_beta1)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.fromString("5.1.0")) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.fromString("5.1.0"))
.put(IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE, .put(IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE,
Version.CURRENT.luceneVersion.toString()).build()); Version.CURRENT.luceneVersion.toString()).build());
service.upgradeIndexMetaData(goodMeta); service.upgradeIndexMetaData(goodMeta, Version.V_5_0_0.minimumIndexCompatibilityVersion());
} }
public static IndexMetaData newIndexMeta(String name, Settings indexSettings) { public static IndexMetaData newIndexMeta(String name, Settings indexSettings) {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.node;
import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -250,4 +251,22 @@ public class DiscoveryNodesTests extends ESTestCase {
abstract Set<String> matchingNodeIds(DiscoveryNodes nodes); abstract Set<String> matchingNodeIds(DiscoveryNodes nodes);
} }
public void testMaxMinNodeVersion() {
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
discoBuilder.add(new DiscoveryNode("name_" + 1, "node_" + 1, buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))),
Version.fromString("5.1.0")));
discoBuilder.add(new DiscoveryNode("name_" + 2, "node_" + 2, buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))),
Version.fromString("6.3.0")));
discoBuilder.add(new DiscoveryNode("name_" + 3, "node_" + 3, buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))),
Version.fromString("1.1.0")));
discoBuilder.localNodeId("name_1");
discoBuilder.masterNodeId("name_2");
DiscoveryNodes build = discoBuilder.build();
assertEquals( Version.fromString("6.3.0"), build.getMaxNodeVersion());
assertEquals( Version.fromString("1.1.0"), build.getMinNodeVersion());
}
} }

View File

@ -20,13 +20,16 @@
package org.elasticsearch.discovery.zen; package org.elasticsearch.discovery.zen;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
@ -35,22 +38,40 @@ import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAllocations;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
@ -283,4 +304,82 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
}); });
return discoveryNodes; return discoveryNodes;
} }
public void testValidateOnUnsupportedIndexVersionCreated() throws Exception {
final int iters = randomIntBetween(3, 10);
for (int i = 0; i < iters; i++) {
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler();
final boolean incompatible = randomBoolean();
IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
.put(SETTING_VERSION_CREATED, incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion())
: VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT))
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.state(IndexMetaData.State.OPEN)
.build();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetaData.getIndex());
RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
final ShardId shardId = new ShardId("test", "_na_", 0);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
final DiscoveryNode primaryNode = otherNode;
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting("test", 0, primaryNode.getId(), null, true,
ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "getting there")));
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
IndexRoutingTable indexRoutingTable = indexRoutingTableBuilder.build();
IndexMetaData updatedIndexMetaData = updateActiveAllocations(indexRoutingTable, indexMetaData);
stateBuilder.metaData(MetaData.builder().put(updatedIndexMetaData, false).generateClusterUuidIfNeeded())
.routingTable(RoutingTable.builder().add(indexRoutingTable).build());
if (incompatible) {
IllegalStateException ex = expectThrows(IllegalStateException.class, () ->
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), null));
assertEquals("index [test] version not supported: "
+ VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion())
+ " minimum compatible index version is: " + Version.CURRENT.minimumCompatibilityVersion(), ex.getMessage());
} else {
AtomicBoolean sendResponse = new AtomicBoolean(false);
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), new TransportChannel() {
@Override
public String action() {
return null;
}
@Override
public String getProfileName() {
return null;
}
@Override
public long getRequestId() {
return 0;
}
@Override
public String getChannelType() {
return null;
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
sendResponse.set(true);
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
}
@Override
public void sendResponse(Exception exception) throws IOException {
}
});
assertTrue(sendResponse.get());
}
}
}
} }

View File

@ -414,7 +414,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
IndexMetaData metaData = state.getMetaData().index("test"); IndexMetaData metaData = state.getMetaData().index("test");
for (NodeEnvironment services : internalCluster().getInstances(NodeEnvironment.class)) { for (NodeEnvironment services : internalCluster().getInstances(NodeEnvironment.class)) {
IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(Settings.builder().put(metaData.getSettings()) IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(Settings.builder().put(metaData.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_0_0_beta1.id) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion().id)
// this is invalid but should be archived // this is invalid but should be archived
.put("index.similarity.BM25.type", "classic") .put("index.similarity.BM25.type", "classic")
// this one is not validated ahead of time and breaks allocation // this one is not validated ahead of time and breaks allocation

View File

@ -396,7 +396,7 @@ public class GatewayMetaStateTests extends ESAllocationTestCase {
this.upgrade = upgrade; this.upgrade = upgrade;
} }
@Override @Override
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) { public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
return upgrade ? IndexMetaData.builder(indexMetaData).build() : indexMetaData; return upgrade ? IndexMetaData.builder(indexMetaData).build() : indexMetaData;
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices.cluster; package org.elasticsearch.indices.cluster;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction; import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
@ -158,7 +159,7 @@ public class ClusterStateChanges extends AbstractComponent {
MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, null, null) { MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, null, null) {
// metaData upgrader should do nothing // metaData upgrader should do nothing
@Override @Override
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) { public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
return indexMetaData; return indexMetaData;
} }
}; };

View File

@ -142,7 +142,7 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase {
} }
CompatibilityVersion annotation = clazz.getAnnotation(CompatibilityVersion.class); CompatibilityVersion annotation = clazz.getAnnotation(CompatibilityVersion.class);
if (annotation != null) { if (annotation != null) {
return Version.smallest(Version.fromId(annotation.version()), compatibilityVersion(clazz.getSuperclass())); return Version.min(Version.fromId(annotation.version()), compatibilityVersion(clazz.getSuperclass()));
} }
return compatibilityVersion(clazz.getSuperclass()); return compatibilityVersion(clazz.getSuperclass());
} }