listener, final int retryCount) {
IndexService indexService = indicesService.indexServiceSafe(request.concreteIndex());
- IndexShard indexShard = indexService.shardSafe(request.shardId());
+ IndexShard indexShard = indexService.getShard(request.shardId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
switch (result.operation()) {
case UPSERT:
@@ -266,7 +266,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
UpdateResponse update = result.action();
IndexService indexServiceOrNull = indicesService.indexService(request.concreteIndex());
if (indexServiceOrNull != null) {
- IndexShard shard = indexService.shard(request.shardId());
+ IndexShard shard = indexService.getShardOrNull(request.shardId());
if (shard != null) {
shard.indexingService().noopUpdate(request.type());
}
diff --git a/core/src/main/java/org/elasticsearch/bootstrap/BootstrapInfo.java b/core/src/main/java/org/elasticsearch/bootstrap/BootstrapInfo.java
index 76485bb86e5..f1278af96f4 100644
--- a/core/src/main/java/org/elasticsearch/bootstrap/BootstrapInfo.java
+++ b/core/src/main/java/org/elasticsearch/bootstrap/BootstrapInfo.java
@@ -45,9 +45,16 @@ public final class BootstrapInfo {
}
/**
- * Returns true if secure computing mode is enabled (linux/amd64 only)
+ * Returns true if secure computing mode is enabled (linux/amd64, OS X only)
*/
public static boolean isSeccompInstalled() {
return Natives.isSeccompInstalled();
}
+
+ /**
+ * codebase location for untrusted scripts (provide some additional safety)
+ *
+ * This is not a full URL, just a path.
+ */
+ public static final String UNTRUSTED_CODEBASE = "/untrusted";
}
diff --git a/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java b/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java
index 9db66ca9c14..ae993f25814 100644
--- a/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java
+++ b/core/src/main/java/org/elasticsearch/bootstrap/ESPolicy.java
@@ -26,29 +26,27 @@ import java.net.URL;
import java.security.CodeSource;
import java.security.Permission;
import java.security.PermissionCollection;
-import java.security.Permissions;
import java.security.Policy;
import java.security.ProtectionDomain;
import java.security.URIParameter;
-import java.util.PropertyPermission;
/** custom policy for union of static and dynamic permissions */
final class ESPolicy extends Policy {
/** template policy file, the one used in tests */
static final String POLICY_RESOURCE = "security.policy";
- /** limited policy for groovy scripts */
- static final String GROOVY_RESOURCE = "groovy.policy";
+ /** limited policy for scripts */
+ static final String UNTRUSTED_RESOURCE = "untrusted.policy";
final Policy template;
- final Policy groovy;
+ final Policy untrusted;
final PermissionCollection dynamic;
public ESPolicy(PermissionCollection dynamic) throws Exception {
URI policyUri = getClass().getResource(POLICY_RESOURCE).toURI();
- URI groovyUri = getClass().getResource(GROOVY_RESOURCE).toURI();
+ URI untrustedUri = getClass().getResource(UNTRUSTED_RESOURCE).toURI();
this.template = Policy.getInstance("JavaPolicy", new URIParameter(policyUri));
- this.groovy = Policy.getInstance("JavaPolicy", new URIParameter(groovyUri));
+ this.untrusted = Policy.getInstance("JavaPolicy", new URIParameter(untrustedUri));
this.dynamic = dynamic;
}
@@ -56,15 +54,17 @@ final class ESPolicy extends Policy {
public boolean implies(ProtectionDomain domain, Permission permission) {
CodeSource codeSource = domain.getCodeSource();
// codesource can be null when reducing privileges via doPrivileged()
- if (codeSource != null) {
- URL location = codeSource.getLocation();
- // location can be null... ??? nobody knows
- // https://bugs.openjdk.java.net/browse/JDK-8129972
- if (location != null) {
- // run groovy scripts with no permissions (except logging property)
- if ("/groovy/script".equals(location.getFile())) {
- return groovy.implies(domain, permission);
- }
+ if (codeSource == null) {
+ return false;
+ }
+
+ URL location = codeSource.getLocation();
+ // location can be null... ??? nobody knows
+ // https://bugs.openjdk.java.net/browse/JDK-8129972
+ if (location != null) {
+ // run scripts with limited permissions
+ if (BootstrapInfo.UNTRUSTED_CODEBASE.equals(location.getFile())) {
+ return untrusted.implies(domain, permission);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
index 3e027d1c5cb..049eadcb281 100644
--- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
@@ -204,6 +204,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (state.metaData().hasAlias(index)) {
throw new InvalidIndexNameException(new Index(index), index, "already exists as alias");
}
+ if (index.equals(".") || index.equals("..")) {
+ throw new InvalidIndexNameException(new Index(index), index, "must not be '.' or '..'");
+ }
}
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener, final Semaphore mdLock) {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
index 68f708d194a..f21ced8cc0a 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -602,12 +602,12 @@ public class DiskThresholdDecider extends AllocationDecider {
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
}
- // Allow allocation regardless if only a single node is available
- if (allocation.nodes().size() <= 1) {
+ // Allow allocation regardless if only a single data node is available
+ if (allocation.nodes().dataNodes().size() <= 1) {
if (logger.isTraceEnabled()) {
- logger.trace("only a single node is present, allowing allocation");
+ logger.trace("only a single data node is present, allowing allocation");
}
- return allocation.decision(Decision.YES, NAME, "only a single node is present");
+ return allocation.decision(Decision.YES, NAME, "only a single data node is present");
}
// Fail open there is no info available
diff --git a/core/src/main/java/org/elasticsearch/common/hash/MessageDigests.java b/core/src/main/java/org/elasticsearch/common/hash/MessageDigests.java
new file mode 100644
index 00000000000..7b3a108cc45
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/common/hash/MessageDigests.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.hash;
+
+import org.elasticsearch.ElasticsearchException;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class MessageDigests {
+
+ private static final MessageDigest MD5_DIGEST;
+ private static final MessageDigest SHA_1_DIGEST;
+ private static final MessageDigest SHA_256_DIGEST;
+
+ static {
+ try {
+ MD5_DIGEST = MessageDigest.getInstance("MD5");
+ SHA_1_DIGEST = MessageDigest.getInstance("SHA-1");
+ SHA_256_DIGEST = MessageDigest.getInstance("SHA-256");
+ } catch (NoSuchAlgorithmException e) {
+ throw new ElasticsearchException("Unexpected exception creating MessageDigest instance", e);
+ }
+ }
+
+ public static MessageDigest md5() {
+ return clone(MD5_DIGEST);
+ }
+
+ public static MessageDigest sha1() {
+ return clone(SHA_1_DIGEST);
+ }
+
+ public static MessageDigest sha256() {
+ return clone(SHA_256_DIGEST);
+ }
+
+ private static MessageDigest clone(MessageDigest messageDigest) {
+ try {
+ return (MessageDigest) messageDigest.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new ElasticsearchException("Unexpected exception cloning MessageDigest instance", e);
+ }
+ }
+
+ private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();
+ public static String toHexString(byte[] bytes) {
+ if (bytes == null) {
+ throw new NullPointerException("bytes");
+ }
+ StringBuilder sb = new StringBuilder(2 * bytes.length);
+
+ for (int i = 0; i < bytes.length; i++) {
+ byte b = bytes[i];
+ sb.append(HEX_DIGITS[b >> 4 & 0xf]).append(HEX_DIGITS[b & 0xf]);
+ }
+
+ return sb.toString();
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/common/http/client/HttpDownloadHelper.java b/core/src/main/java/org/elasticsearch/common/http/client/HttpDownloadHelper.java
index ed2de6e5e7d..7fe26ed81d9 100644
--- a/core/src/main/java/org/elasticsearch/common/http/client/HttpDownloadHelper.java
+++ b/core/src/main/java/org/elasticsearch/common/http/client/HttpDownloadHelper.java
@@ -19,19 +19,22 @@
package org.elasticsearch.common.http.client;
-import java.nio.charset.StandardCharsets;
-import com.google.common.hash.Hashing;
import org.apache.lucene.util.IOUtils;
-import org.elasticsearch.*;
+import org.elasticsearch.Build;
+import org.elasticsearch.ElasticsearchCorruptionException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.Version;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.unit.TimeValue;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@@ -96,7 +99,7 @@ public class HttpDownloadHelper {
public static Checksummer SHA1_CHECKSUM = new Checksummer() {
@Override
public String checksum(byte[] filebytes) {
- return Hashing.sha1().hashBytes(filebytes).toString();
+ return MessageDigests.toHexString(MessageDigests.sha1().digest(filebytes));
}
@Override
@@ -109,7 +112,7 @@ public class HttpDownloadHelper {
public static Checksummer MD5_CHECKSUM = new Checksummer() {
@Override
public String checksum(byte[] filebytes) {
- return Hashing.md5().hashBytes(filebytes).toString();
+ return MessageDigests.toHexString(MessageDigests.md5().digest(filebytes));
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/common/logging/log4j/LogConfigurator.java b/core/src/main/java/org/elasticsearch/common/logging/log4j/LogConfigurator.java
index b30928bdef2..c195226d629 100644
--- a/core/src/main/java/org/elasticsearch/common/logging/log4j/LogConfigurator.java
+++ b/core/src/main/java/org/elasticsearch/common/logging/log4j/LogConfigurator.java
@@ -93,12 +93,14 @@ public class LogConfigurator {
loaded = true;
// TODO: this is partly a copy of InternalSettingsPreparer...we should pass in Environment and not do all this...
Environment environment = new Environment(settings);
- Settings.Builder settingsBuilder = settingsBuilder().put(settings);
+ Settings.Builder settingsBuilder = settingsBuilder();
resolveConfig(environment, settingsBuilder);
settingsBuilder
.putProperties("elasticsearch.", System.getProperties())
- .putProperties("es.", System.getProperties())
- .replacePropertyPlaceholders();
+ .putProperties("es.", System.getProperties());
+ // add custom settings after config was added so that they are not overwritten by config
+ settingsBuilder.put(settings);
+ settingsBuilder.replacePropertyPlaceholders();
Properties props = new Properties();
for (Map.Entry entry : settingsBuilder.build().getAsMap().entrySet()) {
String key = "log4j." + entry.getKey();
diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java
index d94eb4f9c7d..192984815d2 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexModule.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java
@@ -20,21 +20,37 @@
package org.elasticsearch.index;
import org.elasticsearch.common.inject.AbstractModule;
-import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.inject.util.Providers;
+import org.elasticsearch.index.aliases.IndexAliasesService;
+import org.elasticsearch.index.engine.EngineFactory;
+import org.elasticsearch.index.engine.InternalEngineFactory;
+import org.elasticsearch.index.fielddata.IndexFieldDataService;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.shard.IndexSearcherWrapper;
/**
*
*/
public class IndexModule extends AbstractModule {
- private final Settings settings;
-
- public IndexModule(Settings settings) {
- this.settings = settings;
- }
-
+ // pkg private so tests can mock
+ Class extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class;
+ Class extends IndexSearcherWrapper> indexSearcherWrapper = null;
+
@Override
protected void configure() {
+ bind(EngineFactory.class).to(engineFactoryImpl).asEagerSingleton();
+ if (indexSearcherWrapper == null) {
+ bind(IndexSearcherWrapper.class).toProvider(Providers.of(null));
+ } else {
+ bind(IndexSearcherWrapper.class).to(indexSearcherWrapper).asEagerSingleton();
+ }
bind(IndexService.class).asEagerSingleton();
+ bind(IndexServicesProvider.class).asEagerSingleton();
+ bind(MapperService.class).asEagerSingleton();
+ bind(IndexAliasesService.class).asEagerSingleton();
+ bind(IndexFieldDataService.class).asEagerSingleton();
}
+
+
}
diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java
index d9a046e230e..602ccc3bbce 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -25,13 +25,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.inject.CreationException;
import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.inject.Injector;
-import org.elasticsearch.common.inject.Injectors;
-import org.elasticsearch.common.inject.Module;
-import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
@@ -48,19 +42,15 @@ import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardModule;
+import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.store.StoreModule;
-import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle;
-import org.elasticsearch.indices.cache.query.IndicesQueryCache;
-import org.elasticsearch.plugins.PluginsService;
import java.io.Closeable;
import java.io.IOException;
@@ -81,86 +71,42 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
*/
public class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable {
- private final Injector injector;
-
private final Settings indexSettings;
-
- private final PluginsService pluginsService;
-
private final InternalIndicesLifecycle indicesLifecycle;
-
private final AnalysisService analysisService;
-
- private final MapperService mapperService;
-
- private final IndexQueryParserService queryParserService;
-
- private final SimilarityService similarityService;
-
- private final IndexAliasesService aliasesService;
-
- private final IndexCache indexCache;
-
private final IndexFieldDataService indexFieldData;
-
private final BitsetFilterCache bitsetFilterCache;
-
private final IndexSettingsService settingsService;
-
private final NodeEnvironment nodeEnv;
private final IndicesService indicesServices;
-
- private volatile Map shards = emptyMap();
-
- private static class IndexShardInjectorPair {
- private final IndexShard indexShard;
- private final Injector injector;
-
- public IndexShardInjectorPair(IndexShard indexShard, Injector injector) {
- this.indexShard = indexShard;
- this.injector = injector;
- }
-
- public IndexShard getIndexShard() {
- return indexShard;
- }
-
- public Injector getInjector() {
- return injector;
- }
- }
-
+ private final IndexServicesProvider indexServicesProvider;
+ private final IndexStore indexStore;
+ private volatile Map shards = emptyMap();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
@Inject
- public IndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv,
- AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService,
- SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache,
+ public IndexService(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv,
+ AnalysisService analysisService,
IndexSettingsService settingsService,
- IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache, IndicesService indicesServices) {
-
+ IndexFieldDataService indexFieldData,
+ BitsetFilterCache bitSetFilterCache,
+ IndicesService indicesServices,
+ IndexServicesProvider indexServicesProvider,
+ IndexStore indexStore) {
super(index, indexSettings);
- this.injector = injector;
this.indexSettings = indexSettings;
this.analysisService = analysisService;
- this.mapperService = mapperService;
- this.queryParserService = queryParserService;
- this.similarityService = similarityService;
- this.aliasesService = aliasesService;
- this.indexCache = indexCache;
this.indexFieldData = indexFieldData;
this.settingsService = settingsService;
this.bitsetFilterCache = bitSetFilterCache;
-
- this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesServices = indicesServices;
- this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
-
- // inject workarounds for cyclic dep
+ this.indicesLifecycle = (InternalIndicesLifecycle) indexServicesProvider.getIndicesLifecycle();
+ this.nodeEnv = nodeEnv;
+ this.indexServicesProvider = indexServicesProvider;
+ this.indexStore = indexStore;
indexFieldData.setListener(new FieldDataCacheListener(this));
bitSetFilterCache.setListener(new BitsetCacheListener(this));
- this.nodeEnv = nodeEnv;
}
public int numberOfShards() {
@@ -173,7 +119,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
@Override
public Iterator iterator() {
- return shards.values().stream().map((p) -> p.getIndexShard()).iterator();
+ return shards.values().iterator();
}
public boolean hasShard(int shardId) {
@@ -184,19 +130,15 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
* Return the shard with the provided id, or null if there is no such shard.
*/
@Nullable
- public IndexShard shard(int shardId) {
- IndexShardInjectorPair indexShardInjectorPair = shards.get(shardId);
- if (indexShardInjectorPair != null) {
- return indexShardInjectorPair.getIndexShard();
- }
- return null;
+ public IndexShard getShardOrNull(int shardId) {
+ return shards.get(shardId);
}
/**
* Return the shard with the provided id, or throw an exception if it doesn't exist.
*/
- public IndexShard shardSafe(int shardId) {
- IndexShard indexShard = shard(shardId);
+ public IndexShard getShard(int shardId) {
+ IndexShard indexShard = getShardOrNull(shardId);
if (indexShard == null) {
throw new ShardNotFoundException(new ShardId(index, shardId));
}
@@ -207,16 +149,12 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
return shards.keySet();
}
- public Injector injector() {
- return injector;
- }
-
public IndexSettingsService settingsService() {
return this.settingsService;
}
public IndexCache cache() {
- return indexCache;
+ return indexServicesProvider.getIndexCache();
}
public IndexFieldDataService fieldData() {
@@ -232,19 +170,19 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
public MapperService mapperService() {
- return mapperService;
+ return indexServicesProvider.getMapperService();
}
public IndexQueryParserService queryParserService() {
- return queryParserService;
+ return indexServicesProvider.getQueryParserService();
}
public SimilarityService similarityService() {
- return similarityService;
+ return indexServicesProvider.getSimilarityService();
}
public IndexAliasesService aliasesService() {
- return aliasesService;
+ return indexServicesProvider.getIndexAliasesService();
}
public synchronized void close(final String reason, boolean delete) {
@@ -261,16 +199,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
- /**
- * Return the shard injector for the provided id, or throw an exception if there is no such shard.
- */
- public Injector shardInjectorSafe(int shardId) {
- IndexShardInjectorPair indexShardInjectorPair = shards.get(shardId);
- if (indexShardInjectorPair == null) {
- throw new ShardNotFoundException(new ShardId(index, shardId));
- }
- return indexShardInjectorPair.getInjector();
- }
public String indexUUID() {
return indexSettings.get(IndexMetaData.SETTING_INDEX_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
@@ -301,10 +229,14 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
if (closed.get()) {
throw new IllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed");
}
+ if (indexSettings.get("index.translog.type") != null) { // TODO remove?
+ throw new IllegalStateException("a custom translog type is no longer supported. got [" + indexSettings.get("index.translog.type") + "]");
+ }
final ShardId shardId = new ShardId(index, sShardId);
ShardLock lock = null;
boolean success = false;
- Injector shardInjector = null;
+ Store store = null;
+ IndexShard indexShard = null;
try {
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings);
@@ -325,7 +257,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
if (path == null) {
// TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard
// that's being relocated/replicated we know how large it will become once it's done copying:
-
// Count up how many shards are currently on each data path:
Map dataPathToShardCount = new HashMap<>();
for(IndexShard shard : this) {
@@ -351,39 +282,17 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false ||
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
- ModulesBuilder modules = new ModulesBuilder();
- // plugin modules must be added here, before others or we can get crazy injection errors...
- for (Module pluginModule : pluginsService.shardModules(indexSettings)) {
- modules.add(pluginModule);
- }
- modules.add(new IndexShardModule(shardId, primary, indexSettings));
- modules.add(new StoreModule(injector.getInstance(IndexStore.class).shardDirectory(), lock,
- new StoreCloseListener(shardId, canDeleteShardContent, new Closeable() {
- @Override
- public void close() throws IOException {
- injector.getInstance(IndicesQueryCache.class).onClose(shardId);
- }
- }), path));
- pluginsService.processModules(modules);
-
- try {
- shardInjector = modules.createChildInjector(injector);
- } catch (CreationException e) {
- ElasticsearchException ex = new ElasticsearchException("failed to create shard", Injectors.getFirstErrorFailure(e));
- ex.setShard(shardId);
- throw ex;
- } catch (Throwable e) {
- ElasticsearchException ex = new ElasticsearchException("failed to create shard", e);
- ex.setShard(shardId);
- throw ex;
+ store = new Store(shardId, indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> indexServicesProvider.getIndicesQueryCache().onClose(shardId)));
+ if (useShadowEngine(primary, indexSettings)) {
+ indexShard = new ShadowIndexShard(shardId, indexSettings, path, store, indexServicesProvider);
+ } else {
+ indexShard = new IndexShard(shardId, indexSettings, path, store, indexServicesProvider);
}
- IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
indicesLifecycle.afterIndexShardCreated(indexShard);
-
- shards = newMapBuilder(shards).put(shardId.id(), new IndexShardInjectorPair(indexShard, shardInjector)).immutableMap();
settingsService.addListener(indexShard);
+ shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
success = true;
return indexShard;
} catch (IOException e) {
@@ -393,45 +302,35 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(lock);
- if (shardInjector != null) {
- IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
- closeShardInjector("initialization failed", shardId, shardInjector, indexShard);
- }
+ closeShard("initialization failed", shardId, indexShard, store);
}
}
}
+ static boolean useShadowEngine(boolean primary, Settings indexSettings) {
+ return primary == false && IndexMetaData.isIndexUsingShadowReplicas(indexSettings);
+ }
+
public synchronized void removeShard(int shardId, String reason) {
final ShardId sId = new ShardId(index, shardId);
- final Injector shardInjector;
final IndexShard indexShard;
if (shards.containsKey(shardId) == false) {
return;
}
logger.debug("[{}] closing... (reason: [{}])", shardId, reason);
- HashMap newShards = new HashMap<>(shards);
- IndexShardInjectorPair indexShardInjectorPair = newShards.remove(shardId);
- indexShard = indexShardInjectorPair.getIndexShard();
- shardInjector = indexShardInjectorPair.getInjector();
+ HashMap newShards = new HashMap<>(shards);
+ indexShard = newShards.remove(shardId);
shards = unmodifiableMap(newShards);
- closeShardInjector(reason, sId, shardInjector, indexShard);
+ closeShard(reason, sId, indexShard, indexShard.store());
logger.debug("[{}] closed (reason: [{}])", shardId, reason);
}
- private void closeShardInjector(String reason, ShardId sId, Injector shardInjector, IndexShard indexShard) {
+ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store) {
final int shardId = sId.id();
try {
try {
indicesLifecycle.beforeIndexShardClosed(sId, indexShard, indexSettings);
} finally {
- // close everything else even if the beforeIndexShardClosed threw an exception
- for (Class extends Closeable> closeable : pluginsService.shardServices()) {
- try {
- shardInjector.getInstance(closeable).close();
- } catch (Throwable e) {
- logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable);
- }
- }
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
@@ -449,30 +348,13 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
} finally {
try {
- shardInjector.getInstance(Store.class).close();
+ store.close();
} catch (Throwable e) {
logger.warn("[{}] failed to close store on shard removal (reason: [{}])", e, shardId, reason);
}
}
}
- /**
- * Closes an optional resource. Returns true if the resource was found;
- * NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log
- */
- private boolean closeInjectorOptionalResource(ShardId shardId, Injector shardInjector, Class extends Closeable> toClose) {
- try {
- final Closeable instance = shardInjector.getInstance(toClose);
- if (instance == null) {
- return false;
- }
- IOUtils.close(instance);
- } catch (Throwable t) {
- logger.debug("{} failed to close {}", t, shardId, Strings.toUnderscoreCase(toClose.getSimpleName()));
- }
- return true;
- }
-
private void onShardClose(ShardLock lock, boolean ownsShard) {
if (deleted.get()) { // we remove that shards content if this index has been deleted
@@ -492,6 +374,10 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
+ public IndexServicesProvider getIndexServices() {
+ return indexServicesProvider;
+ }
+
private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
private final boolean ownsShard;
@@ -533,7 +419,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
@Override
public void onCache(ShardId shardId, Accountable accountable) {
if (shardId != null) {
- final IndexShard shard = indexService.shard(shardId.id());
+ final IndexShard shard = indexService.getShardOrNull(shardId.id());
if (shard != null) {
long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l;
shard.shardBitsetFilterCache().onCached(ramBytesUsed);
@@ -544,7 +430,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
@Override
public void onRemoval(ShardId shardId, Accountable accountable) {
if (shardId != null) {
- final IndexShard shard = indexService.shard(shardId.id());
+ final IndexShard shard = indexService.getShardOrNull(shardId.id());
if (shard != null) {
long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l;
shard.shardBitsetFilterCache().onRemoval(ramBytesUsed);
@@ -563,7 +449,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
@Override
public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
if (shardId != null) {
- final IndexShard shard = indexService.shard(shardId.id());
+ final IndexShard shard = indexService.getShardOrNull(shardId.id());
if (shard != null) {
shard.fieldData().onCache(shardId, fieldNames, fieldDataType, ramUsage);
}
@@ -573,7 +459,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
@Override
public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
if (shardId != null) {
- final IndexShard shard = indexService.shard(shardId.id());
+ final IndexShard shard = indexService.getShardOrNull(shardId.id());
if (shard != null) {
shard.fieldData().onRemoval(shardId, fieldNames, fieldDataType, wasEvicted, sizeInBytes);
}
diff --git a/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java b/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java
new file mode 100644
index 00000000000..fe8428425e2
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.index.aliases.IndexAliasesService;
+import org.elasticsearch.index.cache.IndexCache;
+import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.engine.EngineFactory;
+import org.elasticsearch.index.fielddata.IndexFieldDataService;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.query.IndexQueryParserService;
+import org.elasticsearch.index.shard.IndexSearcherWrapper;
+import org.elasticsearch.index.similarity.SimilarityService;
+import org.elasticsearch.index.termvectors.TermVectorsService;
+import org.elasticsearch.indices.IndicesLifecycle;
+import org.elasticsearch.indices.IndicesWarmer;
+import org.elasticsearch.indices.cache.query.IndicesQueryCache;
+import org.elasticsearch.threadpool.ThreadPool;
+
+/**
+ * Simple provider class that holds the Index and Node level services used by
+ * a shard.
+ * This is just a temporary solution until we cleaned up index creation and removed injectors on that level as well.
+ */
+public final class IndexServicesProvider {
+
+ private final IndicesLifecycle indicesLifecycle;
+ private final ThreadPool threadPool;
+ private final MapperService mapperService;
+ private final IndexQueryParserService queryParserService;
+ private final IndexCache indexCache;
+ private final IndexAliasesService indexAliasesService;
+ private final IndicesQueryCache indicesQueryCache;
+ private final CodecService codecService;
+ private final TermVectorsService termVectorsService;
+ private final IndexFieldDataService indexFieldDataService;
+ private final IndicesWarmer warmer;
+ private final SimilarityService similarityService;
+ private final EngineFactory factory;
+ private final BigArrays bigArrays;
+ private final IndexSearcherWrapper indexSearcherWrapper;
+
+ @Inject
+ public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper) {
+ this.indicesLifecycle = indicesLifecycle;
+ this.threadPool = threadPool;
+ this.mapperService = mapperService;
+ this.queryParserService = queryParserService;
+ this.indexCache = indexCache;
+ this.indexAliasesService = indexAliasesService;
+ this.indicesQueryCache = indicesQueryCache;
+ this.codecService = codecService;
+ this.termVectorsService = termVectorsService;
+ this.indexFieldDataService = indexFieldDataService;
+ this.warmer = warmer;
+ this.similarityService = similarityService;
+ this.factory = factory;
+ this.bigArrays = bigArrays;
+ this.indexSearcherWrapper = indexSearcherWrapper;
+ }
+
+ public IndicesLifecycle getIndicesLifecycle() {
+ return indicesLifecycle;
+ }
+
+ public ThreadPool getThreadPool() {
+ return threadPool;
+ }
+
+ public MapperService getMapperService() {
+ return mapperService;
+ }
+
+ public IndexQueryParserService getQueryParserService() {
+ return queryParserService;
+ }
+
+ public IndexCache getIndexCache() {
+ return indexCache;
+ }
+
+ public IndexAliasesService getIndexAliasesService() {
+ return indexAliasesService;
+ }
+
+ public IndicesQueryCache getIndicesQueryCache() {
+ return indicesQueryCache;
+ }
+
+ public CodecService getCodecService() {
+ return codecService;
+ }
+
+ public TermVectorsService getTermVectorsService() {
+ return termVectorsService;
+ }
+
+ public IndexFieldDataService getIndexFieldDataService() {
+ return indexFieldDataService;
+ }
+
+ public IndicesWarmer getWarmer() {
+ return warmer;
+ }
+
+ public SimilarityService getSimilarityService() {
+ return similarityService;
+ }
+
+ public EngineFactory getFactory() {
+ return factory;
+ }
+
+ public BigArrays getBigArrays() {
+ return bigArrays;
+ }
+
+ public IndexSearcherWrapper getIndexSearcherWrapper() { return indexSearcherWrapper; }
+}
diff --git a/core/src/main/java/org/elasticsearch/index/aliases/IndexAliasesServiceModule.java b/core/src/main/java/org/elasticsearch/index/aliases/IndexAliasesServiceModule.java
deleted file mode 100644
index 1bb9a58d142..00000000000
--- a/core/src/main/java/org/elasticsearch/index/aliases/IndexAliasesServiceModule.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.aliases;
-
-import org.elasticsearch.common.inject.AbstractModule;
-
-/**
- *
- */
-public class IndexAliasesServiceModule extends AbstractModule {
-
- @Override
- protected void configure() {
- bind(IndexAliasesService.class).asEagerSingleton();
- }
-}
\ No newline at end of file
diff --git a/core/src/main/java/org/elasticsearch/index/analysis/Analysis.java b/core/src/main/java/org/elasticsearch/index/analysis/Analysis.java
index d4a01189fdd..861f0705b38 100644
--- a/core/src/main/java/org/elasticsearch/index/analysis/Analysis.java
+++ b/core/src/main/java/org/elasticsearch/index/analysis/Analysis.java
@@ -319,7 +319,9 @@ public class Analysis {
* @see #isCharacterTokenStream(TokenStream)
*/
public static boolean generatesCharacterTokenStream(Analyzer analyzer, String fieldName) throws IOException {
- return isCharacterTokenStream(analyzer.tokenStream(fieldName, ""));
+ try (TokenStream ts = analyzer.tokenStream(fieldName, "")) {
+ return isCharacterTokenStream(ts);
+ }
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index f9331c4416a..1330ef05a7f 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -59,6 +59,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
/**
*
@@ -78,7 +80,6 @@ public abstract class Engine implements Closeable {
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
-
protected volatile Throwable failedEngine = null;
protected Engine(EngineConfig engineConfig) {
@@ -227,8 +228,8 @@ public abstract class Engine implements Closeable {
PENDING_OPERATIONS
}
- final protected GetResult getFromSearcher(Get get) throws EngineException {
- final Searcher searcher = acquireSearcher("get");
+ final protected GetResult getFromSearcher(Get get, Function searcherFactory) throws EngineException {
+ final Searcher searcher = searcherFactory.apply("get");
final Versions.DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
@@ -256,7 +257,11 @@ public abstract class Engine implements Closeable {
}
}
- public abstract GetResult get(Get get) throws EngineException;
+ public final GetResult get(Get get) throws EngineException {
+ return get(get, this::acquireSearcher);
+ }
+
+ public abstract GetResult get(Get get, Function searcherFactory) throws EngineException;
/**
* Returns a new searcher instance. The consumer of this
@@ -279,7 +284,7 @@ public abstract class Engine implements Closeable {
try {
final Searcher retVal = newSearcher(source, searcher, manager);
success = true;
- return config().getWrappingService().wrap(engineConfig, retVal);
+ return retVal;
} finally {
if (!success) {
manager.release(searcher);
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index c6e67243514..a79587e4347 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -25,6 +25,7 @@ import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.similarities.Similarity;
+import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -32,6 +33,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.indexing.ShardIndexingService;
+import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@@ -73,7 +75,7 @@ public final class EngineConfig {
private final boolean forceNewTranslog;
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
- private final IndexSearcherWrappingService wrappingService;
+ private final SetOnce searcherWrapper = new SetOnce<>();
/**
* Index setting for compound file on flush. This setting is realtime updateable.
@@ -121,7 +123,7 @@ public final class EngineConfig {
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
- TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, IndexSearcherWrappingService wrappingService, TranslogConfig translogConfig) {
+ TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
@@ -135,7 +137,6 @@ public final class EngineConfig {
this.similarity = similarity;
this.codecService = codecService;
this.failedEngineListener = failedEngineListener;
- this.wrappingService = wrappingService;
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
indexingBufferSize = DEFAULT_INDEX_BUFFER_SIZE;
@@ -380,10 +381,6 @@ public final class EngineConfig {
return queryCachingPolicy;
}
- public IndexSearcherWrappingService getWrappingService() {
- return wrappingService;
- }
-
/**
* Returns the translog config for this engine
*/
diff --git a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java
deleted file mode 100644
index 665d17a2f86..00000000000
--- a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.engine;
-
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.search.IndexSearcher;
-
-/**
- * Extension point to add custom functionality at request time to the {@link DirectoryReader}
- * and {@link IndexSearcher} managed by the {@link Engine}.
- */
-public interface IndexSearcherWrapper {
-
- /**
- * @param reader The provided directory reader to be wrapped to add custom functionality
- * @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
- * the provided directory reader
- */
- DirectoryReader wrap(DirectoryReader reader);
-
- /**
- * @param engineConfig The engine config which can be used to get the query cache and query cache policy from
- * when creating a new index searcher
- * @param searcher The provided index searcher to be wrapped to add custom functionality
- * @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
- * the provided index searcher
- */
- IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException;
-
-}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 5b76040da5e..227212dd86e 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -66,6 +66,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
/**
*
@@ -303,7 +305,7 @@ public class InternalEngine extends Engine {
}
@Override
- public GetResult get(Get get) throws EngineException {
+ public GetResult get(Get get, Function searcherFactory) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (get.realtime()) {
@@ -324,7 +326,7 @@ public class InternalEngine extends Engine {
}
// no version, get the version from the index, we know that we refresh on flush
- return getFromSearcher(get);
+ return getFromSearcher(get, searcherFactory);
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
index f589b289c17..7588ffae355 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
@@ -35,6 +35,7 @@ import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.function.Function;
/**
* ShadowEngine is a specialized engine that only allows read-only operations
@@ -168,9 +169,9 @@ public class ShadowEngine extends Engine {
}
@Override
- public GetResult get(Get get) throws EngineException {
+ public GetResult get(Get get, Function searcherFacotry) throws EngineException {
// There is no translog, so we can get it directly from the searcher
- return getFromSearcher(get);
+ return getFromSearcher(get, searcherFacotry);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/TokenCountFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/TokenCountFieldMapper.java
index a148d940bbe..faa2b7e66a0 100644
--- a/core/src/main/java/org/elasticsearch/index/mapper/core/TokenCountFieldMapper.java
+++ b/core/src/main/java/org/elasticsearch/index/mapper/core/TokenCountFieldMapper.java
@@ -19,6 +19,7 @@
package org.elasticsearch.index.mapper.core;
+import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
import org.apache.lucene.document.Field;
@@ -145,7 +146,7 @@ public class TokenCountFieldMapper extends IntegerFieldMapper {
if (valueAndBoost.value() == null) {
count = fieldType().nullValue();
} else {
- count = countPositions(analyzer.analyzer().tokenStream(simpleName(), valueAndBoost.value()));
+ count = countPositions(analyzer, simpleName(), valueAndBoost.value());
}
addIntegerFields(context, fields, count, valueAndBoost.boost());
}
@@ -156,12 +157,14 @@ public class TokenCountFieldMapper extends IntegerFieldMapper {
/**
* Count position increments in a token stream. Package private for testing.
- * @param tokenStream token stream to count
+ * @param analyzer analyzer to create token stream
+ * @param fieldName field name to pass to analyzer
+ * @param fieldValue field value to pass to analyzer
* @return number of position increments in a token stream
* @throws IOException if tokenStream throws it
*/
- static int countPositions(TokenStream tokenStream) throws IOException {
- try {
+ static int countPositions(Analyzer analyzer, String fieldName, String fieldValue) throws IOException {
+ try (TokenStream tokenStream = analyzer.tokenStream(fieldName, fieldValue)) {
int count = 0;
PositionIncrementAttribute position = tokenStream.addAttribute(PositionIncrementAttribute.class);
tokenStream.reset();
@@ -171,8 +174,6 @@ public class TokenCountFieldMapper extends IntegerFieldMapper {
tokenStream.end();
count += position.getPositionIncrement();
return count;
- } finally {
- tokenStream.close();
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolateStats.java
similarity index 99%
rename from core/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java
rename to core/src/main/java/org/elasticsearch/index/percolator/PercolateStats.java
index 49f2375a03a..f927a42761f 100644
--- a/core/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java
+++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolateStats.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.index.percolator.stats;
+package org.elasticsearch.index.percolator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
diff --git a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java
index 7dd26ec55db..d811f1f6e71 100644
--- a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java
+++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java
@@ -19,6 +19,7 @@
package org.elasticsearch.index.percolator;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@@ -27,6 +28,8 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -41,20 +44,18 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentTypeListener;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
-import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
-import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.percolator.PercolatorService;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -64,39 +65,35 @@ import java.util.concurrent.atomic.AtomicBoolean;
* Once a document type has been created, the real-time percolator will start to listen to write events and update the
* this registry with queries in real time.
*/
-public class PercolatorQueriesRegistry extends AbstractIndexShardComponent implements Closeable{
+public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent implements Closeable {
public final String MAP_UNMAPPED_FIELDS_AS_STRING = "index.percolator.map_unmapped_fields_as_string";
// This is a shard level service, but these below are index level service:
private final IndexQueryParserService queryParserService;
private final MapperService mapperService;
- private final IndicesLifecycle indicesLifecycle;
private final IndexFieldDataService indexFieldDataService;
private final ShardIndexingService indexingService;
- private final ShardPercolateService shardPercolateService;
private final ConcurrentMap percolateQueries = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
- private final ShardLifecycleListener shardLifecycleListener = new ShardLifecycleListener();
private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener();
private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener();
private final AtomicBoolean realTimePercolatorEnabled = new AtomicBoolean(false);
private boolean mapUnmappedFieldsAsString;
+ private final MeanMetric percolateMetric = new MeanMetric();
+ private final CounterMetric currentMetric = new CounterMetric();
+ private final CounterMetric numberOfQueries = new CounterMetric();
public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService,
- ShardIndexingService indexingService, IndicesLifecycle indicesLifecycle, MapperService mapperService,
- IndexFieldDataService indexFieldDataService, ShardPercolateService shardPercolateService) {
+ ShardIndexingService indexingService, MapperService mapperService,
+ IndexFieldDataService indexFieldDataService) {
super(shardId, indexSettings);
this.queryParserService = queryParserService;
this.mapperService = mapperService;
- this.indicesLifecycle = indicesLifecycle;
this.indexingService = indexingService;
this.indexFieldDataService = indexFieldDataService;
- this.shardPercolateService = shardPercolateService;
this.mapUnmappedFieldsAsString = indexSettings.getAsBoolean(MAP_UNMAPPED_FIELDS_AS_STRING, false);
-
- indicesLifecycle.addListener(shardLifecycleListener);
mapperService.addTypeListener(percolateTypeListener);
}
@@ -107,7 +104,6 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
@Override
public void close() {
mapperService.removeTypeListener(percolateTypeListener);
- indicesLifecycle.removeListener(shardLifecycleListener);
indexingService.removeListener(realTimePercolatorOperationListener);
clear();
}
@@ -116,30 +112,25 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
percolateQueries.clear();
}
- void enableRealTimePercolator() {
+ public void enableRealTimePercolator() {
if (realTimePercolatorEnabled.compareAndSet(false, true)) {
indexingService.addListener(realTimePercolatorOperationListener);
}
}
- void disableRealTimePercolator() {
- if (realTimePercolatorEnabled.compareAndSet(true, false)) {
- indexingService.removeListener(realTimePercolatorOperationListener);
- }
- }
-
public void addPercolateQuery(String idAsString, BytesReference source) {
Query newquery = parsePercolatorDocument(idAsString, source);
BytesRef id = new BytesRef(idAsString);
- Query previousQuery = percolateQueries.put(id, newquery);
- shardPercolateService.addedQuery(id, previousQuery, newquery);
+ percolateQueries.put(id, newquery);
+ numberOfQueries.inc();
+
}
public void removePercolateQuery(String idAsString) {
BytesRef id = new BytesRef(idAsString);
Query query = percolateQueries.remove(id);
if (query != null) {
- shardPercolateService.removedQuery(id, query);
+ numberOfQueries.dec();
}
}
@@ -225,55 +216,27 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
enableRealTimePercolator();
}
}
-
}
- private class ShardLifecycleListener extends IndicesLifecycle.Listener {
-
- @Override
- public void afterIndexShardCreated(IndexShard indexShard) {
- if (hasPercolatorType(indexShard)) {
- enableRealTimePercolator();
+ public void loadQueries(IndexReader reader) {
+ logger.trace("loading percolator queries...");
+ final int loadedQueries;
+ try {
+ Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME));
+ QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService);
+ IndexSearcher indexSearcher = new IndexSearcher(reader);
+ indexSearcher.setQueryCache(null);
+ indexSearcher.search(query, queryCollector);
+ Map queries = queryCollector.queries();
+ for (Map.Entry entry : queries.entrySet()) {
+ percolateQueries.put(entry.getKey(), entry.getValue());
+ numberOfQueries.inc();
}
+ loadedQueries = queries.size();
+ } catch (Exception e) {
+ throw new PercolatorException(shardId.index(), "failed to load queries from percolator index", e);
}
-
- @Override
- public void beforeIndexShardPostRecovery(IndexShard indexShard) {
- if (hasPercolatorType(indexShard)) {
- // percolator index has started, fetch what we can from it and initialize the indices
- // we have
- logger.trace("loading percolator queries for [{}]...", shardId);
- int loadedQueries = loadQueries(indexShard);
- logger.debug("done loading [{}] percolator queries for [{}]", loadedQueries, shardId);
- }
- }
-
- private boolean hasPercolatorType(IndexShard indexShard) {
- ShardId otherShardId = indexShard.shardId();
- return shardId.equals(otherShardId) && mapperService.hasMapping(PercolatorService.TYPE_NAME);
- }
-
- private int loadQueries(IndexShard shard) {
- shard.refresh("percolator_load_queries");
- // NOTE: we acquire the searcher via the engine directly here since this is executed right
- // before the shard is marked as POST_RECOVERY
- try (Engine.Searcher searcher = shard.engine().acquireSearcher("percolator_load_queries")) {
- Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME));
- QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService);
- IndexSearcher indexSearcher = new IndexSearcher(searcher.reader());
- indexSearcher.setQueryCache(null);
- indexSearcher.search(query, queryCollector);
- Map queries = queryCollector.queries();
- for (Map.Entry entry : queries.entrySet()) {
- Query previousQuery = percolateQueries.put(entry.getKey(), entry.getValue());
- shardPercolateService.addedQuery(entry.getKey(), previousQuery, entry.getValue());
- }
- return queries.size();
- } catch (Exception e) {
- throw new PercolatorException(shardId.index(), "failed to load queries from percolator index", e);
- }
- }
-
+ logger.debug("done loading [{}] percolator queries", loadedQueries);
}
private class RealTimePercolatorOperationListener extends IndexingOperationListener {
@@ -320,4 +283,35 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
}
}
}
+
+ public void prePercolate() {
+ currentMetric.inc();
+ }
+
+ public void postPercolate(long tookInNanos) {
+ currentMetric.dec();
+ percolateMetric.inc(tookInNanos);
+ }
+
+ /**
+ * @return The current metrics
+ */
+ public PercolateStats stats() {
+ return new PercolateStats(percolateMetric.count(), TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()), currentMetric.count(), -1, numberOfQueries.count());
+ }
+
+ // Enable when a more efficient manner is found for estimating the size of a Lucene query.
+ /*private static long computeSizeInMemory(HashedBytesRef id, Query query) {
+ long size = (3 * RamUsageEstimator.NUM_BYTES_INT) + RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + id.bytes.bytes.length;
+ size += RamEstimator.sizeOf(query);
+ return size;
+ }
+
+ private static final class RamEstimator {
+ // we move this into it's own class to exclude it from the forbidden API checks
+ // it's fine to use here!
+ static long sizeOf(Query query) {
+ return RamUsageEstimator.sizeOf(query);
+ }
+ }*/
}
diff --git a/core/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java b/core/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java
deleted file mode 100644
index 80f6bd9be38..00000000000
--- a/core/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.percolator.stats;
-
-import org.apache.lucene.search.Query;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.metrics.CounterMetric;
-import org.elasticsearch.common.metrics.MeanMetric;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.shard.AbstractIndexShardComponent;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Shard level percolator service that maintains percolator metrics:
- *
- * - total time spent in percolate api
- *
- the current number of percolate requests
- *
- number of registered percolate queries
- *
- */
-public class ShardPercolateService extends AbstractIndexShardComponent {
-
- @Inject
- public ShardPercolateService(ShardId shardId, @IndexSettings Settings indexSettings) {
- super(shardId, indexSettings);
- }
-
- private final MeanMetric percolateMetric = new MeanMetric();
- private final CounterMetric currentMetric = new CounterMetric();
-
- private final CounterMetric numberOfQueries = new CounterMetric();
-
- public void prePercolate() {
- currentMetric.inc();
- }
-
- public void postPercolate(long tookInNanos) {
- currentMetric.dec();
- percolateMetric.inc(tookInNanos);
- }
-
- public void addedQuery(BytesRef id, Query previousQuery, Query newQuery) {
- numberOfQueries.inc();
- }
-
- public void removedQuery(BytesRef id, Query query) {
- numberOfQueries.dec();
- }
-
- /**
- * @return The current metrics
- */
- public PercolateStats stats() {
- return new PercolateStats(percolateMetric.count(), TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()), currentMetric.count(), -1, numberOfQueries.count());
- }
-
- // Enable when a more efficient manner is found for estimating the size of a Lucene query.
- /*private static long computeSizeInMemory(HashedBytesRef id, Query query) {
- long size = (3 * RamUsageEstimator.NUM_BYTES_INT) + RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + id.bytes.bytes.length;
- size += RamEstimator.sizeOf(query);
- return size;
- }
-
- private static final class RamEstimator {
- // we move this into it's own class to exclude it from the forbidden API checks
- // it's fine to use here!
- static long sizeOf(Query query) {
- return RamUsageEstimator.sizeOf(query);
- }
- }*/
-
-}
diff --git a/core/src/main/java/org/elasticsearch/index/query/IdsQueryBuilder.java b/core/src/main/java/org/elasticsearch/index/query/IdsQueryBuilder.java
index b85db4b66b1..1de8db2e801 100644
--- a/core/src/main/java/org/elasticsearch/index/query/IdsQueryBuilder.java
+++ b/core/src/main/java/org/elasticsearch/index/query/IdsQueryBuilder.java
@@ -22,7 +22,6 @@ package org.elasticsearch.index.query;
import org.apache.lucene.queries.TermsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.search.Queries;
@@ -47,9 +46,19 @@ public class IdsQueryBuilder extends AbstractQueryBuilder {
static final IdsQueryBuilder PROTOTYPE = new IdsQueryBuilder();
/**
- * Creates a new IdsQueryBuilder by optionally providing the types of the documents to look for
+ * Creates a new IdsQueryBuilder without providing the types of the documents to look for
*/
- public IdsQueryBuilder(@Nullable String... types) {
+ public IdsQueryBuilder() {
+ this.types = new String[0];
+ }
+
+ /**
+ * Creates a new IdsQueryBuilder by providing the types of the documents to look for
+ */
+ public IdsQueryBuilder(String... types) {
+ if (types == null) {
+ throw new IllegalArgumentException("[ids] types cannot be null");
+ }
this.types = types;
}
@@ -64,32 +73,13 @@ public class IdsQueryBuilder extends AbstractQueryBuilder {
* Adds ids to the query.
*/
public IdsQueryBuilder addIds(String... ids) {
+ if (ids == null) {
+ throw new IllegalArgumentException("[ids] ids cannot be null");
+ }
Collections.addAll(this.ids, ids);
return this;
}
- /**
- * Adds ids to the query.
- */
- public IdsQueryBuilder addIds(Collection ids) {
- this.ids.addAll(ids);
- return this;
- }
-
- /**
- * Adds ids to the filter.
- */
- public IdsQueryBuilder ids(String... ids) {
- return addIds(ids);
- }
-
- /**
- * Adds ids to the filter.
- */
- public IdsQueryBuilder ids(Collection ids) {
- return addIds(ids);
- }
-
/**
* Returns the ids for the query.
*/
@@ -100,13 +90,7 @@ public class IdsQueryBuilder extends AbstractQueryBuilder {
@Override
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(NAME);
- if (types != null) {
- if (types.length == 1) {
- builder.field("type", types[0]);
- } else {
- builder.array("types", types);
- }
- }
+ builder.array("types", types);
builder.startArray("values");
for (String value : ids) {
builder.value(value);
@@ -128,7 +112,7 @@ public class IdsQueryBuilder extends AbstractQueryBuilder {
query = Queries.newMatchNoDocsQuery();
} else {
Collection typesForQuery;
- if (types == null || types.length == 0) {
+ if (types.length == 0) {
typesForQuery = context.queryTypes();
} else if (types.length == 1 && MetaData.ALL.equals(types[0])) {
typesForQuery = context.mapperService().types();
diff --git a/core/src/main/java/org/elasticsearch/index/query/IndexQueryParserService.java b/core/src/main/java/org/elasticsearch/index/query/IndexQueryParserService.java
index d4f7491fb11..bbd9f84e81e 100644
--- a/core/src/main/java/org/elasticsearch/index/query/IndexQueryParserService.java
+++ b/core/src/main/java/org/elasticsearch/index/query/IndexQueryParserService.java
@@ -197,15 +197,6 @@ public class IndexQueryParserService extends AbstractIndexComponent {
}
}
- @Nullable
- public Query parseInnerQuery(QueryShardContext context) throws IOException {
- Query query = context.parseContext().parseInnerQueryBuilder().toQuery(context);
- if (query == null) {
- query = Queries.newMatchNoDocsQuery();
- }
- return query;
- }
-
public QueryShardContext getShardContext() {
return cache.get();
}
@@ -258,16 +249,41 @@ public class IndexQueryParserService extends AbstractIndexComponent {
context.reset(parser);
try {
context.parseFieldMatcher(parseFieldMatcher);
- Query query = context.parseContext().parseInnerQueryBuilder().toQuery(context);
- if (query == null) {
- query = Queries.newMatchNoDocsQuery();
- }
+ Query query = parseInnerQuery(context);
return new ParsedQuery(query, context.copyNamedQueries());
} finally {
context.reset(null);
}
}
+ public Query parseInnerQuery(QueryShardContext context) throws IOException {
+ return toQuery(context.parseContext().parseInnerQueryBuilder(), context);
+ }
+
+ public ParsedQuery toQuery(QueryBuilder> queryBuilder) {
+ QueryShardContext context = cache.get();
+ context.reset();
+ context.parseFieldMatcher(parseFieldMatcher);
+ try {
+ Query query = toQuery(queryBuilder, context);
+ return new ParsedQuery(query, context.copyNamedQueries());
+ } catch(QueryShardException | ParsingException e ) {
+ throw e;
+ } catch(Exception e) {
+ throw new QueryShardException(context, "failed to create query: {}", e, queryBuilder);
+ } finally {
+ context.reset();
+ }
+ }
+
+ private static Query toQuery(QueryBuilder> queryBuilder, QueryShardContext context) throws IOException {
+ Query query = queryBuilder.toQuery(context);
+ if (query == null) {
+ query = Queries.newMatchNoDocsQuery();
+ }
+ return query;
+ }
+
public ParseFieldMatcher parseFieldMatcher() {
return parseFieldMatcher;
}
diff --git a/core/src/main/java/org/elasticsearch/index/query/QueryBuilders.java b/core/src/main/java/org/elasticsearch/index/query/QueryBuilders.java
index df823e166f7..67e654cb0d8 100644
--- a/core/src/main/java/org/elasticsearch/index/query/QueryBuilders.java
+++ b/core/src/main/java/org/elasticsearch/index/query/QueryBuilders.java
@@ -19,7 +19,6 @@
package org.elasticsearch.index.query;
-import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.ShapeRelation;
@@ -109,12 +108,19 @@ public abstract class QueryBuilders {
return new DisMaxQueryBuilder();
}
+ /**
+ * Constructs a query that will match only specific ids within all types.
+ */
+ public static IdsQueryBuilder idsQuery() {
+ return new IdsQueryBuilder();
+ }
+
/**
* Constructs a query that will match only specific ids within types.
*
* @param types The mapping/doc type
*/
- public static IdsQueryBuilder idsQuery(@Nullable String... types) {
+ public static IdsQueryBuilder idsQuery(String... types) {
return new IdsQueryBuilder(types);
}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java
similarity index 50%
rename from core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java
rename to core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java
index 23d05f01dc7..c75f3c7995f 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java
@@ -17,59 +17,47 @@
* under the License.
*/
-package org.elasticsearch.index.engine;
+package org.elasticsearch.index.shard;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.index.engine.Engine.Searcher;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.EngineConfig;
+import org.elasticsearch.index.engine.EngineException;
-import java.util.Set;
+import java.io.IOException;
/**
- * Service responsible for wrapping the {@link DirectoryReader} and {@link IndexSearcher} of a {@link Searcher} via the
- * configured {@link IndexSearcherWrapper} instance. This allows custom functionally to be added the {@link Searcher}
- * before being used to do an operation (search, get, field stats etc.)
+ * Extension point to add custom functionality at request time to the {@link DirectoryReader}
+ * and {@link IndexSearcher} managed by the {@link Engine}.
*/
-// TODO: This needs extension point is a bit hacky now, because the IndexSearch from the engine can only be wrapped once,
-// if we allowed the IndexSearcher to be wrapped multiple times then a custom IndexSearcherWrapper needs have good
-// control over its location in the wrapping chain
-public final class IndexSearcherWrappingService {
+public interface IndexSearcherWrapper {
- private final IndexSearcherWrapper wrapper;
+ /**
+ * @param reader The provided directory reader to be wrapped to add custom functionality
+ * @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
+ * the provided directory reader
+ */
+ DirectoryReader wrap(DirectoryReader reader) throws IOException;
- // for unit tests:
- IndexSearcherWrappingService() {
- this.wrapper = null;
- }
-
- @Inject
- // Use a Set parameter here, because constructor parameter can't be optional
- // and I prefer to keep the `wrapper` field final.
- public IndexSearcherWrappingService(Set wrappers) {
- if (wrappers.size() > 1) {
- throw new IllegalStateException("wrapping of the index searcher by more than one wrappers is forbidden, found the following wrappers [" + wrappers + "]");
- }
- if (wrappers.isEmpty()) {
- this.wrapper = null;
- } else {
- this.wrapper = wrappers.iterator().next();
- }
- }
+ /**
+ * @param engineConfig The engine config which can be used to get the query cache and query cache policy from
+ * when creating a new index searcher
+ * @param searcher The provided index searcher to be wrapped to add custom functionality
+ * @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
+ * the provided index searcher
+ */
+ IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException;
/**
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
- * gets wrapped and a new {@link Searcher} instances is returned, otherwise the provided {@link Searcher} is returned.
+ * gets wrapped and a new {@link Engine.Searcher} instances is returned, otherwise the provided {@link Engine.Searcher} is returned.
*
- * This is invoked each time a {@link Searcher} is requested to do an operation. (for example search)
+ * This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search)
*/
- public Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException {
- if (wrapper == null) {
- return engineSearcher;
- }
-
- DirectoryReader reader = wrapper.wrap((DirectoryReader) engineSearcher.reader());
+ default Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException {
+ DirectoryReader reader = wrap((DirectoryReader) engineSearcher.reader());
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
@@ -77,12 +65,11 @@ public final class IndexSearcherWrappingService {
// TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point
// For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten
// This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times
- IndexSearcher indexSearcher = wrapper.wrap(engineConfig, innerIndexSearcher);
+ IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher);
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
-
@Override
public void close() throws ElasticsearchException {
engineSearcher.close();
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index c98a9c0f9dd..ea2d555ae0d 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -20,10 +20,7 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.codecs.PostingsFormat;
-import org.apache.lucene.index.CheckIndex;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.index.*;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
@@ -36,6 +33,7 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.termvectors.TermVectorsResponse;
+import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -51,11 +49,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.gateway.MetaDataStateFormat;
+import org.elasticsearch.index.IndexServicesProvider;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
@@ -75,8 +73,8 @@ import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.merge.MergeStats;
+import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
-import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
@@ -99,12 +97,12 @@ import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.translog.TranslogWriter;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
-import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
@@ -137,7 +135,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final ShardRequestCache shardQueryCache;
private final ShardFieldData shardFieldData;
private final PercolatorQueriesRegistry percolatorQueriesRegistry;
- private final ShardPercolateService shardPercolateService;
private final TermVectorsService termVectorsService;
private final IndexFieldDataService indexFieldDataService;
private final ShardSuggestMetric shardSuggestMetric = new ShardSuggestMetric();
@@ -161,7 +158,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
protected volatile IndexShardState state;
protected final AtomicReference currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
- private final IndexSearcherWrappingService wrappingService;
@Nullable
private RecoveryState recoveryState;
@@ -190,42 +186,36 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final IndexShardOperationCounter indexShardOperationCounter;
- private EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
+ private final EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
+
+ private final IndexSearcherWrapper searcherWrapper;
@Inject
- public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store,
- ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
- IndicesQueryCache indicesQueryCache, CodecService codecService,
- TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
- @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory,
- ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
+ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) {
super(shardId, indexSettings);
- this.codecService = codecService;
- this.warmer = warmer;
+ this.codecService = provider.getCodecService();
+ this.warmer = provider.getWarmer();
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
- this.similarityService = similarityService;
- this.wrappingService = wrappingService;
+ this.similarityService = provider.getSimilarityService();
Objects.requireNonNull(store, "Store must be provided to the index shard");
- this.engineFactory = factory;
- this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
+ this.engineFactory = provider.getFactory();
+ this.indicesLifecycle = (InternalIndicesLifecycle) provider.getIndicesLifecycle();
this.store = store;
this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
- this.threadPool = threadPool;
- this.mapperService = mapperService;
- this.queryParserService = queryParserService;
- this.indexCache = indexCache;
- this.indexAliasesService = indexAliasesService;
+ this.threadPool = provider.getThreadPool();
+ this.mapperService = provider.getMapperService();
+ this.queryParserService = provider.getQueryParserService();
+ this.indexCache = provider.getIndexCache();
+ this.indexAliasesService = provider.getIndexAliasesService();
this.indexingService = new ShardIndexingService(shardId, indexSettings);
this.getService = new ShardGetService(this, mapperService);
- this.termVectorsService = termVectorsService;
+ this.termVectorsService = provider.getTermVectorsService();
this.searchService = new ShardSearchStats(indexSettings);
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
- this.indicesQueryCache = indicesQueryCache;
+ this.indicesQueryCache = provider.getIndicesQueryCache();
this.shardQueryCache = new ShardRequestCache(shardId, indexSettings);
this.shardFieldData = new ShardFieldData();
- this.shardPercolateService = new ShardPercolateService(shardId, indexSettings);
- this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService);
- this.indexFieldDataService = indexFieldDataService;
+ this.indexFieldDataService = provider.getIndexFieldDataService();
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
@@ -238,7 +228,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false");
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, getFromSettings(logger, indexSettings, Translog.Durabilty.REQUEST),
- bigArrays, threadPool);
+ provider.getBigArrays(), threadPool);
final QueryCachingPolicy cachingPolicy;
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
@@ -252,6 +242,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
+ this.searcherWrapper = provider.getIndexSearcherWrapper();
+ this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, mapperService, indexFieldDataService);
+ if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
+ percolatorQueriesRegistry.enableRealTimePercolator();
+ }
}
public Store store() {
@@ -344,7 +339,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
// we want to refresh *before* we move to internal STARTED state
try {
- engine().refresh("cluster_state_started");
+ getEngine().refresh("cluster_state_started");
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}
@@ -453,7 +448,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", create.type(), create.id(), create.docs());
}
- engine().create(create);
+ getEngine().create(create);
create.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postCreate(create, ex);
@@ -492,7 +487,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
- created = engine().index(index);
+ created = getEngine().index(index);
index.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postIndex(index, ex);
@@ -515,7 +510,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
}
- engine().delete(delete);
+ getEngine().delete(delete);
delete.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postDelete(delete, ex);
@@ -526,7 +521,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public Engine.GetResult get(Engine.Get get) {
readAllowed();
- return engine().get(get);
+ return getEngine().get(get, this::acquireSearcher);
}
public void refresh(String source) {
@@ -535,7 +530,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
logger.trace("refresh with source: {}", source);
}
long time = System.nanoTime();
- engine().refresh(source);
+ getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
}
@@ -561,7 +556,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
*/
@Nullable
public CommitStats commitStats() {
- Engine engine = engineUnsafe();
+ Engine engine = getEngineOrNull();
return engine == null ? null : engine.commitStats();
}
@@ -588,7 +583,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
public MergeStats mergeStats() {
- final Engine engine = engineUnsafe();
+ final Engine engine = getEngineOrNull();
if (engine == null) {
return new MergeStats();
}
@@ -596,7 +591,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
public SegmentsStats segmentStats() {
- SegmentsStats segmentsStats = engine().segmentsStats();
+ SegmentsStats segmentsStats = getEngine().segmentsStats();
segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes());
return segmentsStats;
}
@@ -621,12 +616,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return percolatorQueriesRegistry;
}
- public ShardPercolateService shardPercolateService() {
- return shardPercolateService;
- }
-
public TranslogStats translogStats() {
- return engine().getTranslog().stats();
+ return getEngine().getTranslog().stats();
}
public SuggestStats suggestStats() {
@@ -651,7 +642,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
verifyStartedOrRecovering();
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
- return engine().syncFlush(syncId, expectedCommitId);
+ return getEngine().syncFlush(syncId, expectedCommitId);
}
public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
@@ -666,7 +657,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
verifyStartedOrRecovering();
long time = System.nanoTime();
- Engine.CommitId commitId = engine().flush(force, waitIfOngoing);
+ Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
@@ -677,7 +668,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
if (logger.isTraceEnabled()) {
logger.trace("optimize with {}", optimize);
}
- engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), false, false);
+ getEngine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), false, false);
}
/**
@@ -690,7 +681,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
org.apache.lucene.util.Version previousVersion = minimumCompatibleVersion();
// we just want to upgrade the segments, not actually optimize to a single segment
- engine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
+ getEngine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
false, true, upgrade.upgradeOnlyAncientSegments());
org.apache.lucene.util.Version version = minimumCompatibleVersion();
@@ -703,7 +694,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public org.apache.lucene.util.Version minimumCompatibleVersion() {
org.apache.lucene.util.Version luceneVersion = null;
- for (Segment segment : engine().segments(false)) {
+ for (Segment segment : getEngine().segments(false)) {
if (luceneVersion == null || luceneVersion.onOrAfter(segment.getVersion())) {
luceneVersion = segment.getVersion();
}
@@ -721,7 +712,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
- return engine().snapshotIndex(flushFirst);
+ return getEngine().snapshotIndex(flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
@@ -742,12 +733,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
*/
public void failShard(String reason, @Nullable Throwable e) {
// fail the engine. This will cause this shard to also be removed from the node's index service.
- engine().failEngine(reason, e);
+ getEngine().failEngine(reason, e);
}
public Engine.Searcher acquireSearcher(String source) {
readAllowed();
- return engine().acquireSearcher(source);
+ Engine engine = getEngine();
+ try {
+ return searcherWrapper == null ? engine.acquireSearcher(source) : searcherWrapper.wrap(engineConfig, engine.acquireSearcher(source));
+ } catch (IOException ex) {
+ throw new ElasticsearchException("failed to wrap searcher", ex);
+ }
}
public void close(String reason, boolean flushEngine) throws IOException {
@@ -774,8 +770,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
}
+
public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
- indicesLifecycle.beforeIndexShardPostRecovery(this);
+ if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
+ refresh("percolator_load_queries");
+ try (Engine.Searcher searcher = getEngine().acquireSearcher("percolator_load_queries")) {
+ this.percolatorQueriesRegistry.loadQueries(searcher.reader());
+ }
+ }
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
@@ -789,7 +791,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
}
- indicesLifecycle.afterIndexShardPostRecovery(this);
return this;
}
@@ -813,7 +814,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
- return engineConfig.getTranslogRecoveryPerformer().performBatchRecovery(engine(), operations);
+ return engineConfig.getTranslogRecoveryPerformer().performBatchRecovery(getEngine(), operations);
}
/**
@@ -852,7 +853,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
* a remote peer.
*/
public void skipTranslogRecovery() throws IOException {
- assert engineUnsafe() == null : "engine was already created";
+ assert getEngineOrNull() == null : "engine was already created";
internalPerformTranslogRecovery(true, true);
assert recoveryState.getTranslog().recoveredOperations() == 0;
}
@@ -892,7 +893,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
*/
public void finalizeRecovery() {
recoveryState().setStage(RecoveryState.Stage.FINALIZE);
- engine().refresh("recovery_finalization");
+ getEngine().refresh("recovery_finalization");
startScheduledTasksIfNeeded();
engineConfig.setEnableGcDeletes(true);
}
@@ -982,7 +983,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
config.setIndexingBufferSize(shardIndexingBufferSize);
- Engine engine = engineUnsafe();
+ Engine engine = getEngineOrNull();
if (engine == null) {
logger.debug("updateBufferSize: engine is closed; skipping");
return;
@@ -1057,7 +1058,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
*/
boolean shouldFlush() {
if (disableFlush == false) {
- Engine engine = engineUnsafe();
+ Engine engine = getEngineOrNull();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
@@ -1171,15 +1172,37 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
searchService.onRefreshSettings(settings);
indexingService.onRefreshSettings(settings);
if (change) {
- engine().onSettingsChanged();
+ getEngine().onSettingsChanged();
}
}
+ public Translog.View acquireTranslogView() {
+ Engine engine = getEngine();
+ assert engine.getTranslog() != null : "translog must not be null";
+ return engine.getTranslog().newView();
+ }
+
+ public List segments(boolean verbose) {
+ return getEngine().segments(verbose);
+ }
+
+ public void flushAndCloseEngine() throws IOException {
+ getEngine().flushAndClose();
+ }
+
+ public Translog getTranslog() {
+ return getEngine().getTranslog();
+ }
+
+ public PercolateStats percolateStats() {
+ return percolatorQueriesRegistry.stats();
+ }
+
class EngineRefresher implements Runnable {
@Override
public void run() {
// we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule
- if (!engine().refreshNeeded()) {
+ if (!getEngine().refreshNeeded()) {
reschedule();
return;
}
@@ -1187,7 +1210,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
@Override
public void run() {
try {
- if (engine().refreshNeeded()) {
+ if (getEngine().refreshNeeded()) {
refresh("schedule");
}
} catch (EngineClosedException e) {
@@ -1300,8 +1323,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));
}
- public Engine engine() {
- Engine engine = engineUnsafe();
+ Engine getEngine() {
+ Engine engine = getEngineOrNull();
if (engine == null) {
throw new EngineClosedException(shardId);
}
@@ -1310,7 +1333,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
/** NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
* closed. */
- protected Engine engineUnsafe() {
+ protected Engine getEngineOrNull() {
return this.currentEngineReference.get();
}
@@ -1403,7 +1426,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
- mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig);
+ mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
}
private static class IndexShardOperationCounter extends AbstractRefCounted {
@@ -1444,7 +1467,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
*/
public void sync(Translog.Location location) {
try {
- final Engine engine = engine();
+ final Engine engine = getEngine();
engine.getTranslog().ensureSynced(location);
} catch (EngineClosedException ex) {
// that's fine since we already synced everything on engine close - this also is conform with the methods documentation
@@ -1515,4 +1538,5 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
return false;
}
+
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java
deleted file mode 100644
index 188669f3fb2..00000000000
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.shard;
-
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.inject.AbstractModule;
-import org.elasticsearch.common.inject.multibindings.Multibinder;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.engine.IndexSearcherWrapper;
-import org.elasticsearch.index.engine.IndexSearcherWrappingService;
-import org.elasticsearch.index.engine.EngineFactory;
-import org.elasticsearch.index.engine.InternalEngineFactory;
-
-/**
- * The {@code IndexShardModule} module is responsible for binding the correct
- * shard id, index shard, engine factory, and warming service for a newly
- * created shard.
- */
-public class IndexShardModule extends AbstractModule {
-
- private final ShardId shardId;
- private final Settings settings;
- private final boolean primary;
-
- // pkg private so tests can mock
- Class extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class;
-
- public IndexShardModule(ShardId shardId, boolean primary, Settings settings) {
- this.settings = settings;
- this.shardId = shardId;
- this.primary = primary;
- if (settings.get("index.translog.type") != null) {
- throw new IllegalStateException("a custom translog type is no longer supported. got [" + settings.get("index.translog.type") + "]");
- }
- }
-
- /** Return true if a shadow engine should be used */
- protected boolean useShadowEngine() {
- return primary == false && IndexMetaData.isIndexUsingShadowReplicas(settings);
- }
-
- @Override
- protected void configure() {
- bind(ShardId.class).toInstance(shardId);
- if (useShadowEngine()) {
- bind(IndexShard.class).to(ShadowIndexShard.class).asEagerSingleton();
- } else {
- bind(IndexShard.class).asEagerSingleton();
- }
-
- bind(EngineFactory.class).to(engineFactoryImpl);
- bind(IndexSearcherWrappingService.class).asEagerSingleton();
- // this injects an empty set in IndexSearcherWrappingService, otherwise guice can't construct IndexSearcherWrappingService
- Multibinder multibinder
- = Multibinder.newSetBinder(binder(), IndexSearcherWrapper.class);
- }
-
-
-}
\ No newline at end of file
diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
index 62fa928faf1..c81b9e5c541 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
@@ -18,32 +18,14 @@
*/
package org.elasticsearch.index.shard;
-import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.index.aliases.IndexAliasesService;
-import org.elasticsearch.index.cache.IndexCache;
-import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.engine.IndexSearcherWrappingService;
+import org.elasticsearch.index.IndexServicesProvider;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
-import org.elasticsearch.index.engine.EngineFactory;
-import org.elasticsearch.index.fielddata.IndexFieldDataService;
-import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
-import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.settings.IndexSettingsService;
-import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.termvectors.TermVectorsService;
-import org.elasticsearch.indices.IndicesLifecycle;
-import org.elasticsearch.indices.IndicesWarmer;
-import org.elasticsearch.indices.cache.query.IndicesQueryCache;
-import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@@ -55,23 +37,8 @@ import java.io.IOException;
*/
public final class ShadowIndexShard extends IndexShard {
- @Inject
- public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings,
- IndicesLifecycle indicesLifecycle, Store store,
- ThreadPool threadPool, MapperService mapperService,
- IndexQueryParserService queryParserService, IndexCache indexCache,
- IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache,
- CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
- @Nullable IndicesWarmer warmer,
- SimilarityService similarityService,
- EngineFactory factory,
- ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
- super(shardId, indexSettings, indicesLifecycle, store,
- threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
- indicesQueryCache, codecService,
- termVectorsService, indexFieldDataService,
- warmer, similarityService,
- factory, path, bigArrays, wrappingService);
+ public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) throws IOException {
+ super(shardId, indexSettings, path, store, provider);
}
/**
diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
index cbbfaae0fc2..d90a869f5b3 100644
--- a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
+++ b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
@@ -30,6 +30,7 @@ import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
+import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService;
@@ -108,6 +109,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
private RateLimitingInputStream.Listener snapshotThrottleListener;
+ private RateLimitingInputStream.Listener restoreThrottleListener;
+
private boolean compress;
private final ParseFieldMatcher parseFieldMatcher;
@@ -162,6 +165,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
this.restoreRateLimiter = restoreRateLimiter;
this.rateLimiterListener = rateLimiterListener;
this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
+ this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
this.compress = compress;
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
@@ -501,7 +505,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) {
super(snapshotId, Version.CURRENT, shardId);
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
- store = indexService.shard(shardId.id()).store();
+ store = indexService.getShardOrNull(shardId.id()).store();
this.snapshotStatus = snapshotStatus;
}
@@ -785,7 +789,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/
public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
super(snapshotId, version, shardId, snapshotShardId);
- store = indicesService.indexServiceSafe(shardId.getIndex()).shard(shardId.id()).store();
+ store = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()).store();
this.recoveryState = recoveryState;
}
@@ -906,16 +910,20 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/
private void restoreFile(final FileInfo fileInfo) throws IOException {
boolean success = false;
- try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
+
+ try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
+ final InputStream stream;
+ if (restoreRateLimiter == null) {
+ stream = partSliceStream;
+ } else {
+ stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreThrottleListener);
+ }
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
- if (restoreRateLimiter != null) {
- rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length));
- }
}
Store.verify(indexOutput);
indexOutput.close();
diff --git a/core/src/main/java/org/elasticsearch/index/store/IndexStore.java b/core/src/main/java/org/elasticsearch/index/store/IndexStore.java
index 4022dd75aa1..3a23a09a652 100644
--- a/core/src/main/java/org/elasticsearch/index/store/IndexStore.java
+++ b/core/src/main/java/org/elasticsearch/index/store/IndexStore.java
@@ -27,6 +27,7 @@ import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.store.IndicesStore;
import java.io.Closeable;
@@ -112,7 +113,7 @@ public class IndexStore extends AbstractIndexComponent implements Closeable {
/**
* The shard store class that should be used for each shard.
*/
- public Class extends DirectoryService> shardDirectory() {
- return FsDirectoryService.class;
+ public DirectoryService newDirectoryService(ShardPath path) {
+ return new FsDirectoryService(indexSettings, this, path);
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java
index 39a0fc13e49..629e45574cb 100644
--- a/core/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -1318,12 +1318,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
@Override
public void writeByte(byte b) throws IOException {
final long writtenBytes = this.writtenBytes++;
- if (writtenBytes == checksumPosition) {
- readAndCompareChecksum();
- } else if (writtenBytes > checksumPosition) { // we are writing parts of the checksum....
+ if (writtenBytes >= checksumPosition) { // we are writing parts of the checksum....
+ if (writtenBytes == checksumPosition) {
+ readAndCompareChecksum();
+ }
final int index = Math.toIntExact(writtenBytes - checksumPosition);
if (index < footerChecksum.length) {
footerChecksum[index] = b;
+ if (index == footerChecksum.length-1) {
+ verify(); // we have recorded the entire checksum
+ }
} else {
verify(); // fail if we write more than expected
throw new AssertionError("write past EOF expected length: " + metadata.length() + " writtenBytes: " + writtenBytes);
@@ -1344,16 +1348,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
if (writtenBytes + length > checksumPosition) {
- if (actualChecksum == null) {
- assert writtenBytes <= checksumPosition;
- final int bytesToWrite = (int) (checksumPosition - writtenBytes);
- out.writeBytes(b, offset, bytesToWrite);
- readAndCompareChecksum();
- offset += bytesToWrite;
- length -= bytesToWrite;
- writtenBytes += bytesToWrite;
- }
- for (int i = 0; i < length; i++) {
+ for (int i = 0; i < length; i++) { // don't optimze writing the last block of bytes
writeByte(b[offset+i]);
}
} else {
@@ -1361,7 +1356,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
writtenBytes += length;
}
}
-
}
/**
diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java b/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java
index 211b6d4869d..8c761dfe898 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java
@@ -97,17 +97,6 @@ public interface IndicesLifecycle {
}
- /**
- * Called right after the shard is moved into POST_RECOVERY mode
- */
- public void afterIndexShardPostRecovery(IndexShard indexShard) {}
-
- /**
- * Called right before the shard is moved into POST_RECOVERY mode.
- * The shard is ready to be used but not yet marked as POST_RECOVERY.
- */
- public void beforeIndexShardPostRecovery(IndexShard indexShard) {}
-
/**
* Called after the index shard has been started.
*/
diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java
index 0ad77fbfcd5..ba160fc636c 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -51,18 +51,15 @@ import org.elasticsearch.index.IndexNameModule;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.LocalNodeIdModule;
-import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
import org.elasticsearch.index.analysis.AnalysisModule;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.IndexCacheModule;
-import org.elasticsearch.index.fielddata.IndexFieldDataModule;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.mapper.MapperServiceModule;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.recovery.RecoveryStats;
@@ -343,11 +340,7 @@ public class IndicesService extends AbstractLifecycleComponent i
modules.add(new AnalysisModule(indexSettings, indicesAnalysisService));
modules.add(new SimilarityModule(indexSettings));
modules.add(new IndexCacheModule(indexSettings));
- modules.add(new IndexFieldDataModule(indexSettings));
- modules.add(new MapperServiceModule());
- modules.add(new IndexAliasesServiceModule());
- modules.add(new IndexModule(indexSettings));
-
+ modules.add(new IndexModule());
pluginsService.processModules(modules);
Injector indexInjector;
diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java
index 9ee45b21def..2a82774a612 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java
@@ -87,7 +87,7 @@ public final class IndicesWarmer extends AbstractComponent {
if (indexService == null) {
return;
}
- final IndexShard indexShard = indexService.shard(context.shardId().id());
+ final IndexShard indexShard = indexService.getShardOrNull(context.shardId().id());
if (indexShard == null) {
return;
}
diff --git a/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java b/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java
index 77050714db2..16c0c362c42 100644
--- a/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java
+++ b/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java
@@ -121,28 +121,6 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
}
}
- public void beforeIndexShardPostRecovery(IndexShard indexShard) {
- for (Listener listener : listeners) {
- try {
- listener.beforeIndexShardPostRecovery(indexShard);
- } catch (Throwable t) {
- logger.warn("{} failed to invoke before shard post recovery callback", t, indexShard.shardId());
- throw t;
- }
- }
- }
-
-
- public void afterIndexShardPostRecovery(IndexShard indexShard) {
- for (Listener listener : listeners) {
- try {
- listener.afterIndexShardPostRecovery(indexShard);
- } catch (Throwable t) {
- logger.warn("{} failed to invoke after shard post recovery callback", t, indexShard.shardId());
- throw t;
- }
- }
- }
public void afterIndexShardStarted(IndexShard indexShard) {
for (Listener listener : listeners) {
diff --git a/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java
index 747d15a01f9..c8142f3d37a 100644
--- a/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java
+++ b/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java
@@ -38,7 +38,7 @@ import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
-import org.elasticsearch.index.percolator.stats.PercolateStats;
+import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index eb2bc242a6c..6bce5bc4be1 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -327,7 +327,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function outputStreamFactory) {
@@ -674,7 +672,6 @@ public class RecoverySourceHandler {
try (final OutputStream outputStream = outputStreamFactory.apply(md);
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream);
- Store.verify(indexInput);
}
return null;
});
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
index a466147e71c..123480e81de 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
@@ -52,7 +52,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
// if we relocate we need to close the engine in order to open a new
// IndexWriter on the other end of the relocation
engineClosed = true;
- shard.engine().flushAndClose();
+ shard.flushAndCloseEngine();
} catch (IOException e) {
logger.warn("close engine failed", e);
shard.failShard("failed to close engine (phase1)", e);
diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
index 45b19ae8fc5..b1cb507522e 100644
--- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
+++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -395,7 +395,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
ShardId shardId = request.shardId;
IndexService indexService = indicesService.indexService(shardId.index().getName());
if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) {
- return indexService.shard(shardId.id());
+ return indexService.getShardOrNull(shardId.id());
}
return null;
}
diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
index 48ef0aa168c..ec5cc181aa3 100644
--- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
+++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
@@ -152,7 +152,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction> and pass previous,
// like the indexer does
- TokenStream tokenStream = field.tokenStream(analyzer, null);
- if (tokenStream != null) {
- memoryIndex.addField(field.name(), tokenStream, field.boost());
- }
+ try (TokenStream tokenStream = field.tokenStream(analyzer, null)) {
+ if (tokenStream != null) {
+ memoryIndex.addField(field.name(), tokenStream, field.boost());
+ }
+ }
} catch (IOException e) {
throw new ElasticsearchException("Failed to create token stream", e);
}
diff --git a/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java b/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java
index 190ffc99293..8cb797cdce0 100644
--- a/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java
+++ b/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java
@@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.object.ObjectMapper;
+import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.shard.IndexShard;
@@ -89,6 +90,7 @@ import java.util.concurrent.ConcurrentMap;
*/
public class PercolateContext extends SearchContext {
+ private final PercolatorQueriesRegistry percolateQueryRegistry;
public boolean limit;
private int size;
public boolean doSort;
@@ -102,7 +104,6 @@ public class PercolateContext extends SearchContext {
private final PageCacheRecycler pageCacheRecycler;
private final BigArrays bigArrays;
private final ScriptService scriptService;
- private final ConcurrentMap percolateQueries;
private final int numberOfShards;
private final Query aliasFilter;
private final long originNanoTime = System.nanoTime();
@@ -133,7 +134,7 @@ public class PercolateContext extends SearchContext {
this.indexService = indexService;
this.fieldDataService = indexService.fieldData();
this.searchShardTarget = searchShardTarget;
- this.percolateQueries = indexShard.percolateRegistry().percolateQueries();
+ this.percolateQueryRegistry = indexShard.percolateRegistry();
this.types = new String[]{request.documentType()};
this.pageCacheRecycler = pageCacheRecycler;
this.bigArrays = bigArrays.withCircuitBreaking();
@@ -179,7 +180,7 @@ public class PercolateContext extends SearchContext {
}
public ConcurrentMap percolateQueries() {
- return percolateQueries;
+ return percolateQueryRegistry.percolateQueries();
}
public Query percolateQuery() {
diff --git a/core/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/core/src/main/java/org/elasticsearch/percolator/PercolatorService.java
index ba4ccaeb25e..b20a54f076d 100644
--- a/core/src/main/java/org/elasticsearch/percolator/PercolatorService.java
+++ b/core/src/main/java/org/elasticsearch/percolator/PercolatorService.java
@@ -71,7 +71,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
-import org.elasticsearch.index.percolator.stats.ShardPercolateService;
+import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
@@ -86,7 +86,6 @@ import org.elasticsearch.search.aggregations.AggregationPhase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.highlight.HighlightPhase;
@@ -177,11 +176,10 @@ public class PercolatorService extends AbstractComponent {
public PercolateShardResponse percolate(PercolateShardRequest request) {
IndexService percolateIndexService = indicesService.indexServiceSafe(request.shardId().getIndex());
- IndexShard indexShard = percolateIndexService.shardSafe(request.shardId().id());
+ IndexShard indexShard = percolateIndexService.getShard(request.shardId().id());
indexShard.readAllowed(); // check if we can read the shard...
-
- ShardPercolateService shardPercolateService = indexShard.shardPercolateService();
- shardPercolateService.prePercolate();
+ PercolatorQueriesRegistry percolateQueryRegistry = indexShard.percolateRegistry();
+ percolateQueryRegistry.prePercolate();
long startTime = System.nanoTime();
// TODO: The filteringAliases should be looked up at the coordinating node and serialized with all shard request,
@@ -255,7 +253,7 @@ public class PercolatorService extends AbstractComponent {
} finally {
SearchContext.removeCurrent();
context.close();
- shardPercolateService.postPercolate(System.nanoTime() - startTime);
+ percolateQueryRegistry.postPercolate(System.nanoTime() - startTime);
}
}
diff --git a/core/src/main/java/org/elasticsearch/percolator/SingleDocumentPercolatorIndex.java b/core/src/main/java/org/elasticsearch/percolator/SingleDocumentPercolatorIndex.java
index 3233cdcd756..1271872cab6 100644
--- a/core/src/main/java/org/elasticsearch/percolator/SingleDocumentPercolatorIndex.java
+++ b/core/src/main/java/org/elasticsearch/percolator/SingleDocumentPercolatorIndex.java
@@ -56,10 +56,11 @@ class SingleDocumentPercolatorIndex implements PercolatorIndex {
Analyzer analyzer = context.mapperService().documentMapper(parsedDocument.type()).mappers().indexAnalyzer();
// TODO: instead of passing null here, we can have a CTL