Merge branch 'master' into enhancement/remove_node_client_setting

commit eebd0cfccd, authored by javanna on 2016-03-22 10:34:40 +01:00, committed by Luca Cavanna
27 changed files with 606 additions and 91 deletions

View File

@ -419,7 +419,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]NodeEnvironment.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]AsyncShardFetch.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]DanglingIndicesState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]Gateway.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayMetaState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayService.java" checks="LineLength" />

View File

@ -80,6 +80,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeAction<Upd
UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest()
.indices(concreteIndices)
.settings(request.settings())
.setPreserveExisting(request.isPreserveExisting())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());

View File

@ -29,8 +29,23 @@ public class UpdateSettingsClusterStateUpdateRequest extends IndicesClusterState
private Settings settings;

private boolean preserveExisting = false;

public UpdateSettingsClusterStateUpdateRequest() {
}

/**
* Returns <code>true</code> iff the settings update should only add but not update settings. If the setting already exists
* it should not be overwritten by this update. The default is <code>false</code>
*/
public boolean isPreserveExisting() {
return preserveExisting;
}

/**
* Iff set to <code>true</code> this settings update will only add settings not already set on an index. Existing settings remain
* unchanged.
*/
public UpdateSettingsClusterStateUpdateRequest setPreserveExisting(boolean preserveExisting) {
this.preserveExisting = preserveExisting;
return this;
}

/**

View File

@ -47,6 +47,7 @@ public class UpdateSettingsRequest extends AcknowledgedRequest<UpdateSettingsReq
private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, true);
private Settings settings = EMPTY_SETTINGS;
private boolean preserveExisting = false;

public UpdateSettingsRequest() {
}
@ -127,6 +128,23 @@ public class UpdateSettingsRequest extends AcknowledgedRequest<UpdateSettingsReq
return this;
}
/**
* Returns <code>true</code> iff the settings update should only add but not update settings. If the setting already exists
* it should not be overwritten by this update. The default is <code>false</code>
*/
public boolean isPreserveExisting() {
return preserveExisting;
}
/**
* Iff set to <code>true</code> this settings update will only add settings not already set on an index. Existing settings remain
* unchanged.
*/
public UpdateSettingsRequest setPreserveExisting(boolean preserveExisting) {
this.preserveExisting = preserveExisting;
return this;
}
/**
* Sets the settings to be updated (either json/yaml/properties format)
*/
@ -149,6 +167,7 @@ public class UpdateSettingsRequest extends AcknowledgedRequest<UpdateSettingsReq
indicesOptions = IndicesOptions.readIndicesOptions(in);
settings = readSettingsFromStream(in);
readTimeout(in);
preserveExisting = in.readBoolean();
}
@Override
@ -158,5 +177,6 @@ public class UpdateSettingsRequest extends AcknowledgedRequest<UpdateSettingsReq
indicesOptions.writeIndicesOptions(out);
writeSettingsToStream(settings, out);
writeTimeout(out);
out.writeBoolean(preserveExisting);
}
}

View File

@ -84,4 +84,9 @@ public class UpdateSettingsRequestBuilder extends AcknowledgedRequestBuilder<Upd
request.settings(source);
return this;
}
public UpdateSettingsRequestBuilder setPreserveExisting(boolean preserveExisting) {
request.setPreserveExisting(preserveExisting);
return this;
}
}
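For illustration, a hedged sketch of how a caller might drive the new flag through the admin client; the client variable, the index pattern and the refresh_interval value are assumptions for the example, not part of this change:

// Only add index.refresh_interval where it is not already set; indices that already
// define the setting keep their current value because preserve_existing is enabled.
client.admin().indices().prepareUpdateSettings("logs-*")
    .setPreserveExisting(true)
    .setSettings(Settings.builder().put("index.refresh_interval", "30s").build())
    .get();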

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
@ -37,6 +38,8 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
@ -59,10 +62,16 @@ public class MetaDataIndexStateService extends AbstractComponent {
private final AllocationService allocationService;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
private final NodeServicesProvider nodeServiceProvider;
private final IndicesService indicesService;

@Inject
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.nodeServiceProvider = nodeServicesProvider;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
@ -162,6 +171,12 @@ public class MetaDataIndexStateService extends AbstractComponent {
// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
try {
indicesService.verifyIndexMetadata(nodeServiceProvider, indexMetaData);
} catch (Exception e) {
throw new ElasticsearchException("Failed to verify index " + indexMetaData.getIndex(), e);
}
mdBuilder.put(indexMetaData, true);
blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK);
}

View File

@ -176,6 +176,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
final Settings skippedSettigns = skipppedSettings.build();
final Settings closedSettings = settingsForClosedIndices.build();
final Settings openSettings = settingsForOpenIndices.build();
final boolean preserveExisting = request.isPreserveExisting();
clusterService.submitStateUpdateTask("update-settings",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@ -221,7 +222,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
int updatedNumberOfReplicas = openSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1);
- if (updatedNumberOfReplicas != -1) {
+ if (updatedNumberOfReplicas != -1 && preserveExisting == false) {
routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
metaDataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
@ -239,6 +240,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
Settings.Builder updates = Settings.builder();
Settings.Builder indexSettings = Settings.builder().put(indexMetaData.getSettings());
if (indexScopedSettings.updateDynamicSettings(openSettings, indexSettings, updates, index.getName())) {
if (preserveExisting) {
indexSettings.put(indexMetaData.getSettings());
}
metaDataBuilder.put(IndexMetaData.builder(indexMetaData).settings(indexSettings));
}
}
@ -250,6 +254,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
Settings.Builder updates = Settings.builder();
Settings.Builder indexSettings = Settings.builder().put(indexMetaData.getSettings());
if (indexScopedSettings.updateSettings(closedSettings, indexSettings, updates, index.getName())) {
if (preserveExisting) {
indexSettings.put(indexMetaData.getSettings());
}
metaDataBuilder.put(IndexMetaData.builder(indexMetaData).settings(indexSettings));
}
}
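The preserve_existing branches above work because Settings.Builder.put(Settings) overwrites keys that are already present: after the validated updates have been applied to the builder, putting the index's original settings back on top restores every key the index already had, so only genuinely new keys survive. A hedged sketch of that overwrite-order idea, with invented keys:

Settings existing = Settings.builder()
    .put("index.refresh_interval", "5s")        // already set on the index
    .build();
Settings updates = Settings.builder()
    .put("index.refresh_interval", "30s")       // would change an existing value
    .put("index.max_result_window", "20000")    // genuinely new key
    .build();
Settings.Builder indexSettings = Settings.builder().put(updates);
indexSettings.put(existing);                    // preserve_existing: existing values win again
// indexSettings now keeps refresh_interval=5s and only gains max_result_window=20000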

View File

@ -19,7 +19,11 @@
package org.elasticsearch.common.settings;
import org.apache.lucene.search.spell.LevensteinDistance;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex;
@ -30,10 +34,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* A basic setting service that can be used for per-index and per-cluster settings.
@ -221,10 +229,18 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
* * Validates that all given settings are registered and valid
*/
public final void validate(Settings settings) {
- for (Map.Entry<String, String> entry : settings.getAsMap().entrySet()) {
List<RuntimeException> exceptions = new ArrayList<>();
// we want them sorted for deterministic error messages
SortedMap<String, String> sortedSettings = new TreeMap<>(settings.getAsMap());
for (Map.Entry<String, String> entry : sortedSettings.entrySet()) {
try {
validate(entry.getKey(), settings);
} catch (RuntimeException ex) {
exceptions.add(ex);
}
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
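Collecting the per-key failures and rethrowing them together means a user with several invalid settings sees all of them at once instead of fixing them one by one; the later failures travel as suppressed exceptions on the first. A hedged, self-contained sketch of that pattern in plain Java (not the Elasticsearch helper itself):

import java.util.ArrayList;
import java.util.List;

class CollectAndRethrowSketch {
    static void rethrowAndSuppress(List<RuntimeException> exceptions) {
        RuntimeException main = null;
        for (RuntimeException ex : exceptions) {
            if (main == null) {
                main = ex;
            } else {
                main.addSuppressed(ex); // later failures ride along as suppressed exceptions
            }
        }
        if (main != null) {
            throw main;
        }
    }

    public static void main(String[] args) {
        List<RuntimeException> failures = new ArrayList<>();
        failures.add(new IllegalArgumentException("unknown setting [some.foo.bar]"));
        failures.add(new IllegalArgumentException("unknown setting [some.foo.baz]"));
        rethrowAndSuppress(failures); // throws the first, with the second attached as suppressed
    }
}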
/**
@ -233,7 +249,21 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
public final void validate(String key, Settings settings) {
Setting setting = get(key);
if (setting == null) {
throw new IllegalArgumentException("unknown setting [" + key + "]"); LevensteinDistance ld = new LevensteinDistance();
List<Tuple<Float, String>> scoredKeys = new ArrayList<>();
for (String k : this.keySettings.keySet()) {
float distance = ld.getDistance(key, k);
if (distance > 0.7f) {
scoredKeys.add(new Tuple<>(distance, k));
}
}
CollectionUtil.timSort(scoredKeys, (a,b) -> b.v1().compareTo(a.v1()));
String msg = "unknown setting [" + key + "]";
List<String> keys = scoredKeys.stream().map((a) -> a.v2()).collect(Collectors.toList());
if (keys.isEmpty() == false) {
msg += " did you mean " + (keys.size() == 1 ? "[" + keys.get(0) + "]": "any of " + keys.toString()) + "?";
}
throw new IllegalArgumentException(msg);
}
setting.get(settings);
}
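The suggestion logic scores every registered key against the unknown key with Lucene's LevensteinDistance, whose getDistance returns a similarity between 0 and 1 (higher is closer), keeps candidates above 0.7, and sorts them best first. A small hedged sketch of the same scoring idea, with a made-up candidate list standing in for the registered settings:

import org.apache.lucene.search.spell.LevensteinDistance;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DidYouMeanSketch {
    public static void main(String[] args) {
        String unknown = "index.numbe_of_replica";
        // stand-in for the registered keys; the real service iterates keySettings.keySet()
        List<String> registered = Arrays.asList("index.number_of_replicas", "index.number_of_shards", "index.codec");
        LevensteinDistance ld = new LevensteinDistance();
        List<String> candidates = new ArrayList<>();
        for (String key : registered) {
            if (ld.getDistance(unknown, key) > 0.7f) { // similarity in [0,1]; 1 means identical
                candidates.add(key);
            }
        }
        System.out.println("unknown setting [" + unknown + "] did you mean " + candidates + "?");
    }
}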

View File

@ -20,13 +20,22 @@
package org.elasticsearch.common.settings;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tribe.TribeService;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* A module that binds the provided settings to the {@link Settings} interface.
@ -37,9 +46,12 @@ public class SettingsModule extends AbstractModule {
private final Set<String> settingsFilterPattern = new HashSet<>();
private final Map<String, Setting<?>> nodeSettings = new HashMap<>();
private final Map<String, Setting<?>> indexSettings = new HashMap<>();
private static final Predicate<String> TRIBE_CLIENT_NODE_SETTINGS_PREDICATE = (s) -> s.startsWith("tribe.")
&& TribeService.TRIBE_SETTING_KEYS.contains(s) == false;
private final ESLogger logger;

public SettingsModule(Settings settings) {
logger = Loggers.getLogger(getClass(), settings);
this.settings = settings;
for (Setting<?> setting : ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) {
registerSetting(setting);
@ -53,6 +65,56 @@ public class SettingsModule extends AbstractModule {
protected void configure() {
final IndexScopedSettings indexScopedSettings = new IndexScopedSettings(settings, new HashSet<>(this.indexSettings.values()));
final ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(this.nodeSettings.values()));
Settings indexSettings = settings.filter((s) -> s.startsWith("index.") && clusterSettings.get(s) == null);
if (indexSettings.isEmpty() == false) {
try {
String separator = IntStream.range(0, 85).mapToObj(s -> "*").collect(Collectors.joining("")).trim();
StringBuilder builder = new StringBuilder();
builder.append(System.lineSeparator());
builder.append(separator);
builder.append(System.lineSeparator());
builder.append("Found index level settings on node level configuration.");
builder.append(System.lineSeparator());
builder.append(System.lineSeparator());
int count = 0;
for (String word : ("Since elasticsearch 5.x index level settings can NOT be set on the nodes configuration like " +
"the elasticsearch.yaml, in system properties or command line arguments." +
"In order to upgrade all indices the settings must be updated via the /${index}/_settings API. " +
"Unless all settings are dynamic all indices must be closed in order to apply the upgrade" +
"Indices created in the future should use index templates to set default values."
).split(" ")) {
if (count + word.length() > 85) {
builder.append(System.lineSeparator());
count = 0;
}
count += word.length() + 1;
builder.append(word).append(" ");
}
builder.append(System.lineSeparator());
builder.append(System.lineSeparator());
builder.append("Please ensure all required values are updated on all indices by executing: ");
builder.append(System.lineSeparator());
builder.append(System.lineSeparator());
builder.append("curl -XPUT 'http://localhost:9200/_all/_settings?preserve_existing=true' -d '");
try (XContentBuilder xContentBuilder = XContentBuilder.builder(XContentType.JSON.xContent())) {
xContentBuilder.prettyPrint();
xContentBuilder.startObject();
indexSettings.toXContent(xContentBuilder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true")));
xContentBuilder.endObject();
builder.append(xContentBuilder.string());
}
builder.append("'");
builder.append(System.lineSeparator());
builder.append(separator);
builder.append(System.lineSeparator());
logger.warn(builder.toString());
throw new IllegalArgumentException("node settings must not contain any index level settings");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// by now we are fully configured, lets check node level settings for unregistered index settings
final Predicate<String> acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
clusterSettings.validate(settings.filter(acceptOnlyClusterSettings));
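The new startup check treats any node-level key that starts with "index." and is not a registered cluster setting as an index-level setting, logs the offending keys together with a ready-made preserve_existing update command, and then refuses to start. A hedged sketch of just the detection step; building the ClusterSettings from the default built-in set and the concrete keys are assumptions for the example (SettingsModule uses the settings registered on the module):

// Hypothetical node-level settings; "index.codec" is index-level and trips the check.
Settings nodeSettings = Settings.builder()
    .put("cluster.name", "prod")
    .put("index.codec", "best_compression")
    .build();
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Settings indexLevel = nodeSettings.filter((s) -> s.startsWith("index.") && clusterSettings.get(s) == null);
if (indexLevel.isEmpty() == false) {
    // SettingsModule additionally logs the settings and the suggested /_all/_settings call here
    throw new IllegalArgumentException("node settings must not contain any index level settings");
}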

View File

@ -19,9 +19,6 @@
package org.elasticsearch.gateway;
import com.carrotsearch.hppc.ObjectFloatHashMap;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -34,9 +31,11 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
- import org.elasticsearch.index.Index;
+ import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Supplier;
/**
@ -53,10 +52,15 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
private final TransportNodesListGatewayMetaState listGatewayMetaState;
private final Supplier<Integer> minimumMasterNodesProvider;
private final IndicesService indicesService;
private final NodeServicesProvider nodeServicesProvider;

public Gateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, GatewayMetaState metaState,
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.nodeServicesProvider = nodeServicesProvider;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.nodeEnv = nodeEnv;
this.metaState = metaState;
@ -66,9 +70,9 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
}

public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
- ObjectHashSet<String> nodesIds = new ObjectHashSet<>(clusterService.state().nodes().masterNodes().keys());
- logger.trace("performing state recovery from {}", nodesIds);
- TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds.toArray(String.class), null).actionGet();
+ String[] nodesIds = clusterService.state().nodes().masterNodes().keys().toArray(String.class);
+ logger.trace("performing state recovery from {}", Arrays.toString(nodesIds));
+ TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();

int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get());
@ -80,7 +84,6 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
}
}
- ObjectFloatHashMap<Index> indices = new ObjectFloatHashMap<>();
MetaData electedGlobalState = null;
int found = 0;
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
@ -93,51 +96,30 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
} else if (nodeState.metaData().version() > electedGlobalState.version()) {
electedGlobalState = nodeState.metaData();
}
- for (ObjectCursor<IndexMetaData> cursor : nodeState.metaData().indices().values()) {
- indices.addTo(cursor.value.getIndex(), 1);
- }
}
if (found < requiredAllocation) {
listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");
return;
}
- // update the global state, and clean the indices, we elect them in the next phase
- MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState).removeAllIndices();
- assert !indices.containsKey(null);
- final Object[] keys = indices.keys;
- for (int i = 0; i < keys.length; i++) {
- if (keys[i] != null) {
- Index index = (Index) keys[i];
- IndexMetaData electedIndexMetaData = null;
- int indexMetaDataCount = 0;
- for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
- if (nodeState.metaData() == null) {
- continue;
- }
- IndexMetaData indexMetaData = nodeState.metaData().index(index);
- if (indexMetaData == null) {
- continue;
- }
- if (electedIndexMetaData == null) {
- electedIndexMetaData = indexMetaData;
- } else if (indexMetaData.getVersion() > electedIndexMetaData.getVersion()) {
- electedIndexMetaData = indexMetaData;
- }
- indexMetaDataCount++;
- }
- if (electedIndexMetaData != null) {
- if (indexMetaDataCount < requiredAllocation) {
- logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation);
- }
- metaDataBuilder.put(electedIndexMetaData, false);
- }
- }
- }
+ // verify index metadata
+ MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState);
+ for (IndexMetaData indexMetaData : electedGlobalState) {
+ try {
+ if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
+ // verify that we can actually create this index - if not we recover it as closed with lots of warn logs
+ indicesService.verifyIndexMetadata(nodeServicesProvider, indexMetaData);
+ }
+ } catch (Exception e) {
+ logger.warn("recovering index {} failed - recovering as closed", e, indexMetaData.getIndex());
+ indexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE).build();
+ metaDataBuilder.put(indexMetaData, true);
+ }
+ }
ClusterState.Builder builder = ClusterState.builder(clusterService.state().getClusterName());
builder.metaData(metaDataBuilder);
listener.onSuccess(builder.build());
}

public void reset() throws Exception {
try {
Path[] dataPaths = nodeEnv.nodeDataPaths();

View File

@ -86,8 +86,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
try {
ensureNoPre019State();
- pre20Upgrade();
IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv);
+ upgradeMetaData();
long startNS = System.nanoTime();
metaStateService.loadFullState();
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
@ -222,7 +222,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
* MetaDataIndexUpgradeService might also update obsolete settings if needed. When this happens we rewrite
* index metadata with new settings.
*/
- private void pre20Upgrade() throws Exception {
+ private void upgradeMetaData() throws Exception {
MetaData metaData = loadMetaState();
List<IndexMetaData> updateIndexMetaData = new ArrayList<>();
for (IndexMetaData indexMetaData : metaData) {
@ -235,7 +235,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
// means the upgrade can continue. Now it's safe to overwrite index metadata with the new version.
for (IndexMetaData indexMetaData : updateIndexMetaData) {
// since we still haven't upgraded the index folders, we write index state in the old folder
metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getName())); metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getUUID()));
}
}

View File

@ -43,6 +43,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
@ -95,9 +97,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
@Inject
public GatewayService(Settings settings, AllocationService allocationService, ClusterService clusterService,
ThreadPool threadPool, NodeEnvironment nodeEnvironment, GatewayMetaState metaState,
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.gateway = new Gateway(settings, clusterService, nodeEnvironment, metaState, listGatewayMetaState, discovery,
nodeServicesProvider, indicesService);
this.allocationService = allocationService;
this.clusterService = clusterService;
this.threadPool = threadPool;

View File

@ -102,7 +102,7 @@ public final class ShardStateMetaData {
return "version [" + legacyVersion + "], primary [" + primary + "], allocation [" + allocationId + "]"; return "version [" + legacyVersion + "], primary [" + primary + "], allocation [" + allocationId + "]";
} }
- public static final MetaDataStateFormat<ShardStateMetaData> FORMAT = new MetaDataStateFormat<ShardStateMetaData>(XContentType.JSON, SHARD_STATE_FILE_PREFIX) {
+ public static final MetaDataStateFormat<ShardStateMetaData> FORMAT = new MetaDataStateFormat<ShardStateMetaData>(XContentType.SMILE, SHARD_STATE_FILE_PREFIX) {
@Override
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream) throws IOException {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
@ -33,6 +34,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
@ -66,6 +68,7 @@ import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
@ -74,6 +77,7 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
@ -88,9 +92,11 @@ import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@ -324,6 +330,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @throws IndexAlreadyExistsException if the index already exists.
*/
public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List<IndexEventListener> builtInListeners) throws IOException {
if (!lifecycle.started()) {
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
}
@ -331,37 +338,22 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]"); throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
} }
final Index index = indexMetaData.getIndex(); final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
if (hasIndex(index)) { if (hasIndex(index)) {
throw new IndexAlreadyExistsException(index); throw new IndexAlreadyExistsException(index);
} }
logger.debug("creating Index [{}], shards [{}]/[{}{}]", List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
idxSettings.isShadowReplicaIndex() ? "s" : "");
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
final IndexEventListener onStoreClose = new IndexEventListener() { final IndexEventListener onStoreClose = new IndexEventListener() {
@Override @Override
public void onStoreClosed(ShardId shardId) { public void onStoreClosed(ShardId shardId) {
indicesQueryCache.onClose(shardId); indicesQueryCache.onClose(shardId);
} }
}; };
indexModule.addIndexEventListener(onStoreClose); finalListeners.add(onStoreClose);
indexModule.addIndexEventListener(oldShardsStats); finalListeners.add(oldShardsStats);
final IndexEventListener listener = indexModule.freeze(); final IndexService indexService = createIndexService("create index", nodeServicesProvider, indexMetaData, indicesQueryCache, indicesFieldDataCache, finalListeners, indexingMemoryController);
listener.beforeIndexCreated(index, idxSettings.getSettings());
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingMemoryController);
boolean success = false; boolean success = false;
try { try {
assert indexService.getIndexEventListener() == listener; indexService.getIndexEventListener().afterIndexCreated(indexService);
listener.afterIndexCreated(indexService);
indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap(); indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
success = true; success = true;
return indexService; return indexService;
@ -370,7 +362,54 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
indexService.close("plugins_failed", true); indexService.close("plugins_failed", true);
} }
} }
}
/**
* This creates a new IndexService without registering it
*/
private synchronized IndexService createIndexService(final String reason, final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List<IndexEventListener> builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException {
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
logger.debug("creating Index [{}], shards [{}]/[{}{}] - reason [{}]",
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
idxSettings.isShadowReplicaIndex() ? "s" : "", reason);
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
final IndexEventListener listener = indexModule.freeze();
listener.beforeIndexCreated(index, idxSettings.getSettings());
return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingOperationListeners);
}
/**
* This method verifies that the given {@link IndexMetaData} holds sane values to create an {@link IndexService}. This method will throw an
* exception if the creation fails. The created {@link IndexService} will not be registered and will be closed immediately.
*/
public synchronized void verifyIndexMetadata(final NodeServicesProvider nodeServicesProvider, IndexMetaData metaData) throws IOException {
final List<Closeable> closeables = new ArrayList<>();
try {
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {});
closeables.add(indicesFieldDataCache);
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service = createIndexService("metadata verification", nodeServicesProvider,
metaData, indicesQueryCache, indicesFieldDataCache, Collections.emptyList());
for (ObjectCursor<MappingMetaData> typeMapping : metaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
service.mapperService().merge(typeMapping.value.type(), typeMapping.value.source(),
MapperService.MergeReason.MAPPING_RECOVERY, true);
}
closeables.add(() -> service.close("metadata verification", false));
} finally {
IOUtils.close(closeables);
}
}
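verifyIndexMetadata relies on a small resource pattern: every temporary resource is added to a list of Closeables as soon as it is created, and IOUtils.close(closeables) in the finally block releases whatever got that far, even if creation fails halfway through. A hedged, generic sketch of the same pattern; the file names are invented and the enclosing method is assumed to declare IOException:

List<Closeable> closeables = new ArrayList<>();
try {
    FileInputStream first = new FileInputStream("a.bin");   // stand-in resources for the example
    closeables.add(first);
    FileInputStream second = new FileInputStream("b.bin");
    closeables.add(second);
    // ... work with the resources; any exception falls through to the finally block
} finally {
    IOUtils.close(closeables); // closes everything that was successfully created
}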
/**

View File

@ -59,11 +59,8 @@ public final class ForEachProcessor extends AbstractProcessor {
List<Object> values = ingestDocument.getFieldValue(field, List.class);
List<Object> newValues = new ArrayList<>(values.size());
for (Object value : values) {
- Map<String, Object> innerSource = new HashMap<>();
- innerSource.put("_value", value);
- for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
- innerSource.put(metaData.getFieldName(), ingestDocument.getSourceAndMetadata().get(metaData.getFieldName()));
- }
+ Map<String, Object> innerSource = new HashMap<>(ingestDocument.getSourceAndMetadata());
+ innerSource.put("_value", value); // scalar value to access the list item being evaluated
IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata());
for (Processor processor : processors) {
processor.execute(innerIngestDocument);

View File

@ -127,7 +127,8 @@ class InstallPluginCommand extends Command {
"repository-azure", "repository-azure",
"repository-hdfs", "repository-hdfs",
"repository-s3", "repository-s3",
"store-smb"))); "store-smb",
"xpack")));
private final Environment env; private final Environment env;
private final OptionSpec<Void> batchOption; private final OptionSpec<Void> batchOption;

View File

@ -47,6 +47,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
"timeout", "timeout",
"master_timeout", "master_timeout",
"index", "index",
"preserve_existing",
"expand_wildcards", "expand_wildcards",
"ignore_unavailable", "ignore_unavailable",
"allow_no_indices")); "allow_no_indices"));
@ -62,6 +63,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(Strings.splitStringByCommaToArray(request.param("index")));
updateSettingsRequest.timeout(request.paramAsTime("timeout", updateSettingsRequest.timeout()));
updateSettingsRequest.setPreserveExisting(request.paramAsBoolean("preserve_existing", updateSettingsRequest.isPreserveExisting()));
updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout()));
updateSettingsRequest.indicesOptions(IndicesOptions.fromRequest(request, updateSettingsRequest.indicesOptions()));

View File

@ -187,6 +187,15 @@ public class ScopedSettingsTests extends ESTestCase {
assertEquals("boom", copy.get(IndexModule.INDEX_STORE_TYPE_SETTING)); // test fallback to node settings assertEquals("boom", copy.get(IndexModule.INDEX_STORE_TYPE_SETTING)); // test fallback to node settings
} }
public void testValidateWithSuggestion() {
IndexScopedSettings settings = new IndexScopedSettings(
Settings.EMPTY,
IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
() -> settings.validate(Settings.builder().put("index.numbe_of_replica", "1").build()));
assertEquals(iae.getMessage(), "unknown setting [index.numbe_of_replica] did you mean [index.number_of_replicas]?");
}
public void testValidate() {
IndexScopedSettings settings = new IndexScopedSettings(
Settings.EMPTY,

View File

@ -36,12 +36,35 @@ public class SettingsModuleTests extends ModuleTestCase {
{
Settings settings = Settings.builder().put("cluster.routing.allocation.balance.shard", "[2.0]").build();
SettingsModule module = new SettingsModule(settings);
- try {
- assertInstanceBinding(module, Settings.class, (s) -> s == settings);
- fail();
- } catch (IllegalArgumentException ex) {
+ IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
+ () -> assertInstanceBinding(module, Settings.class, (s) -> s == settings));
assertEquals("Failed to parse value [[2.0]] for setting [cluster.routing.allocation.balance.shard]", ex.getMessage());
- }
}
{
Settings settings = Settings.builder().put("cluster.routing.allocation.balance.shard", "[2.0]")
.put("some.foo.bar", 1).build();
SettingsModule module = new SettingsModule(settings);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> assertInstanceBinding(module, Settings.class, (s) -> s == settings));
assertEquals("Failed to parse value [[2.0]] for setting [cluster.routing.allocation.balance.shard]", ex.getMessage());
assertEquals(1, ex.getSuppressed().length);
assertEquals("unknown setting [some.foo.bar]", ex.getSuppressed()[0].getMessage());
}
{
Settings settings = Settings.builder().put("index.codec", "default")
.put("index.foo.bar", 1).build();
SettingsModule module = new SettingsModule(settings);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> assertInstanceBinding(module, Settings.class, (s) -> s == settings));
assertEquals("node settings must not contain any index level settings", ex.getMessage());
}
{
Settings settings = Settings.builder().put("index.codec", "default").build();
SettingsModule module = new SettingsModule(settings);
assertInstanceBinding(module, Settings.class, (s) -> s == settings);
}
}

View File

@ -19,9 +19,13 @@
package org.elasticsearch.gateway;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -31,7 +35,11 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@ -40,6 +48,7 @@ import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@ -364,4 +373,123 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
logger.info("--> verify the doc is there"); logger.info("--> verify the doc is there");
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
} }
/**
* This test really tests worst case scenario where we have a broken setting or any setting that prevents an index from being
* allocated in our metadata that we recover. In that case we now have the ability to check the index on local recovery from disk
* if it is sane and if we can successfully create an IndexService. This also includes plugins etc.
*/
public void testRecoverBrokenIndexMetadata() throws Exception {
logger.info("--> starting one node");
internalCluster().startNode();
logger.info("--> indexing a simple document");
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
if (usually()) {
ensureYellow();
} else {
internalCluster().startNode();
client().admin().cluster()
.health(Requests.clusterHealthRequest()
.waitForGreenStatus()
.waitForEvents(Priority.LANGUID)
.waitForRelocatingShards(0).waitForNodes("2")).actionGet();
}
ClusterState state = client().admin().cluster().prepareState().get().getState();
IndexMetaData metaData = state.getMetaData().index("test");
for (NodeEnvironment services : internalCluster().getInstances(NodeEnvironment.class)) {
IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(Settings.builder().put(metaData.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_2_0_0_beta1.id)
// this is invalid but should be archived
.put("index.similarity.BM25.type", "classic")
// this one is not validated ahead of time and breaks allocation
.put("index.analysis.filter.myCollator.type", "icu_collation")
).build();
IndexMetaData.FORMAT.write(brokenMeta, brokenMeta.getVersion(), services.indexPaths(brokenMeta.getIndex()));
}
internalCluster().fullRestart();
// ensureGreen(closedIndex) waits for the index to show up in the metadata
// this is crucial otherwise the state call below might not contain the index yet
ensureGreen(metaData.getIndex().getName());
state = client().admin().cluster().prepareState().get().getState();
assertEquals(IndexMetaData.State.CLOSE, state.getMetaData().index(metaData.getIndex()).getState());
assertEquals("classic", state.getMetaData().index(metaData.getIndex()).getSettings().get("archived.index.similarity.BM25.type"));
// try to open it with the broken setting - fail again!
ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> client().admin().indices().prepareOpen("test").get());
assertEquals(ex.getMessage(), "Failed to verify index " + metaData.getIndex());
assertNotNull(ex.getCause());
assertEquals(IllegalArgumentException.class, ex.getCause().getClass());
assertEquals(ex.getCause().getMessage(), "Unknown tokenfilter type [icu_collation] for [myCollator]");
client().admin().indices().prepareUpdateSettings()
.setSettings(Settings.builder().putNull("index.analysis.filter.myCollator.type")).get();
client().admin().indices().prepareOpen("test").get();
ensureYellow();
logger.info("--> verify 1 doc in the index");
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
}
/**
* This test covers the worst-case scenario where an analyzer referenced by the mapping is missing from the index settings.
* In that case we now have the ability to check the index on local recovery from disk, verifying
* that it is sane and that we can successfully create an IndexService.
* This also covers settings and components contributed by plugins.
*/
public void testRecoverMissingAnalyzer() throws Exception {
logger.info("--> starting one node");
internalCluster().startNode();
prepareCreate("test").setSettings(Settings.builder()
.put("index.analysis.analyzer.test.tokenizer", "keyword")
.put("index.number_of_shards", "1"))
.addMapping("type1", "{\n" +
" \"type1\": {\n" +
" \"properties\": {\n" +
" \"field1\": {\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"test\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }}").get();
logger.info("--> indexing a simple document");
client().prepareIndex("test", "type1", "1").setSource("field1", "value one").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
if (usually()) {
ensureYellow();
} else {
internalCluster().startNode();
client().admin().cluster()
.health(Requests.clusterHealthRequest()
.waitForGreenStatus()
.waitForEvents(Priority.LANGUID)
.waitForRelocatingShards(0).waitForNodes("2")).actionGet();
}
ClusterState state = client().admin().cluster().prepareState().get().getState();
IndexMetaData metaData = state.getMetaData().index("test");
for (NodeEnvironment services : internalCluster().getInstances(NodeEnvironment.class)) {
IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings()
.filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build();
IndexMetaData.FORMAT.write(brokenMeta, brokenMeta.getVersion(), services.indexPaths(brokenMeta.getIndex()));
}
internalCluster().fullRestart();
// ensureGreen(closedIndex) waits for the index to show up in the metadata
// this is crucial otherwise the state call below might not contain the index yet
ensureGreen(metaData.getIndex().getName());
state = client().admin().cluster().prepareState().get().getState();
assertEquals(IndexMetaData.State.CLOSE, state.getMetaData().index(metaData.getIndex()).getState());
// try to open it with the broken setting - fail again!
ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> client().admin().indices().prepareOpen("test").get());
assertEquals(ex.getMessage(), "Failed to verify index " + metaData.getIndex());
assertNotNull(ex.getCause());
assertEquals(MapperParsingException.class, ex.getCause().getClass());
assertEquals(ex.getCause().getMessage(), "analyzer [test] not found for field [field1]");
client().admin().indices().prepareUpdateSettings()
.setSettings(Settings.builder().put("index.analysis.analyzer.test.tokenizer", "keyword")).get();
client().admin().indices().prepareOpen("test").get();
ensureYellow();
logger.info("--> verify 1 doc in the index");
assertHitCount(client().prepareSearch().setQuery(matchQuery("field1", "value one")).get(), 1L);
}
}
View File
@ -40,8 +40,7 @@ public class GatewayServiceTests extends ESTestCase {
.put("http.enabled", "false") .put("http.enabled", "false")
.put("discovery.type", "local") .put("discovery.type", "local")
.put(settings.build()).build(), .put(settings.build()).build(),
null, clusterService, null, null, null, null, new NoopDiscovery()); null, clusterService, null, null, null, null, new NoopDiscovery(), null, null);
}
public void testDefaultRecoverAfterTime() throws IOException {
View File
@ -20,9 +20,12 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ValueSource;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
@ -120,6 +123,42 @@ public class ForEachProcessorTests extends ESTestCase {
assertThat(ingestDocument.getFieldValue("values.1.id", String.class), equalTo("_id")); assertThat(ingestDocument.getFieldValue("values.1.id", String.class), equalTo("_id"));
} }
public void testRestOfTheDocumentIsAvailable() throws Exception {
List<Map<String, Object>> values = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> object = new HashMap<>();
object.put("field", "value");
values.add(object);
}
Map<String, Object> document = new HashMap<>();
document.put("values", values);
document.put("flat_values", new ArrayList<>());
document.put("other", "value");
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, document);
TemplateService ts = TestTemplateService.instance();
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", Arrays.asList(
new AppendProcessor("_tag", ts.compile("flat_values"), ValueSource.wrap("value", ts)),
new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other")))
);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.2.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.3.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value"));
List<String> flatValues = ingestDocument.getFieldValue("flat_values", List.class);
assertThat(flatValues.size(), equalTo(5));
assertThat(flatValues.get(0), equalTo("value"));
assertThat(flatValues.get(1), equalTo("value"));
assertThat(flatValues.get(2), equalTo("value"));
assertThat(flatValues.get(3), equalTo("value"));
assertThat(flatValues.get(4), equalTo("value"));
}
public void testRandom() throws Exception {
int numProcessors = randomInt(8);
List<Processor> processors = new ArrayList<>(numProcessors);
View File
@ -12,6 +12,15 @@ that plugins that define custom settings must register all of their settings
during plugin loading using the `SettingsModule#registerSettings(Setting)`
method.
==== Index Level Settings
In previous versions, Elasticsearch allowed index level settings to be specified
as _defaults_ at the node level, inside the `elasticsearch.yml` file or even via
command-line parameters. From Elasticsearch 5.0 on, only selected settings such
as `index.codec` can be set on the node level. All other settings must be
set on each individual index. To set default values on every index, index templates
should be used instead.
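For illustration only (this sketch is not part of the commit), such a default can be applied to every newly created index through an index template; the template name, the match pattern, and the `index.refresh_interval` value below are invented for the example, and the calls assume the Java client's `preparePutTemplate` API of this era:
// Hypothetical sketch: register a template so every new index picks up the desired default at creation time.
// Assumes an ESIntegTestCase-style client() and the org.elasticsearch.common.settings.Settings import.
client().admin().indices().preparePutTemplate("all-indices-defaults")
        .setTemplate("*") // apply to every newly created index
        .setSettings(Settings.builder()
                .put("index.refresh_interval", "30s")
                .build())
        .get();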
==== Node settings
The `name` setting has been removed and is replaced by `node.name`. Usage of
View File
@ -142,6 +142,9 @@ public class IndicesRequestTests extends ESIntegTestCase {
protected Settings nodeSettings(int ordinal) {
// must set this independently of the plugin so it overrides MockTransportService
return Settings.builder().put(super.nodeSettings(ordinal))
// InternalClusterInfoService sends IndicesStatsRequest periodically which messes with this test
// this setting disables it...
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "intercepting").build(); .put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "intercepting").build();
} }
View File
@ -219,3 +219,58 @@
- length: { _source: 2 }
- match: { _source.do_nothing: "foo" }
- match: { _source.error: "processor first_processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" }
---
"Test rolling up json object arrays":
- do:
ingest.put_pipeline:
id: "_id"
body: >
{
"processors": [
{
"foreach": {
"field": "values",
"processors": [
{
"append": {
"field": "values_flat",
"value": "{{_value.key}}_{{_value.value}}"
}
}
]
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 1
pipeline: "_id"
body: {
values_flat : [],
values: [
{
level: 1,
key: "foo",
value: "bar"
},
{
level: 2,
key: "foo",
value: "baz"
}
]
}
- do:
get:
index: test
type: test
id: 1
- length: { _source: 2 }
- match: { _source.values_flat: ["foo_bar", "foo_baz"] }
View File
@ -16,6 +16,10 @@
"type": "time", "type": "time",
"description": "Specify timeout for connection to master" "description": "Specify timeout for connection to master"
}, },
"preserve_existing": {
"type": "boolean",
"description": "Whether to update existing settings. If set to `true` existing settings on an index remain unchanged, the default is `false`"
},
"ignore_unavailable": { "ignore_unavailable": {
"type": "boolean", "type": "boolean",
"description": "Whether specified concrete indices should be ignored when unavailable (missing or closed)" "description": "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
View File
@ -54,3 +54,70 @@ setup:
body:
number_of_replicas: 1
---
"Test preserve_existing settings":
- do:
indices.put_settings:
index: test-index
body:
number_of_replicas: 0
- do:
indices.get_settings:
flat_settings: false
- match:
test-index.settings.index.number_of_replicas: "0"
- do:
indices.put_settings:
preserve_existing: true
index: test-index
body:
index.number_of_replicas: 1
index.translog.durability: "async"
- do:
indices.get_settings:
flat_settings: false
- match:
test-index.settings.index.number_of_replicas: "0"
- match:
test-index.settings.index.translog.durability: "async"
- do:
indices.close:
index: test-index
- do:
indices.put_settings:
preserve_existing: true
index: test-index
body:
index.translog.durability: "request"
index.query_string.lenient: "true"
- do:
indices.get_settings:
index: test-index
flat_settings: false
- match:
test-index.settings.index.query_string.lenient: "true"
- match:
test-index.settings.index.translog.durability: "async"
- do:
indices.open:
index: test-index
- do:
indices.get_settings:
index: test-index
flat_settings: false
- match:
test-index.settings.index.query_string.lenient: "true"
- match:
test-index.settings.index.translog.durability: "async"