diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 46ddcb0ad0e..72e10cb9398 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -419,7 +419,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java
index 36fa1895af4..b20f636977a 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java
@@ -80,6 +80,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeActiontrue iff the settings update should only add but not update settings. If the setting already exists
+ * it should not be overwritten by this update. The default is false
+ */
+ public boolean isPreserveExisting() {
+ return preserveExisting;
+ }
+
+ /**
+ * Iff set to true
this settings update will only add settings not already set on an index. Existing settings remain
+ * unchanged.
+ */
+ public UpdateSettingsClusterStateUpdateRequest setPreserveExisting(boolean preserveExisting) {
+ this.preserveExisting = preserveExisting;
+ return this;
}
/**
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/UpdateSettingsRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/UpdateSettingsRequest.java
index c654d6926fa..fb4525a6842 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/UpdateSettingsRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/settings/put/UpdateSettingsRequest.java
@@ -47,6 +47,7 @@ public class UpdateSettingsRequest extends AcknowledgedRequesttrue iff the settings update should only add but not update settings. If the setting already exists
+ * it should not be overwritten by this update. The default is false
+ */
+ public boolean isPreserveExisting() {
+ return preserveExisting;
+ }
+
+ /**
+ * Iff set to true
this settings update will only add settings not already set on an index. Existing settings remain
+ * unchanged.
+ */
+ public UpdateSettingsRequest setPreserveExisting(boolean preserveExisting) {
+ this.preserveExisting = preserveExisting;
+ return this;
+ }
+
/**
* Sets the settings to be updated (either json/yaml/properties format)
*/
@@ -149,6 +167,7 @@ public class UpdateSettingsRequest extends AcknowledgedRequest(Priority.URGENT, request, listener) {
@@ -221,7 +222,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
int updatedNumberOfReplicas = openSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1);
- if (updatedNumberOfReplicas != -1) {
+ if (updatedNumberOfReplicas != -1 && preserveExisting == false) {
routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
metaDataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
@@ -239,6 +240,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
Settings.Builder updates = Settings.builder();
Settings.Builder indexSettings = Settings.builder().put(indexMetaData.getSettings());
if (indexScopedSettings.updateDynamicSettings(openSettings, indexSettings, updates, index.getName())) {
+ if (preserveExisting) {
+ indexSettings.put(indexMetaData.getSettings());
+ }
metaDataBuilder.put(IndexMetaData.builder(indexMetaData).settings(indexSettings));
}
}
@@ -250,6 +254,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
Settings.Builder updates = Settings.builder();
Settings.Builder indexSettings = Settings.builder().put(indexMetaData.getSettings());
if (indexScopedSettings.updateSettings(closedSettings, indexSettings, updates, index.getName())) {
+ if (preserveExisting) {
+ indexSettings.put(indexMetaData.getSettings());
+ }
metaDataBuilder.put(IndexMetaData.builder(indexMetaData).settings(indexSettings));
}
}
diff --git a/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java
index adffb8e9e01..410adc82da1 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java
@@ -19,7 +19,11 @@
package org.elasticsearch.common.settings;
+import org.apache.lucene.search.spell.LevensteinDistance;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex;
@@ -30,10 +34,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* A basic setting service that can be used for per-index and per-cluster settings.
@@ -221,9 +229,17 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
* * Validates that all given settings are registered and valid
*/
public final void validate(Settings settings) {
- for (Map.Entry entry : settings.getAsMap().entrySet()) {
- validate(entry.getKey(), settings);
+ List exceptions = new ArrayList<>();
+ // we want them sorted for deterministic error messages
+ SortedMap sortedSettings = new TreeMap<>(settings.getAsMap());
+ for (Map.Entry entry : sortedSettings.entrySet()) {
+ try {
+ validate(entry.getKey(), settings);
+ } catch (RuntimeException ex) {
+ exceptions.add(ex);
+ }
}
+ ExceptionsHelper.rethrowAndSuppress(exceptions);
}
@@ -233,7 +249,21 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
public final void validate(String key, Settings settings) {
Setting setting = get(key);
if (setting == null) {
- throw new IllegalArgumentException("unknown setting [" + key + "]");
+ LevensteinDistance ld = new LevensteinDistance();
+ List> scoredKeys = new ArrayList<>();
+ for (String k : this.keySettings.keySet()) {
+ float distance = ld.getDistance(key, k);
+ if (distance > 0.7f) {
+ scoredKeys.add(new Tuple<>(distance, k));
+ }
+ }
+ CollectionUtil.timSort(scoredKeys, (a,b) -> b.v1().compareTo(a.v1()));
+ String msg = "unknown setting [" + key + "]";
+ List keys = scoredKeys.stream().map((a) -> a.v2()).collect(Collectors.toList());
+ if (keys.isEmpty() == false) {
+ msg += " did you mean " + (keys.size() == 1 ? "[" + keys.get(0) + "]": "any of " + keys.toString()) + "?";
+ }
+ throw new IllegalArgumentException(msg);
}
setting.get(settings);
}
diff --git a/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java b/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java
index 33233ff627e..2e7acd6ae8c 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java
@@ -20,13 +20,22 @@
package org.elasticsearch.common.settings;
import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tribe.TribeService;
+import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* A module that binds the provided settings to the {@link Settings} interface.
@@ -37,9 +46,12 @@ public class SettingsModule extends AbstractModule {
private final Set settingsFilterPattern = new HashSet<>();
private final Map> nodeSettings = new HashMap<>();
private final Map> indexSettings = new HashMap<>();
- private static final Predicate TRIBE_CLIENT_NODE_SETTINGS_PREDICATE = (s) -> s.startsWith("tribe.") && TribeService.TRIBE_SETTING_KEYS.contains(s) == false;
+ private static final Predicate TRIBE_CLIENT_NODE_SETTINGS_PREDICATE = (s) -> s.startsWith("tribe.")
+ && TribeService.TRIBE_SETTING_KEYS.contains(s) == false;
+ private final ESLogger logger;
public SettingsModule(Settings settings) {
+ logger = Loggers.getLogger(getClass(), settings);
this.settings = settings;
for (Setting> setting : ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) {
registerSetting(setting);
@@ -53,6 +65,56 @@ public class SettingsModule extends AbstractModule {
protected void configure() {
final IndexScopedSettings indexScopedSettings = new IndexScopedSettings(settings, new HashSet<>(this.indexSettings.values()));
final ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(this.nodeSettings.values()));
+ Settings indexSettings = settings.filter((s) -> s.startsWith("index.") && clusterSettings.get(s) == null);
+ if (indexSettings.isEmpty() == false) {
+ try {
+ String separator = IntStream.range(0, 85).mapToObj(s -> "*").collect(Collectors.joining("")).trim();
+ StringBuilder builder = new StringBuilder();
+ builder.append(System.lineSeparator());
+ builder.append(separator);
+ builder.append(System.lineSeparator());
+ builder.append("Found index level settings on node level configuration.");
+ builder.append(System.lineSeparator());
+ builder.append(System.lineSeparator());
+ int count = 0;
+ for (String word : ("Since elasticsearch 5.x index level settings can NOT be set on the nodes configuration like " +
+ "the elasticsearch.yaml, in system properties or command line arguments." +
+ "In order to upgrade all indices the settings must be updated via the /${index}/_settings API. " +
+ "Unless all settings are dynamic all indices must be closed in order to apply the upgrade" +
+ "Indices created in the future should use index templates to set default values."
+ ).split(" ")) {
+ if (count + word.length() > 85) {
+ builder.append(System.lineSeparator());
+ count = 0;
+ }
+ count += word.length() + 1;
+ builder.append(word).append(" ");
+ }
+
+ builder.append(System.lineSeparator());
+ builder.append(System.lineSeparator());
+ builder.append("Please ensure all required values are updated on all indices by executing: ");
+ builder.append(System.lineSeparator());
+ builder.append(System.lineSeparator());
+ builder.append("curl -XPUT 'http://localhost:9200/_all/_settings?preserve_existing=true' -d '");
+ try (XContentBuilder xContentBuilder = XContentBuilder.builder(XContentType.JSON.xContent())) {
+ xContentBuilder.prettyPrint();
+ xContentBuilder.startObject();
+ indexSettings.toXContent(xContentBuilder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true")));
+ xContentBuilder.endObject();
+ builder.append(xContentBuilder.string());
+ }
+ builder.append("'");
+ builder.append(System.lineSeparator());
+ builder.append(separator);
+ builder.append(System.lineSeparator());
+
+ logger.warn(builder.toString());
+ throw new IllegalArgumentException("node settings must not contain any index level settings");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
// by now we are fully configured, lets check node level settings for unregistered index settings
final Predicate acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
clusterSettings.validate(settings.filter(acceptOnlyClusterSettings));
diff --git a/core/src/main/java/org/elasticsearch/gateway/Gateway.java b/core/src/main/java/org/elasticsearch/gateway/Gateway.java
index f5d38112c4f..c879e6ab710 100644
--- a/core/src/main/java/org/elasticsearch/gateway/Gateway.java
+++ b/core/src/main/java/org/elasticsearch/gateway/Gateway.java
@@ -19,9 +19,6 @@
package org.elasticsearch.gateway;
-import com.carrotsearch.hppc.ObjectFloatHashMap;
-import com.carrotsearch.hppc.ObjectHashSet;
-import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -34,9 +31,11 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.index.Index;
+import org.elasticsearch.index.NodeServicesProvider;
+import org.elasticsearch.indices.IndicesService;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.function.Supplier;
/**
@@ -53,10 +52,15 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
private final TransportNodesListGatewayMetaState listGatewayMetaState;
private final Supplier minimumMasterNodesProvider;
+ private final IndicesService indicesService;
+ private final NodeServicesProvider nodeServicesProvider;
public Gateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, GatewayMetaState metaState,
- TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery) {
+ TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
+ NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
+ this.nodeServicesProvider = nodeServicesProvider;
+ this.indicesService = indicesService;
this.clusterService = clusterService;
this.nodeEnv = nodeEnv;
this.metaState = metaState;
@@ -66,9 +70,9 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
}
public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
- ObjectHashSet nodesIds = new ObjectHashSet<>(clusterService.state().nodes().masterNodes().keys());
- logger.trace("performing state recovery from {}", nodesIds);
- TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds.toArray(String.class), null).actionGet();
+ String[] nodesIds = clusterService.state().nodes().masterNodes().keys().toArray(String.class);
+ logger.trace("performing state recovery from {}", Arrays.toString(nodesIds));
+ TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get());
@@ -80,7 +84,6 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
}
}
- ObjectFloatHashMap indices = new ObjectFloatHashMap<>();
MetaData electedGlobalState = null;
int found = 0;
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
@@ -93,55 +96,34 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
} else if (nodeState.metaData().version() > electedGlobalState.version()) {
electedGlobalState = nodeState.metaData();
}
- for (ObjectCursor cursor : nodeState.metaData().indices().values()) {
- indices.addTo(cursor.value.getIndex(), 1);
- }
}
if (found < requiredAllocation) {
listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");
return;
}
- // update the global state, and clean the indices, we elect them in the next phase
- MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState).removeAllIndices();
-
- assert !indices.containsKey(null);
- final Object[] keys = indices.keys;
- for (int i = 0; i < keys.length; i++) {
- if (keys[i] != null) {
- Index index = (Index) keys[i];
- IndexMetaData electedIndexMetaData = null;
- int indexMetaDataCount = 0;
- for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
- if (nodeState.metaData() == null) {
- continue;
- }
- IndexMetaData indexMetaData = nodeState.metaData().index(index);
- if (indexMetaData == null) {
- continue;
- }
- if (electedIndexMetaData == null) {
- electedIndexMetaData = indexMetaData;
- } else if (indexMetaData.getVersion() > electedIndexMetaData.getVersion()) {
- electedIndexMetaData = indexMetaData;
- }
- indexMetaDataCount++;
- }
- if (electedIndexMetaData != null) {
- if (indexMetaDataCount < requiredAllocation) {
- logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation);
- }
- metaDataBuilder.put(electedIndexMetaData, false);
+ // verify index metadata
+ MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState);
+ for (IndexMetaData indexMetaData : electedGlobalState) {
+ try {
+ if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
+ // verify that we can actually create this index - if not we recover it as closed with lots of warn logs
+ indicesService.verifyIndexMetadata(nodeServicesProvider, indexMetaData);
}
+ } catch (Exception e) {
+ logger.warn("recovering index {} failed - recovering as closed", e, indexMetaData.getIndex());
+ indexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE).build();
+ metaDataBuilder.put(indexMetaData, true);
}
}
ClusterState.Builder builder = ClusterState.builder(clusterService.state().getClusterName());
builder.metaData(metaDataBuilder);
listener.onSuccess(builder.build());
}
+
public void reset() throws Exception {
try {
Path[] dataPaths = nodeEnv.nodeDataPaths();
- logger.trace("removing node data paths: [{}]", (Object)dataPaths);
+ logger.trace("removing node data paths: [{}]", (Object) dataPaths);
IOUtils.rm(dataPaths);
} catch (Exception ex) {
logger.debug("failed to delete shard locations", ex);
diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
index 950b4351e1d..1da82468997 100644
--- a/core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
+++ b/core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -86,8 +86,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
try {
ensureNoPre019State();
- pre20Upgrade();
IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv);
+ upgradeMetaData();
long startNS = System.nanoTime();
metaStateService.loadFullState();
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
@@ -222,7 +222,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
* MetaDataIndexUpgradeService might also update obsolete settings if needed. When this happens we rewrite
* index metadata with new settings.
*/
- private void pre20Upgrade() throws Exception {
+ private void upgradeMetaData() throws Exception {
MetaData metaData = loadMetaState();
List updateIndexMetaData = new ArrayList<>();
for (IndexMetaData indexMetaData : metaData) {
@@ -235,7 +235,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
// means the upgrade can continue. Now it's safe to overwrite index metadata with the new version.
for (IndexMetaData indexMetaData : updateIndexMetaData) {
// since we still haven't upgraded the index folders, we write index state in the old folder
- metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getName()));
+ metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getUUID()));
}
}
diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayService.java b/core/src/main/java/org/elasticsearch/gateway/GatewayService.java
index 16d67a84c4a..7dcc45f1c0a 100644
--- a/core/src/main/java/org/elasticsearch/gateway/GatewayService.java
+++ b/core/src/main/java/org/elasticsearch/gateway/GatewayService.java
@@ -43,6 +43,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.NodeServicesProvider;
+import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
@@ -95,9 +97,11 @@ public class GatewayService extends AbstractLifecycleComponent i
@Inject
public GatewayService(Settings settings, AllocationService allocationService, ClusterService clusterService,
ThreadPool threadPool, NodeEnvironment nodeEnvironment, GatewayMetaState metaState,
- TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery) {
+ TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
+ NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
- this.gateway = new Gateway(settings, clusterService, nodeEnvironment, metaState, listGatewayMetaState, discovery);
+ this.gateway = new Gateway(settings, clusterService, nodeEnvironment, metaState, listGatewayMetaState, discovery,
+ nodeServicesProvider, indicesService);
this.allocationService = allocationService;
this.clusterService = clusterService;
this.threadPool = threadPool;
diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardStateMetaData.java b/core/src/main/java/org/elasticsearch/index/shard/ShardStateMetaData.java
index 315371c7286..407f271fd65 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/ShardStateMetaData.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/ShardStateMetaData.java
@@ -102,7 +102,7 @@ public final class ShardStateMetaData {
return "version [" + legacyVersion + "], primary [" + primary + "], allocation [" + allocationId + "]";
}
- public static final MetaDataStateFormat FORMAT = new MetaDataStateFormat(XContentType.JSON, SHARD_STATE_FILE_PREFIX) {
+ public static final MetaDataStateFormat FORMAT = new MetaDataStateFormat(XContentType.SMILE, SHARD_STATE_FILE_PREFIX) {
@Override
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java
index b43d33b1bd9..a0dba7089a9 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -19,6 +19,7 @@
package org.elasticsearch.indices;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
@@ -33,6 +34,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -66,6 +68,7 @@ import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
+import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
@@ -74,6 +77,7 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
@@ -88,9 +92,11 @@ import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -324,6 +330,7 @@ public class IndicesService extends AbstractLifecycleComponent i
* @throws IndexAlreadyExistsException if the index already exists.
*/
public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List builtInListeners) throws IOException {
+
if (!lifecycle.started()) {
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
}
@@ -331,37 +338,22 @@ public class IndicesService extends AbstractLifecycleComponent i
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
}
final Index index = indexMetaData.getIndex();
- final Predicate indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
- final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
if (hasIndex(index)) {
throw new IndexAlreadyExistsException(index);
}
- logger.debug("creating Index [{}], shards [{}]/[{}{}]",
- indexMetaData.getIndex(),
- idxSettings.getNumberOfShards(),
- idxSettings.getNumberOfReplicas(),
- idxSettings.isShadowReplicaIndex() ? "s" : "");
-
- final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
- pluginsService.onIndexModule(indexModule);
- for (IndexEventListener listener : builtInListeners) {
- indexModule.addIndexEventListener(listener);
- }
+ List finalListeners = new ArrayList<>(builtInListeners);
final IndexEventListener onStoreClose = new IndexEventListener() {
@Override
public void onStoreClosed(ShardId shardId) {
indicesQueryCache.onClose(shardId);
}
};
- indexModule.addIndexEventListener(onStoreClose);
- indexModule.addIndexEventListener(oldShardsStats);
- final IndexEventListener listener = indexModule.freeze();
- listener.beforeIndexCreated(index, idxSettings.getSettings());
- final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingMemoryController);
+ finalListeners.add(onStoreClose);
+ finalListeners.add(oldShardsStats);
+ final IndexService indexService = createIndexService("create index", nodeServicesProvider, indexMetaData, indicesQueryCache, indicesFieldDataCache, finalListeners, indexingMemoryController);
boolean success = false;
try {
- assert indexService.getIndexEventListener() == listener;
- listener.afterIndexCreated(indexService);
+ indexService.getIndexEventListener().afterIndexCreated(indexService);
indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
success = true;
return indexService;
@@ -370,7 +362,54 @@ public class IndicesService extends AbstractLifecycleComponent i
indexService.close("plugins_failed", true);
}
}
+ }
+ /**
+ * This creates a new IndexService without registering it
+ */
+ private synchronized IndexService createIndexService(final String reason, final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException {
+ final Index index = indexMetaData.getIndex();
+ final Predicate indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
+ final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
+ logger.debug("creating Index [{}], shards [{}]/[{}{}] - reason [{}]",
+ indexMetaData.getIndex(),
+ idxSettings.getNumberOfShards(),
+ idxSettings.getNumberOfReplicas(),
+ idxSettings.isShadowReplicaIndex() ? "s" : "", reason);
+
+ final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
+ pluginsService.onIndexModule(indexModule);
+ for (IndexEventListener listener : builtInListeners) {
+ indexModule.addIndexEventListener(listener);
+ }
+ final IndexEventListener listener = indexModule.freeze();
+ listener.beforeIndexCreated(index, idxSettings.getSettings());
+ return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingOperationListeners);
+ }
+
+ /**
+ * This method verifies that the given {@link IndexMetaData} holds sane values to create an {@link IndexService}. This method will throw an
+ * exception if the creation fails. The created {@link IndexService} will not be registered and will be closed immediately.
+ */
+ public synchronized void verifyIndexMetadata(final NodeServicesProvider nodeServicesProvider, IndexMetaData metaData) throws IOException {
+ final List closeables = new ArrayList<>();
+ try {
+ IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {});
+ closeables.add(indicesFieldDataCache);
+ IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
+ closeables.add(indicesQueryCache);
+ // this will also fail if some plugin fails etc. which is nice since we can verify that early
+ final IndexService service = createIndexService("metadata verification", nodeServicesProvider,
+ metaData, indicesQueryCache, indicesFieldDataCache, Collections.emptyList());
+ for (ObjectCursor typeMapping : metaData.getMappings().values()) {
+ // don't apply the default mapping, it has been applied when the mapping was created
+ service.mapperService().merge(typeMapping.value.type(), typeMapping.value.source(),
+ MapperService.MergeReason.MAPPING_RECOVERY, true);
+ }
+ closeables.add(() -> service.close("metadata verification", false));
+ } finally {
+ IOUtils.close(closeables);
+ }
}
/**
diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/ForEachProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/ForEachProcessor.java
index 5b101fbfb32..b6f6a85d219 100644
--- a/core/src/main/java/org/elasticsearch/ingest/processor/ForEachProcessor.java
+++ b/core/src/main/java/org/elasticsearch/ingest/processor/ForEachProcessor.java
@@ -59,11 +59,8 @@ public final class ForEachProcessor extends AbstractProcessor {
List