diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index c949a293010..d3e0cdade27 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -80,13 +80,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -313,7 +307,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { // Set up everything, now locally create the index to see that things are ok, and apply final IndexMetaData tmpImd = IndexMetaData.builder(request.index()).settings(actualIndexSettings).build(); // create the index here (on the master) to validate it can be created, as well as adding the mapping - indicesService.createIndex(tmpImd); + indicesService.createIndex(tmpImd, Collections.EMPTY_LIST); indexCreated = true; // now add the mappings IndexService indexService = indicesService.indexServiceSafe(request.index()); @@ -387,7 +381,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { throw e; } - indexService.indicesLifecycle().beforeIndexAddedToCluster(new Index(request.index()), + indexService.getIndexEventListener().beforeIndexAddedToCluster(new Index(request.index()), indexMetaData.getSettings()); MetaData newMetaData = MetaData.builder(currentState.metaData()) @@ -433,29 +427,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } - private void addMappings(Map> mappings, Path mappingsDir) throws IOException { - try (DirectoryStream stream = Files.newDirectoryStream(mappingsDir)) { - for (Path mappingFile : stream) { - final String fileName = mappingFile.getFileName().toString(); - if (FileSystemUtils.isHidden(mappingFile)) { - continue; - } - int lastDotIndex = fileName.lastIndexOf('.'); - String mappingType = lastDotIndex != -1 ? mappingFile.getFileName().toString().substring(0, lastDotIndex) : mappingFile.getFileName().toString(); - try (BufferedReader reader = Files.newBufferedReader(mappingFile, StandardCharsets.UTF_8)) { - String mappingSource = Streams.copyToString(reader); - if (mappings.containsKey(mappingType)) { - XContentHelper.mergeDefaults(mappings.get(mappingType), parseMapping(mappingSource)); - } else { - mappings.put(mappingType, parseMapping(mappingSource)); - } - } catch (Exception e) { - logger.warn("failed to read / parse mapping [" + mappingType + "] from location [" + mappingFile + "], ignoring...", e); - } - } - } - } - private List findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state, IndexTemplateFilter indexTemplateFilter) throws IOException { List templates = new ArrayList<>(); for (ObjectCursor cursor : state.metaData().templates().values()) { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java index c051d9b60db..2dffcb9c6fe 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java @@ -36,10 +36,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.indices.IndicesService; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * Service responsible for submitting add and remove aliases requests @@ -98,7 +95,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent { if (indexService == null) { // temporarily create the index and add mappings so we can parse the filter try { - indexService = indicesService.createIndex(indexMetaData); + indexService = indicesService.createIndex(indexMetaData, Collections.EMPTY_LIST); if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) { indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, false); } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 4fc19ecb6f2..e9c9f5509d1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -172,7 +172,7 @@ public class MetaDataMappingService extends AbstractComponent { IndexService indexService = indicesService.indexService(index); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge - indexService = indicesService.createIndex(indexMetaData); + indexService = indicesService.createIndex(indexMetaData, Collections.EMPTY_LIST); removeIndex = true; Set typesToIntroduce = new HashSet<>(); for (MappingTask task : tasks) { @@ -350,7 +350,7 @@ public class MetaDataMappingService extends AbstractComponent { continue; } final IndexMetaData indexMetaData = currentState.metaData().index(index); - IndexService indexService = indicesService.createIndex(indexMetaData); + IndexService indexService = indicesService.createIndex(indexMetaData, Collections.EMPTY_LIST); indicesToClose.add(indexMetaData.getIndex()); // make sure to add custom default mapping if exists if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) { diff --git a/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java b/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java similarity index 56% rename from core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java rename to core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 16c0c362c42..8165f5a365f 100644 --- a/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java +++ b/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -17,231 +17,244 @@ * under the License. */ -package org.elasticsearch.indices; +package org.elasticsearch.index; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.ShardId; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** - * InternalIndicesLifecycle handles invoking each listener for the Index. All - * exceptions thrown by listeners are logged and then re-thrown to stop further - * index action. + * A composite {@link IndexEventListener} that forwards all callbacks to an immutable list of IndexEventListener */ -public class InternalIndicesLifecycle extends AbstractComponent implements IndicesLifecycle { +final class CompositeIndexEventListener implements IndexEventListener { - private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); + private final List listeners; + private final ESLogger logger; - @Inject - public InternalIndicesLifecycle(Settings settings) { - super(settings); - } - @Override - public void addListener(Listener listener) { - listeners.add(listener); + CompositeIndexEventListener(String index, Settings indexSettings, Collection listeners) { + for (IndexEventListener listener : listeners) { + if (listener == null) { + throw new IllegalArgumentException("listeners must be non-null"); + } + } + this.listeners = Collections.unmodifiableList(new ArrayList<>(listeners)); + this.logger = Loggers.getLogger(getClass(), indexSettings, index); } @Override - public void removeListener(Listener listener) { - listeners.remove(listener); - } - public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { - for (Listener listener : listeners) { + for (IndexEventListener listener : listeners) { try { listener.shardRoutingChanged(indexShard, oldRouting, newRouting); } catch (Throwable t) { - logger.warn("{} failed to invoke shard touring changed callback", t, indexShard.shardId()); - } - } - } - - public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { - try { - listener.beforeIndexAddedToCluster(index, indexSettings); - } catch (Throwable t) { - logger.warn("[{}] failed to invoke before index added to cluster callback", t, index.name()); - throw t; - } - } - } - - public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { - try { - listener.beforeIndexCreated(index, indexSettings); - } catch (Throwable t) { - logger.warn("[{}] failed to invoke before index created callback", t, index.name()); - throw t; - } - } - } - - public void afterIndexCreated(IndexService indexService) { - for (Listener listener : listeners) { - try { - listener.afterIndexCreated(indexService); - } catch (Throwable t) { - logger.warn("[{}] failed to invoke after index created callback", t, indexService.index().name()); - throw t; - } - } - } - - public void beforeIndexShardCreated(ShardId shardId, @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { - try { - listener.beforeIndexShardCreated(shardId, indexSettings); - } catch (Throwable t) { - logger.warn("{} failed to invoke before shard created callback", t, shardId); - throw t; + logger.warn("[{}] failed to invoke shard touring changed callback", t, indexShard.shardId().getId()); } } } + @Override public void afterIndexShardCreated(IndexShard indexShard) { - for (Listener listener : listeners) { + for (IndexEventListener listener : listeners) { try { listener.afterIndexShardCreated(indexShard); } catch (Throwable t) { - logger.warn("{} failed to invoke after shard created callback", t, indexShard.shardId()); + logger.warn("[{}] failed to invoke after shard created callback", t, indexShard.shardId().getId()); throw t; } } } - + @Override public void afterIndexShardStarted(IndexShard indexShard) { - for (Listener listener : listeners) { + for (IndexEventListener listener : listeners) { try { listener.afterIndexShardStarted(indexShard); } catch (Throwable t) { - logger.warn("{} failed to invoke after shard started callback", t, indexShard.shardId()); - throw t; - } - } - } - - public void beforeIndexClosed(IndexService indexService) { - for (Listener listener : listeners) { - try { - listener.beforeIndexClosed(indexService); - } catch (Throwable t) { - logger.warn("[{}] failed to invoke before index closed callback", t, indexService.index().name()); - throw t; - } - } - } - - public void beforeIndexDeleted(IndexService indexService) { - for (Listener listener : listeners) { - try { - listener.beforeIndexDeleted(indexService); - } catch (Throwable t) { - logger.warn("[{}] failed to invoke before index deleted callback", t, indexService.index().name()); - throw t; - } - } - } - - public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { - try { - listener.afterIndexDeleted(index, indexSettings); - } catch (Throwable t) { - logger.warn("[{}] failed to invoke after index deleted callback", t, index.name()); - throw t; - } - } - } - - public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { - try { - listener.afterIndexClosed(index, indexSettings); - } catch (Throwable t) { - logger.warn("[{}] failed to invoke after index closed callback", t, index.name()); + logger.warn("[{}] failed to invoke after shard started callback", t, indexShard.shardId().getId()); throw t; } } } + @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, - @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { + Settings indexSettings) { + for (IndexEventListener listener : listeners) { try { listener.beforeIndexShardClosed(shardId, indexShard, indexSettings); } catch (Throwable t) { - logger.warn("{} failed to invoke before shard closed callback", t, shardId); + logger.warn("[{}] failed to invoke before shard closed callback", t, shardId.getId()); throw t; } } } + @Override public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, - @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { + Settings indexSettings) { + for (IndexEventListener listener : listeners) { try { listener.afterIndexShardClosed(shardId, indexShard, indexSettings); } catch (Throwable t) { - logger.warn("{} failed to invoke after shard closed callback", t, shardId); - throw t; - } - } - } - - public void beforeIndexShardDeleted(ShardId shardId, - @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { - try { - listener.beforeIndexShardDeleted(shardId, indexSettings); - } catch (Throwable t) { - logger.warn("{} failed to invoke before shard deleted callback", t, shardId); - throw t; - } - } - } - - public void afterIndexShardDeleted(ShardId shardId, - @IndexSettings Settings indexSettings) { - for (Listener listener : listeners) { - try { - listener.afterIndexShardDeleted(shardId, indexSettings); - } catch (Throwable t) { - logger.warn("{} failed to invoke after shard deleted callback", t, shardId); - throw t; - } - } - } - - public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, @Nullable String reason) { - for (Listener listener : listeners) { - try { - listener.indexShardStateChanged(indexShard, previousState, indexShard.state(), reason); - } catch (Throwable t) { - logger.warn("{} failed to invoke index shard state changed callback", t, indexShard.shardId()); + logger.warn("[{}] failed to invoke after shard closed callback", t, shardId.getId()); throw t; } } } + @Override public void onShardInactive(IndexShard indexShard) { - for (Listener listener : listeners) { + for (IndexEventListener listener : listeners) { try { listener.onShardInactive(indexShard); } catch (Throwable t) { - logger.warn("{} failed to invoke on shard inactive callback", t, indexShard.shardId()); + logger.warn("[{}] failed to invoke on shard inactive callback", t, indexShard.shardId().getId()); + throw t; + } + } + } + + @Override + public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { + for (IndexEventListener listener : listeners) { + try { + listener.indexShardStateChanged(indexShard, previousState, indexShard.state(), reason); + } catch (Throwable t) { + logger.warn("[{}] failed to invoke index shard state changed callback", t, indexShard.shardId().getId()); + throw t; + } + } + } + + @Override + public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.beforeIndexCreated(index, indexSettings); + } catch (Throwable t) { + logger.warn("failed to invoke before index created callback", t); + throw t; + } + } + } + + @Override + public void afterIndexCreated(IndexService indexService) { + for (IndexEventListener listener : listeners) { + try { + listener.afterIndexCreated(indexService); + } catch (Throwable t) { + logger.warn("failed to invoke after index created callback", t); + throw t; + } + } + } + + @Override + public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.beforeIndexShardCreated(shardId, indexSettings); + } catch (Throwable t) { + logger.warn("[{}] failed to invoke before shard created callback", t, shardId); + throw t; + } + } + } + + @Override + public void beforeIndexClosed(IndexService indexService) { + for (IndexEventListener listener : listeners) { + try { + listener.beforeIndexClosed(indexService); + } catch (Throwable t) { + logger.warn("failed to invoke before index closed callback", t); + throw t; + } + } + } + + @Override + public void beforeIndexDeleted(IndexService indexService) { + for (IndexEventListener listener : listeners) { + try { + listener.beforeIndexDeleted(indexService); + } catch (Throwable t) { + logger.warn("failed to invoke before index deleted callback", t); + throw t; + } + } + } + + @Override + public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.afterIndexDeleted(index, indexSettings); + } catch (Throwable t) { + logger.warn("failed to invoke after index deleted callback", t); + throw t; + } + } + } + + @Override + public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.afterIndexClosed(index, indexSettings); + } catch (Throwable t) { + logger.warn("failed to invoke after index closed callback", t); + throw t; + } + } + } + + @Override + public void beforeIndexShardDeleted(ShardId shardId, + @IndexSettings Settings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.beforeIndexShardDeleted(shardId, indexSettings); + } catch (Throwable t) { + logger.warn("[{}] failed to invoke before shard deleted callback", t, shardId.getId()); + throw t; + } + } + } + + @Override + public void afterIndexShardDeleted(ShardId shardId, + @IndexSettings Settings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.afterIndexShardDeleted(shardId, indexSettings); + } catch (Throwable t) { + logger.warn("[{}] failed to invoke after shard deleted callback", t, shardId.getId()); + throw t; + } + } + } + + @Override + public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.beforeIndexAddedToCluster(index, indexSettings); + } catch (Throwable t) { + logger.warn("failed to invoke before index added to cluster callback", t); throw t; } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index dc637cfd5e9..a6c6e93fb49 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -22,24 +22,61 @@ package org.elasticsearch.index; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.util.Providers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexSearcherWrapper; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + /** * */ public class IndexModule extends AbstractModule { private final IndexMetaData indexMetaData; + private final Settings settings; // pkg private so tests can mock Class engineFactoryImpl = InternalEngineFactory.class; Class indexSearcherWrapper = null; + private final Set indexEventListeners = new HashSet<>(); + private IndexEventListener listener; - public IndexModule(IndexMetaData indexMetaData) { + + public IndexModule(Settings settings, IndexMetaData indexMetaData) { this.indexMetaData = indexMetaData; + this.settings = settings; + } + + public Settings getIndexSettings() { + return settings; + } + + public void addIndexEventListener(IndexEventListener listener) { + if (this.listener != null) { + throw new IllegalStateException("can't add listener after listeners are frozen"); + } + if (listener == null) { + throw new IllegalArgumentException("listener must not be null"); + } + if (indexEventListeners.contains(listener)) { + throw new IllegalArgumentException("listener already added"); + } + + this.indexEventListeners.add(listener); + } + + public IndexEventListener freeze() { + // TODO somehow we need to make this pkg private... + if (listener == null) { + listener = new CompositeIndexEventListener(indexMetaData.getIndex(), settings, indexEventListeners); + } + return listener; } @Override @@ -50,12 +87,11 @@ public class IndexModule extends AbstractModule { } else { bind(IndexSearcherWrapper.class).to(indexSearcherWrapper).asEagerSingleton(); } + bind(IndexEventListener.class).toInstance(freeze()); bind(IndexMetaData.class).toInstance(indexMetaData); bind(IndexService.class).asEagerSingleton(); bind(IndexServicesProvider.class).asEagerSingleton(); bind(MapperService.class).asEagerSingleton(); bind(IndexFieldDataService.class).asEagerSingleton(); } - - } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 9c932e2468a..f07a37cd220 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -46,28 +46,17 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.query.ParsedQuery; -import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShadowIndexShard; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardNotFoundException; -import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.shard.*; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.AliasFilterParsingException; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.InternalIndicesLifecycle; -import org.elasticsearch.indices.InvalidAliasNameException; +import org.elasticsearch.indices.*; import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,7 +69,7 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; */ public class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable { - private final InternalIndicesLifecycle indicesLifecycle; + private final IndexEventListener eventListener; private final AnalysisService analysisService; private final IndexFieldDataService indexFieldData; private final BitsetFilterCache bitsetFilterCache; @@ -102,7 +91,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone BitsetFilterCache bitSetFilterCache, IndicesService indicesServices, IndexServicesProvider indexServicesProvider, - IndexStore indexStore) { + IndexStore indexStore, + IndexEventListener eventListener) { super(index, settingsService.indexSettings()); assert indexMetaData != null; this.analysisService = analysisService; @@ -110,7 +100,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone this.settingsService = settingsService; this.bitsetFilterCache = bitSetFilterCache; this.indicesServices = indicesServices; - this.indicesLifecycle = (InternalIndicesLifecycle) indexServicesProvider.getIndicesLifecycle(); + this.eventListener = eventListener; this.nodeEnv = nodeEnv; this.indexServicesProvider = indexServicesProvider; this.indexStore = indexStore; @@ -123,8 +113,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone return shards.size(); } - public InternalIndicesLifecycle indicesLifecycle() { - return this.indicesLifecycle; + public IndexEventListener getIndexEventListener() { + return this.eventListener; } @Override @@ -225,7 +215,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } - public synchronized IndexShard createShard(int sShardId, ShardRouting routing) { + public synchronized IndexShard createShard(int sShardId, ShardRouting routing) throws IOException { final boolean primary = routing.primary(); /* * TODO: we execute this in parallel but it's a synced method. Yet, we might @@ -237,13 +227,12 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } final Settings indexSettings = settingsService.getSettings(); final ShardId shardId = new ShardId(index, sShardId); - ShardLock lock = null; boolean success = false; Store store = null; IndexShard indexShard = null; + final ShardLock lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5)); try { - lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5)); - indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings); + eventListener.beforeIndexShardCreated(shardId, indexSettings); ShardPath path; try { path = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); @@ -293,20 +282,16 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone indexShard = new IndexShard(shardId, indexSettings, path, store, indexServicesProvider); } - indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created"); - indicesLifecycle.afterIndexShardCreated(indexShard); + eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); + eventListener.afterIndexShardCreated(indexShard); settingsService.addListener(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); success = true; return indexShard; - } catch (IOException e) { - ElasticsearchException ex = new ElasticsearchException("failed to create shard", e); - ex.setShard(shardId); - throw ex; } finally { if (success == false) { IOUtils.closeWhileHandlingException(lock); - closeShard("initialization failed", shardId, indexShard, store); + closeShard("initialization failed", shardId, indexShard, store, eventListener); } } } @@ -325,16 +310,16 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone HashMap newShards = new HashMap<>(shards); indexShard = newShards.remove(shardId); shards = unmodifiableMap(newShards); - closeShard(reason, sId, indexShard, indexShard.store()); + closeShard(reason, sId, indexShard, indexShard.store(), indexShard.getIndexEventListener()); logger.debug("[{}] closed (reason: [{}])", shardId, reason); } - private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store) { + private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener) { final int shardId = sId.id(); final Settings indexSettings = settingsService.getSettings(); try { try { - indicesLifecycle.beforeIndexShardClosed(sId, indexShard, indexSettings); + listener.beforeIndexShardClosed(sId, indexShard, indexSettings); } finally { // this logic is tricky, we want to close the engine so we rollback the changes done to it // and close the shard so no operations are allowed to it @@ -349,7 +334,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } // call this before we close the store, so we can release resources for it - indicesLifecycle.afterIndexShardClosed(sId, indexShard, indexSettings); + listener.afterIndexShardClosed(sId, indexShard, indexSettings); } } finally { try { @@ -367,10 +352,10 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone try { if (ownsShard) { try { - indicesLifecycle.beforeIndexShardDeleted(lock.getShardId(), indexSettings); + eventListener.beforeIndexShardDeleted(lock.getShardId(), indexSettings); } finally { indicesServices.deleteShardStore("delete index", lock, indexSettings); - indicesLifecycle.afterIndexShardDeleted(lock.getShardId(), indexSettings); + eventListener.afterIndexShardDeleted(lock.getShardId(), indexSettings); } } } catch (IOException e) { diff --git a/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java b/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java index 53b9f061abf..d61c911ab7d 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java +++ b/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java @@ -28,10 +28,10 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.IndexQueryParserService; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.termvectors.TermVectorsService; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.memory.IndexingMemoryController; @@ -44,7 +44,6 @@ import org.elasticsearch.threadpool.ThreadPool; */ public final class IndexServicesProvider { - private final IndicesLifecycle indicesLifecycle; private final ThreadPool threadPool; private final MapperService mapperService; private final IndexQueryParserService queryParserService; @@ -59,10 +58,11 @@ public final class IndexServicesProvider { private final BigArrays bigArrays; private final IndexSearcherWrapper indexSearcherWrapper; private final IndexingMemoryController indexingMemoryController; + private final IndexEventListener listener; @Inject - public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingMemoryController indexingMemoryController) { - this.indicesLifecycle = indicesLifecycle; + public IndexServicesProvider(IndexEventListener listener, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingMemoryController indexingMemoryController) { + this.listener = listener; this.threadPool = threadPool; this.mapperService = mapperService; this.queryParserService = queryParserService; @@ -79,10 +79,9 @@ public final class IndexServicesProvider { this.indexingMemoryController = indexingMemoryController; } - public IndicesLifecycle getIndicesLifecycle() { - return indicesLifecycle; + public IndexEventListener getIndexEventListener() { + return listener; } - public ThreadPool getThreadPool() { return threadPool; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java new file mode 100644 index 00000000000..f361e2d6175 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; + +/** + * An index event listener is the primary extension point for plugins and build-in services + * to react / listen to per-index and per-shard events. These listeners are registered per-index + * via {@link org.elasticsearch.index.IndexModule#addIndexEventListener(IndexEventListener)}. All listeners have the same + * lifecycle as the {@link IndexService} they are created for. + *

+ * An IndexEventListener can be used across multiple indices and shards since all callback methods receive sufficient + * local state via their arguments. Yet, if an instance is shared across indices they might be called concurrently and should not + * modify local state without sufficient synchronization. + *

+ */ +public interface IndexEventListener { + + /** + * Called when the shard routing has changed state. + * + * @param indexShard The index shard + * @param oldRouting The old routing state (can be null) + * @param newRouting The new routing state + */ + default void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {} + + /** + * Called after the index shard has been created. + */ + default void afterIndexShardCreated(IndexShard indexShard) {} + + /** + * Called after the index shard has been started. + */ + default void afterIndexShardStarted(IndexShard indexShard) {} + + /** + * Called before the index shard gets closed. + * + * @param indexShard The index shard + */ + default void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {} + + /** + * Called after the index shard has been closed. + * + * @param shardId The shard id + */ + default void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {} + + + /** + * Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes. + * The order of concurrent events is preserved. The execution must be lightweight. + * + * @param indexShard the shard the new state was applied to + * @param previousState the previous index shard state if there was one, null otherwise + * @param currentState the new shard state + * @param reason the reason for the state change if there is one, null otherwise + */ + default void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {} + + /** + * Called when a shard is marked as inactive + * + * @param indexShard The shard that was marked inactive + */ + default void onShardInactive(IndexShard indexShard) {} + + /** + * Called before the index gets created. Note that this is also called + * when the index is created on data nodes + */ + default void beforeIndexCreated(Index index, Settings indexSettings) { + + } + + /** + * Called after the index has been created. + */ + default void afterIndexCreated(IndexService indexService) { + + } + + /** + * Called before the index shard gets created. + */ + default void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) { + } + + + /** + * Called before the index get closed. + * + * @param indexService The index service + */ + default void beforeIndexClosed(IndexService indexService) { + + } + + /** + * Called after the index has been closed. + * + * @param index The index + */ + default void afterIndexClosed(Index index, Settings indexSettings) { + + } + + /** + * Called before the index shard gets deleted from disk + * Note: this method is only executed on the first attempt of deleting the shard. Retries are will not invoke + * this method. + * @param shardId The shard id + * @param indexSettings the shards index settings + */ + default void beforeIndexShardDeleted(ShardId shardId, Settings indexSettings) { + } + + /** + * Called after the index shard has been deleted from disk. + * + * Note: this method is only called if the deletion of the shard did finish without an exception + * + * @param shardId The shard id + * @param indexSettings the shards index settings + */ + default void afterIndexShardDeleted(ShardId shardId, Settings indexSettings) { + } + + /** + * Called after the index has been deleted. + * This listener method is invoked after {@link #afterIndexClosed(org.elasticsearch.index.Index, org.elasticsearch.common.settings.Settings)} + * when an index is deleted + * + * @param index The index + */ + default void afterIndexDeleted(Index index, Settings indexSettings) { + + } + + /** + * Called before the index gets deleted. + * This listener method is invoked after + * {@link #beforeIndexClosed(org.elasticsearch.index.IndexService)} when an index is deleted + * + * @param indexService The index service + */ + default void beforeIndexDeleted(IndexService indexService) { + + } + + /** + * Called on the Master node only before the {@link IndexService} instances is created to simulate an index creation. + * This happens right before the index and it's metadata is registered in the cluster state + */ + default void beforeIndexAddedToCluster(Index index, Settings indexSettings) { + } +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2a87e14a4c2..28e78f4e48b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -98,7 +98,6 @@ import org.elasticsearch.index.translog.TranslogWriter; import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndicesWarmer; -import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.recovery.RecoveryFailedException; @@ -125,7 +124,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett private final ThreadPool threadPool; private final MapperService mapperService; private final IndexCache indexCache; - private final InternalIndicesLifecycle indicesLifecycle; private final Store store; private final MergeSchedulerConfig mergeSchedulerConfig; private final ShardIndexingService indexingService; @@ -149,6 +147,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett private final TranslogConfig translogConfig; private final MergePolicyConfig mergePolicyConfig; private final IndicesQueryCache indicesQueryCache; + private final IndexEventListener indexEventListener; private TimeValue refreshInterval; @@ -206,8 +205,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett this.similarityService = provider.getSimilarityService(); Objects.requireNonNull(store, "Store must be provided to the index shard"); this.engineFactory = provider.getFactory(); - this.indicesLifecycle = (InternalIndicesLifecycle) provider.getIndicesLifecycle(); this.store = store; + this.indexEventListener = provider.getIndexEventListener(); this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings); this.threadPool = provider.getThreadPool(); this.mapperService = provider.getMapperService(); @@ -367,12 +366,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett } } if (movedToStarted) { - indicesLifecycle.afterIndexShardStarted(this); + indexEventListener.afterIndexShardStarted(this); } } } this.shardRouting = newRouting; - indicesLifecycle.shardRoutingChanged(this, currentRouting, newRouting); + indexEventListener.shardRoutingChanged(this, currentRouting, newRouting); } finally { if (persistState) { persistMetadata(newRouting, currentRouting); @@ -431,7 +430,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett logger.debug("state: [{}]->[{}], reason [{}]", state, newState, reason); IndexShardState previousState = state; state = newState; - this.indicesLifecycle.indexShardStateChanged(this, previousState, reason); + this.indexEventListener.indexShardStateChanged(this, previousState, newState, reason); return previousState; } @@ -1036,7 +1035,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett if (wasActive) { updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); logger.debug("shard is now inactive"); - indicesLifecycle.onShardInactive(this); + indexEventListener.onShardInactive(this); } } @@ -1230,6 +1229,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett return percolatorQueriesRegistry.stats(); } + public IndexEventListener getIndexEventListener() { + return indexEventListener; + } + class EngineRefresher implements Runnable { @Override public void run() { diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java b/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java deleted file mode 100644 index 8c761dfe898..00000000000 --- a/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices; - -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.settings.IndexSettings; - -/** - * A global component allowing to register for lifecycle of an index (create/closed) and - * an index shard (created/closed). - */ -public interface IndicesLifecycle { - - /** - * Add a listener. - */ - void addListener(Listener listener); - - /** - * Remove a listener. - */ - void removeListener(Listener listener); - - /** - * A listener for index and index shard lifecycle events (create/closed). - */ - public abstract static class Listener { - - /** - * Called when the shard routing has changed state. - * - * @param indexShard The index shard - * @param oldRouting The old routing state (can be null) - * @param newRouting The new routing state - */ - public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { - - } - - /** - * Called on the Master node only before the index is created - */ - public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) { - - } - - /** - * Called before the index gets created. Note that this is also called - * when the index is created on data nodes - */ - public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) { - - } - - /** - * Called after the index has been created. - */ - public void afterIndexCreated(IndexService indexService) { - - } - - /** - * Called before the index shard gets created. - */ - public void beforeIndexShardCreated(ShardId shardId, @IndexSettings Settings indexSettings) { - - } - - /** - * Called after the index shard has been created. - */ - public void afterIndexShardCreated(IndexShard indexShard) { - - } - - /** - * Called after the index shard has been started. - */ - public void afterIndexShardStarted(IndexShard indexShard) { - - } - - /** - * Called before the index get closed. - * - * @param indexService The index service - */ - public void beforeIndexClosed(IndexService indexService) { - - } - - /** - * Called after the index has been closed. - * - * @param index The index - */ - public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) { - - } - - /** - * Called before the index shard gets closed. - * - * @param indexShard The index shard - */ - public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, - @IndexSettings Settings indexSettings) { - - } - - /** - * Called after the index shard has been closed. - * - * @param shardId The shard id - */ - public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, - @IndexSettings Settings indexSettings) { - - } - - /** - * Called before the index shard gets deleted from disk - * Note: this method is only executed on the first attempt of deleting the shard. Retries are will not invoke - * this method. - * @param shardId The shard id - * @param indexSettings the shards index settings - */ - public void beforeIndexShardDeleted(ShardId shardId, @IndexSettings Settings indexSettings) { - } - - /** - * Called after the index shard has been deleted from disk. - * - * Note: this method is only called if the deletion of the shard did finish without an exception - * - * @param shardId The shard id - * @param indexSettings the shards index settings - */ - public void afterIndexShardDeleted(ShardId shardId, @IndexSettings Settings indexSettings) { - } - - /** - * Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes. - * The order of concurrent events is preserved. The execution must be lightweight. - * - * @param indexShard the shard the new state was applied to - * @param previousState the previous index shard state if there was one, null otherwise - * @param currentState the new shard state - * @param reason the reason for the state change if there is one, null otherwise - */ - public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { - - } - - /** - * Called after the index has been deleted. - * This listener method is invoked after {@link #afterIndexClosed(org.elasticsearch.index.Index, org.elasticsearch.common.settings.Settings)} - * when an index is deleted - * - * @param index The index - */ - public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) { - - } - - /** - * Called before the index gets deleted. - * This listener method is invoked after - * {@link #beforeIndexClosed(org.elasticsearch.index.IndexService)} when an index is deleted - * - * @param indexService The index service - */ - public void beforeIndexDeleted(IndexService indexService) { - - } - - /** - * Called when a shard is marked as inactive - * - * @param indexShard The shard that was marked inactive - */ - public void onShardInactive(IndexShard indexShard) { - - } - } - -} diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java index 69aad95ccaf..89a3ae21fb4 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -52,15 +52,13 @@ import org.elasticsearch.indices.ttl.IndicesTTLService; */ public class IndicesModule extends AbstractModule { - private final Settings settings; private final ExtensionPoint.ClassSet queryParsers = new ExtensionPoint.ClassSet<>("query_parser", QueryParser.class); private final ExtensionPoint.InstanceMap hunspellDictionaries = new ExtensionPoint.InstanceMap<>("hunspell_dictionary", String.class, Dictionary.class); - public IndicesModule(Settings settings) { - this.settings = settings; + public IndicesModule() { registerBuiltinQueryParsers(); } @@ -130,7 +128,6 @@ public class IndicesModule extends AbstractModule { bindQueryParsersExtension(); bindHunspellExtension(); - bind(IndicesLifecycle.class).to(InternalIndicesLifecycle.class).asEagerSingleton(); bind(IndicesService.class).asEagerSingleton(); bind(RecoverySettings.class).asEagerSingleton(); bind(RecoveryTarget.class).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index ea94a968e3e..818cb69352b 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -68,6 +68,7 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.index.store.IndexStore; @@ -86,10 +87,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.stream.Stream; import static java.util.Collections.emptyMap; @@ -106,9 +104,6 @@ import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList; public class IndicesService extends AbstractLifecycleComponent implements Iterable { public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout"; - - private final InternalIndicesLifecycle indicesLifecycle; - private final IndicesAnalysisService indicesAnalysisService; private final Injector injector; @@ -142,13 +137,11 @@ public class IndicesService extends AbstractLifecycleComponent i private final OldShardsStats oldShardsStats = new OldShardsStats(); @Inject - public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector, NodeEnvironment nodeEnv) { + public IndicesService(Settings settings, IndicesAnalysisService indicesAnalysisService, Injector injector, PluginsService pluginsService, NodeEnvironment nodeEnv) { super(settings); - this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indicesAnalysisService = indicesAnalysisService; this.injector = injector; - this.pluginsService = injector.getInstance(PluginsService.class); - this.indicesLifecycle.addListener(oldShardsStats); + this.pluginsService = pluginsService; this.nodeEnv = nodeEnv; this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); } @@ -195,10 +188,6 @@ public class IndicesService extends AbstractLifecycleComponent i indicesAnalysisService); } - public IndicesLifecycle indicesLifecycle() { - return this.indicesLifecycle; - } - /** * Returns the node stats indices stats. The includePrevious flag controls * if old shards stats will be aggregated as well (only for relevant stats, such as @@ -305,7 +294,14 @@ public class IndicesService extends AbstractLifecycleComponent i return indexService; } - public synchronized IndexService createIndex(IndexMetaData indexMetaData) { + + /** + * Creates a new {@link IndexService} for the given metadata. + * @param indexMetaData the index metadata to create the index for + * @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the per-index listeners + * @throws IndexAlreadyExistsException if the index already exists. + */ + public synchronized IndexService createIndex(IndexMetaData indexMetaData, List builtInListeners) { if (!lifecycle.started()) { throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed"); } @@ -314,9 +310,6 @@ public class IndicesService extends AbstractLifecycleComponent i if (indices.containsKey(index.name())) { throw new IndexAlreadyExistsException(index); } - - indicesLifecycle.beforeIndexCreated(index, settings); - logger.debug("creating Index [{}], shards [{}]/[{}{}]", indexMetaData.getIndex(), settings.get(SETTING_NUMBER_OF_SHARDS), @@ -335,12 +328,19 @@ public class IndicesService extends AbstractLifecycleComponent i for (Module pluginModule : pluginsService.indexModules(indexSettings)) { modules.add(pluginModule); } + final IndexModule indexModule = new IndexModule(settings, indexMetaData); + for (IndexEventListener listener : builtInListeners) { + indexModule.addIndexEventListener(listener); + } + indexModule.addIndexEventListener(oldShardsStats); modules.add(new IndexStoreModule(indexSettings)); modules.add(new AnalysisModule(indexSettings, indicesAnalysisService)); modules.add(new SimilarityModule(index, indexSettings)); modules.add(new IndexCacheModule(indexSettings)); - modules.add(new IndexModule(indexMetaData)); + modules.add(indexModule); pluginsService.processModules(modules); + final IndexEventListener listener = indexModule.freeze(); + listener.beforeIndexCreated(index, settings); Injector indexInjector; try { @@ -352,9 +352,8 @@ public class IndicesService extends AbstractLifecycleComponent i } IndexService indexService = indexInjector.getInstance(IndexService.class); - - indicesLifecycle.afterIndexCreated(indexService); - + assert indexService.getIndexEventListener() == listener; + listener.afterIndexCreated(indexService); indices = newMapBuilder(indices).put(index.name(), new IndexServiceInjectorPair(indexService, indexInjector)).immutableMap(); return indexService; } @@ -373,6 +372,7 @@ public class IndicesService extends AbstractLifecycleComponent i try { final IndexService indexService; final Injector indexInjector; + final IndexEventListener listener; synchronized (this) { if (indices.containsKey(index) == false) { return; @@ -384,11 +384,12 @@ public class IndicesService extends AbstractLifecycleComponent i indexService = remove.getIndexService(); indexInjector = remove.getInjector(); indices = unmodifiableMap(newIndices); + listener = indexService.getIndexEventListener(); } - indicesLifecycle.beforeIndexClosed(indexService); + listener.beforeIndexClosed(indexService); if (delete) { - indicesLifecycle.beforeIndexDeleted(indexService); + listener.beforeIndexDeleted(indexService); } Stream closeables = pluginsService.indexServices().stream().map(p -> indexInjector.getInstance(p)); IOUtils.close(closeables::iterator); @@ -412,10 +413,10 @@ public class IndicesService extends AbstractLifecycleComponent i indexInjector.getInstance(IndexStore.class).close(); logger.debug("[{}] closed... (reason [{}])", index, reason); - indicesLifecycle.afterIndexClosed(indexService.index(), indexService.settingsService().getSettings()); + listener.afterIndexClosed(indexService.index(), indexService.settingsService().getSettings()); if (delete) { final Settings indexSettings = indexService.getIndexSettings(); - indicesLifecycle.afterIndexDeleted(indexService.index(), indexSettings); + listener.afterIndexDeleted(indexService.index(), indexSettings); // now we are done - try to wipe data on disk if possible deleteIndexStore(reason, indexService.index(), indexSettings, false); } @@ -424,7 +425,7 @@ public class IndicesService extends AbstractLifecycleComponent i } } - static class OldShardsStats extends IndicesLifecycle.Listener { + static class OldShardsStats implements IndexEventListener { final SearchStats searchStats = new SearchStats(); final GetStats getStats = new GetStats(); diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 76c5fe26403..c0f821fbca0 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -51,17 +51,17 @@ import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.*; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.indices.recovery.RecoveryFailedException; +import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.search.SearchService; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentMap; /** @@ -101,14 +101,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent buildInIndexListener; @Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool, RecoveryTarget recoveryTarget, ShardStateAction shardStateAction, NodeIndexDeletedAction nodeIndexDeletedAction, - NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService) { + NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService, SearchService searchService, SyncedFlushService syncedFlushService, RecoverySource recoverySource) { super(settings); + this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService); this.indicesService = indicesService; this.clusterService = clusterService; this.threadPool = threadPool; @@ -299,7 +301,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent() { - @Override - public void onResponse(ShardsSyncedFlushResult syncedFlushResult) { - logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId()); - } + } - @Override - public void onFailure(Throwable e) { - logger.debug("{} sync flush on inactive shard failed", e, indexShard.shardId()); - } - }); + @Override + public void onShardInactive(final IndexShard indexShard) { + // we only want to call sync flush once, so only trigger it when we are on a primary + if (indexShard.routingEntry().primary()) { + attemptSyncedFlush(indexShard.shardId(), new ActionListener() { + @Override + public void onResponse(ShardsSyncedFlushResult syncedFlushResult) { + logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId()); } - } - }); + + @Override + public void onFailure(Throwable e) { + logger.debug("{} sync flush on inactive shard failed", e, indexShard.shardId()); + } + }); + } } /** diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 6ea41896e55..58186490e9d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -30,9 +30,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -50,7 +50,7 @@ import java.util.Set; * The source recovery accepts recovery requests from other peer shards and start the recovery process from this * source shard to the target shard. */ -public class RecoverySource extends AbstractComponent { +public class RecoverySource extends AbstractComponent implements IndexEventListener{ public static class Actions { public static final String START_RECOVERY = "internal:index/shard/recovery/start_recovery"; @@ -72,21 +72,18 @@ public class RecoverySource extends AbstractComponent { this.transportService = transportService; this.indicesService = indicesService; this.clusterService = clusterService; - this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() { - @Override - public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, - @IndexSettings Settings indexSettings) { - if (indexShard != null) { - ongoingRecoveries.cancel(indexShard, "shard is closed"); - } - } - }); - this.recoverySettings = recoverySettings; - transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC, new StartRecoveryTransportRequestHandler()); } + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, + @IndexSettings Settings indexSettings) { + if (indexShard != null) { + ongoingRecoveries.cancel(indexShard, "shard is closed"); + } + } + private RecoveryResponse recover(final StartRecoveryRequest request) { final IndexService indexService = indicesService.indexServiceSafe(request.shardId().index().name()); final IndexShard shard = indexService.getShard(request.shardId().id()); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index d8880893a13..65f309598cf 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -48,7 +48,7 @@ import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.*; import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.IndicesLifecycle; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -67,7 +67,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; * Note, it can be safely assumed that there will only be a single recovery per shard (index+id) and * not several of them (since we don't allocate several shard replicas to the same node). */ -public class RecoveryTarget extends AbstractComponent { +public class RecoveryTarget extends AbstractComponent implements IndexEventListener { public static class Actions { public static final String FILES_INFO = "internal:index/shard/recovery/filesInfo"; @@ -88,8 +88,7 @@ public class RecoveryTarget extends AbstractComponent { private final RecoveriesCollection onGoingRecoveries; @Inject - public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, - IndicesLifecycle indicesLifecycle, RecoverySettings recoverySettings, ClusterService clusterService) { + public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, RecoverySettings recoverySettings, ClusterService clusterService) { super(settings); this.threadPool = threadPool; this.transportService = transportService; @@ -103,16 +102,14 @@ public class RecoveryTarget extends AbstractComponent { transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, new TranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new FinalizeRecoveryRequestHandler()); + } - indicesLifecycle.addListener(new IndicesLifecycle.Listener() { - @Override - public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, - @IndexSettings Settings indexSettings) { - if (indexShard != null) { - onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed"); - } - } - }); + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, + @IndexSettings Settings indexSettings) { + if (indexShard != null) { + onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed"); + } } /** diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index ecd7fddd3be..345eb56a3fa 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -181,7 +181,7 @@ public class Node implements Releasable { if (settings.getAsBoolean(HTTP_ENABLED, true)) { modules.add(new HttpServerModule(settings)); } - modules.add(new IndicesModule(settings)); + modules.add(new IndicesModule()); modules.add(new SearchModule(settings)); modules.add(new ActionModule(false)); modules.add(new MonitorModule(settings)); diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index c986d51f6b7..a4844cd7ff1 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -65,8 +65,8 @@ import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.search.stats.StatsGroupsParseElement; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.IndicesWarmer.TerminationHandle; @@ -119,7 +119,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; /** * */ -public class SearchService extends AbstractLifecycleComponent { +public class SearchService extends AbstractLifecycleComponent implements IndexEventListener { public static final String NORMS_LOADING_KEY = "index.norms.loading"; public static final String DEFAULT_KEEPALIVE_KEY = "search.default_keep_alive"; @@ -173,27 +173,6 @@ public class SearchService extends AbstractLifecycleComponent { this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; - indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() { - @Override - public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) { - // once an index is closed we can just clean up all the pending search context information - // to release memory and let references to the filesystem go etc. - IndexMetaData idxMeta = SearchService.this.clusterService.state().metaData().index(index.getName()); - if (idxMeta != null && idxMeta.getState() == IndexMetaData.State.CLOSE) { - // we need to check if it's really closed - // since sometimes due to a relocation we already closed the shard and that causes the index to be closed - // if we then close all the contexts we can get some search failures along the way which are not expected. - // it's fine to keep the contexts open if the index is still "alive" - // unfortunately we don't have a clear way to signal today why an index is closed. - afterIndexDeleted(index, indexSettings); - } - } - - @Override - public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) { - freeAllContextForIndex(index); - } - }); this.indicesWarmer = indicesWarmer; this.scriptService = scriptService; this.pageCacheRecycler = pageCacheRecycler; @@ -235,6 +214,26 @@ public class SearchService extends AbstractLifecycleComponent { } } + @Override + public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) { + // once an index is closed we can just clean up all the pending search context information + // to release memory and let references to the filesystem go etc. + IndexMetaData idxMeta = SearchService.this.clusterService.state().metaData().index(index.getName()); + if (idxMeta != null && idxMeta.getState() == IndexMetaData.State.CLOSE) { + // we need to check if it's really closed + // since sometimes due to a relocation we already closed the shard and that causes the index to be closed + // if we then close all the contexts we can get some search failures along the way which are not expected. + // it's fine to keep the contexts open if the index is still "alive" + // unfortunately we don't have a clear way to signal today why an index is closed. + afterIndexDeleted(index, indexSettings); + } + } + + @Override + public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) { + freeAllContextForIndex(index); + } + protected void putContext(SearchContext context) { final SearchContext previous = activeContexts.put(context.id(), context); assert previous == null; diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java index 8d5fb6567ec..ffb9e630b70 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java @@ -38,7 +38,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.test.store.MockFSDirectoryService; +import org.elasticsearch.test.store.MockFSIndexStore; import java.util.HashMap; import java.util.HashSet; @@ -148,7 +148,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase { internalCluster().ensureAtLeastNumDataNodes(2); assertAcked(prepareCreate(index).setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "5") - .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) + .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) )); indexRandomData(index); ensureGreen(index); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 5d1a0313066..01c76b465a9 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -38,6 +38,7 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster.RestartCallback; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; +import org.elasticsearch.test.store.MockFSIndexStore; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; @@ -323,7 +324,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { public void testReusePeerRecovery() throws Exception { final Settings settings = settingsBuilder() .put("action.admin.cluster.node.shutdown.delay", "10ms") - .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) + .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) .put("gateway.recover_after_nodes", 4) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4) diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index dd32b309a0f..2c3e2105478 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -27,31 +27,46 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.test.engine.MockEngineFactory; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + public class IndexModuleTests extends ModuleTestCase { public void testWrapperIsBound() { - IndexModule module = new IndexModule(IndexMetaData.PROTO); + IndexModule module = new IndexModule(Settings.EMPTY, IndexMetaData.PROTO); assertInstanceBinding(module, IndexSearcherWrapper.class,(x) -> x == null); module.indexSearcherWrapper = Wrapper.class; assertBinding(module, IndexSearcherWrapper.class, Wrapper.class); } public void testEngineFactoryBound() { - IndexModule module = new IndexModule(IndexMetaData.PROTO); + IndexModule module = new IndexModule(Settings.EMPTY,IndexMetaData.PROTO); assertBinding(module, EngineFactory.class, InternalEngineFactory.class); module.engineFactoryImpl = MockEngineFactory.class; assertBinding(module, EngineFactory.class, MockEngineFactory.class); } public void testOtherServiceBound() { + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + final IndexEventListener listener = new IndexEventListener() { + @Override + public void beforeIndexDeleted(IndexService indexService) { + atomicBoolean.set(true); + } + }; final IndexMetaData meta = IndexMetaData.builder(IndexMetaData.PROTO).index("foo").build(); - IndexModule module = new IndexModule(meta); + IndexModule module = new IndexModule(Settings.EMPTY,meta); + module.addIndexEventListener(listener); assertBinding(module, IndexService.class, IndexService.class); assertBinding(module, IndexServicesProvider.class, IndexServicesProvider.class); assertInstanceBinding(module, IndexMetaData.class, (x) -> x == meta); + assertInstanceBinding(module, IndexEventListener.class, (x) -> {x.beforeIndexDeleted(null); return atomicBoolean.get();}); } public static final class Wrapper extends IndexSearcherWrapper { diff --git a/core/src/test/java/org/elasticsearch/index/analysis/AnalysisTestsHelper.java b/core/src/test/java/org/elasticsearch/index/analysis/AnalysisTestsHelper.java index 72eac6860ba..74102fc014d 100644 --- a/core/src/test/java/org/elasticsearch/index/analysis/AnalysisTestsHelper.java +++ b/core/src/test/java/org/elasticsearch/index/analysis/AnalysisTestsHelper.java @@ -52,7 +52,7 @@ public class AnalysisTestsHelper { if (settings.get(IndexMetaData.SETTING_VERSION_CREATED) == null) { settings = Settings.builder().put(settings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); } - IndicesModule indicesModule = new IndicesModule(settings) { + IndicesModule indicesModule = new IndicesModule() { @Override public void configure() { // skip services diff --git a/core/src/test/java/org/elasticsearch/index/query/AbstractQueryTestCase.java b/core/src/test/java/org/elasticsearch/index/query/AbstractQueryTestCase.java index 281293811ff..a37adcd808c 100644 --- a/core/src/test/java/org/elasticsearch/index/query/AbstractQueryTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/query/AbstractQueryTestCase.java @@ -173,7 +173,7 @@ public abstract class AbstractQueryTestCase> new EnvironmentModule(new Environment(settings)), new SettingsModule(settings), new ThreadPoolModule(new ThreadPool(settings)), - new IndicesModule(settings) { + new IndicesModule() { @Override public void configure() { // skip services diff --git a/core/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTests.java b/core/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTests.java index 4e2c43a8e08..6e47f52afe4 100644 --- a/core/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTests.java +++ b/core/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTests.java @@ -86,7 +86,7 @@ public class TemplateQueryParserTests extends ESTestCase { new EnvironmentModule(new Environment(settings)), new SettingsModule(settings), new ThreadPoolModule(new ThreadPool(settings)), - new IndicesModule(settings) { + new IndicesModule() { @Override public void configure() { // skip services diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c73420aaa44..b77c704cdcb 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -763,7 +763,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(total + 1, shard.flushStats().getTotal()); } - public void testRecoverFromStore() { + public void testRecoverFromStore() throws IOException { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -1039,7 +1039,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { ShardRouting routing = new ShardRouting(shard.routingEntry()); shard.close("simon says", true); IndexServicesProvider indexServices = indexService.getIndexServices(); - IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController()); + IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndexEventListener(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController()); IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider); ShardRoutingHelper.reinit(routing); newShard.updateRoutingEntry(routing, false); diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index b9282ea2db5..5f18f9980ab 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -52,12 +51,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.gateway.PrimaryShardAllocator; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.MergePolicyConfig; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesLifecycle; -import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.index.shard.*; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryTarget; @@ -67,7 +61,8 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.store.MockFSDirectoryService; +import org.elasticsearch.test.MockIndexEventListener; +import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -125,7 +120,7 @@ public class CorruptedFileIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return pluginList(MockTransportService.TestPlugin.class); + return pluginList(MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class); } /** @@ -145,7 +140,7 @@ public class CorruptedFileIT extends ESIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1") .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) - .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose + .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files .put("indices.recovery.concurrent_streams", 10) )); @@ -194,7 +189,7 @@ public class CorruptedFileIT extends ESIntegTestCase { */ final CountDownLatch latch = new CountDownLatch(numShards * 3); // primary + 2 replicas final CopyOnWriteArrayList exception = new CopyOnWriteArrayList<>(); - final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() { + final IndexEventListener listener = new IndexEventListener() { @Override public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard, @IndexSettings Settings indexSettings) { if (indexShard != null) { @@ -225,16 +220,16 @@ public class CorruptedFileIT extends ESIntegTestCase { } }; - for (IndicesService service : internalCluster().getDataNodeInstances(IndicesService.class)) { - service.indicesLifecycle().addListener(listener); + for (MockIndexEventListener.TestEventListener eventListener : internalCluster().getDataNodeInstances(MockIndexEventListener.TestEventListener.class)) { + eventListener.setNewDelegate(listener); } try { client().admin().indices().prepareDelete("test").get(); latch.await(); assertThat(exception, empty()); } finally { - for (IndicesService service : internalCluster().getDataNodeInstances(IndicesService.class)) { - service.indicesLifecycle().removeListener(listener); + for (MockIndexEventListener.TestEventListener eventListener : internalCluster().getDataNodeInstances(MockIndexEventListener.TestEventListener.class)) { + eventListener.setNewDelegate(null); } } } @@ -250,7 +245,7 @@ public class CorruptedFileIT extends ESIntegTestCase { assertAcked(prepareCreate("test").setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) - .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose + .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files .put("indices.recovery.concurrent_streams", 10) )); @@ -395,7 +390,7 @@ public class CorruptedFileIT extends ESIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, between(1, 4)) // don't go crazy here it must recovery fast // This does corrupt files on the replica, so we can't check: - .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) + .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) .put("index.routing.allocation.include._name", primariesNode.getNode().name()) .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE) )); @@ -476,7 +471,7 @@ public class CorruptedFileIT extends ESIntegTestCase { assertAcked(prepareCreate("test").setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) - .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose + .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files .put("indices.recovery.concurrent_streams", 10) )); @@ -531,7 +526,7 @@ public class CorruptedFileIT extends ESIntegTestCase { .put(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1) .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) - .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose + .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose .put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files .put("indices.recovery.concurrent_streams", 10) )); diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java index 9ea4e38f868..39284ff2929 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java @@ -31,14 +31,19 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.MockIndexEventListener; +import org.elasticsearch.test.transport.MockTransportService; import org.hamcrest.Matchers; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -63,6 +68,12 @@ import static org.hamcrest.Matchers.hasSize; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class IndicesLifecycleListenerIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return pluginList(MockIndexEventListener.TestPlugin.class); + } + public void testBeforeIndexAddedToCluster() throws Exception { String node1 = internalCluster().startNode(); String node2 = internalCluster().startNode(); @@ -71,7 +82,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { final AtomicInteger beforeAddedCount = new AtomicInteger(0); final AtomicInteger allCreatedCount = new AtomicInteger(0); - IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() { + IndexEventListener listener = new IndexEventListener() { @Override public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) { beforeAddedCount.incrementAndGet(); @@ -86,9 +97,9 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { } }; - internalCluster().getInstance(IndicesLifecycle.class, node1).addListener(listener); - internalCluster().getInstance(IndicesLifecycle.class, node2).addListener(listener); - internalCluster().getInstance(IndicesLifecycle.class, node3).addListener(listener); + internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node1).setNewDelegate(listener); + internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(listener); + internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node3).setNewDelegate(listener); client().admin().indices().prepareCreate("test") .setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get(); @@ -115,7 +126,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { client().admin().indices().prepareCreate("index1").setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0).get(); ensureGreen("index1"); String node2 = internalCluster().startNode(); - internalCluster().getInstance(IndicesLifecycle.class, node2).addListener(new IndexShardStateChangeListener() { + internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(new IndexShardStateChangeListener() { @Override public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) { throw new RuntimeException("FAIL"); @@ -134,7 +145,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { String node1 = internalCluster().startNode(); IndexShardStateChangeListener stateChangeListenerNode1 = new IndexShardStateChangeListener(); //add a listener that keeps track of the shard state changes - internalCluster().getInstance(IndicesLifecycle.class, node1).addListener(stateChangeListenerNode1); + internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node1).setNewDelegate(stateChangeListenerNode1); //create an index that should fail try { @@ -165,7 +176,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { String node2 = internalCluster().startNode(); IndexShardStateChangeListener stateChangeListenerNode2 = new IndexShardStateChangeListener(); //add a listener that keeps track of the shard state changes - internalCluster().getInstance(IndicesLifecycle.class, node2).addListener(stateChangeListenerNode2); + internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(stateChangeListenerNode2); //re-enable allocation assertAcked(client().admin().cluster().prepareUpdateSettings() .setPersistentSettings(builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all"))); @@ -226,7 +237,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { stateChangeListener.shardStates.clear(); } - private static class IndexShardStateChangeListener extends IndicesLifecycle.Listener { + private static class IndexShardStateChangeListener implements IndexEventListener { //we keep track of all the states (ordered) a shard goes through final ConcurrentMap> shardStates = new ConcurrentHashMap<>(); Settings creationSettings = Settings.EMPTY; diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 037e9b974fb..6b15432d490 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -18,13 +18,22 @@ */ package org.elasticsearch.indices; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESSingleNodeTestCase; - +import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -32,17 +41,17 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCase { - @Override - protected boolean resetNodeAfterTest() { - return true; - } public void testCloseDeleteCallback() throws Throwable { - final AtomicInteger counter = new AtomicInteger(1); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); assertAcked(client().admin().indices().prepareCreate("test") .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); ensureGreen(); - getInstanceFromNode(IndicesLifecycle.class).addListener(new IndicesLifecycle.Listener() { + IndexMetaData metaData = indicesService.indexService("test").getMetaData(); + ShardRouting shardRouting = indicesService.indexService("test").getShard(0).routingEntry(); + assertAcked(client().admin().indices().prepareDelete("test").get()); + final AtomicInteger counter = new AtomicInteger(1); + IndexEventListener countingListener = new IndexEventListener() { @Override public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) { assertEquals(counter.get(), 5); @@ -62,7 +71,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas } @Override - public void beforeIndexDeleted(IndexService indexService) { + public void beforeIndexDeleted(IndexService indexService) { assertEquals(counter.get(), 2); counter.incrementAndGet(); } @@ -78,8 +87,19 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas assertEquals(counter.get(), 4); counter.incrementAndGet(); } - }); - assertAcked(client().admin().indices().prepareDelete("test").get()); + }; + IndexService index = indicesService.createIndex(metaData, Arrays.asList(countingListener)); + ShardRouting newRouting = new ShardRouting(shardRouting); + String nodeId = newRouting.currentNodeId(); + ShardRoutingHelper.moveToUnassigned(newRouting, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom")); + ShardRoutingHelper.initialize(newRouting, nodeId); + IndexShard shard = index.createShard(0, newRouting); + shard.updateRoutingEntry(newRouting, true); + shard.recoverFromStore(newRouting, new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT)); + newRouting = new ShardRouting(newRouting); + ShardRoutingHelper.moveToStarted(newRouting); + shard.updateRoutingEntry(newRouting, true); + indicesService.deleteIndex("test", "simon says"); assertEquals(7, counter.get()); } diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesModuleTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesModuleTests.java index c7a2624d380..3070735672f 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesModuleTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesModuleTests.java @@ -48,13 +48,13 @@ public class IndicesModuleTests extends ModuleTestCase { } public void testRegisterQueryParser() { - IndicesModule module = new IndicesModule(Settings.EMPTY); + IndicesModule module = new IndicesModule(); module.registerQueryParser(FakeQueryParser.class); assertSetMultiBinding(module, QueryParser.class, FakeQueryParser.class); } public void testRegisterQueryParserDuplicate() { - IndicesModule module = new IndicesModule(Settings.EMPTY); + IndicesModule module = new IndicesModule(); try { module.registerQueryParser(TermQueryParser.class); } catch (IllegalArgumentException e) { @@ -63,7 +63,7 @@ public class IndicesModuleTests extends ModuleTestCase { } public void testRegisterHunspellDictionary() throws Exception { - IndicesModule module = new IndicesModule(Settings.EMPTY); + IndicesModule module = new IndicesModule(); InputStream aff = getClass().getResourceAsStream("/indices/analyze/conf_dir/hunspell/en_US/en_US.aff"); InputStream dic = getClass().getResourceAsStream("/indices/analyze/conf_dir/hunspell/en_US/en_US.dic"); Dictionary dictionary = new Dictionary(aff, dic); @@ -72,7 +72,7 @@ public class IndicesModuleTests extends ModuleTestCase { } public void testRegisterHunspellDictionaryDuplicate() { - IndicesModule module = new IndicesModule(Settings.EMPTY); + IndicesModule module = new IndicesModule(); try { module.registerQueryParser(TermQueryParser.class); } catch (IllegalArgumentException e) { diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 5f99f80ce54..34abea7816e 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -40,10 +40,10 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.plugins.Plugin; @@ -53,6 +53,7 @@ import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.MockIndexEventListener; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.Transport; @@ -89,9 +90,11 @@ import static org.hamcrest.Matchers.startsWith; public class RelocationIT extends ESIntegTestCase { private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); + + @Override protected Collection> nodePlugins() { - return pluginList(MockTransportService.TestPlugin.class); + return pluginList(MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class); } public void testSimpleRelocationNoIndexing() { @@ -282,16 +285,16 @@ public class RelocationIT extends ESIntegTestCase { } final Semaphore postRecoveryShards = new Semaphore(0); - - for (IndicesLifecycle indicesLifecycle : internalCluster().getInstances(IndicesLifecycle.class)) { - indicesLifecycle.addListener(new IndicesLifecycle.Listener() { - @Override - public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { - if (currentState == IndexShardState.POST_RECOVERY) { - postRecoveryShards.release(); - } + final IndexEventListener listener = new IndexEventListener() { + @Override + public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { + if (currentState == IndexShardState.POST_RECOVERY) { + postRecoveryShards.release(); } - }); + } + }; + for (MockIndexEventListener.TestEventListener eventListener : internalCluster().getInstances(MockIndexEventListener.TestEventListener.class)) { + eventListener.setNewDelegate(listener); } diff --git a/core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java b/core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java index 457f63d54e5..95c1a807935 100644 --- a/core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java +++ b/core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; +import org.elasticsearch.test.store.MockFSIndexStore; import java.io.IOException; import java.util.concurrent.ExecutionException; @@ -103,7 +104,7 @@ public class SearchWithRandomIOExceptionsIT extends ESIntegTestCase { } else { Settings.Builder settings = settingsBuilder() .put("index.number_of_replicas", randomIntBetween(0, 1)) - .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) + .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) .put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE, exceptionRate) .put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate); // we cannot expect that the index will be valid logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap()); diff --git a/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java b/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java index 18419731bc4..64825dac8bb 100644 --- a/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java +++ b/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java @@ -81,7 +81,7 @@ public class SearchSourceBuilderTests extends ESTestCase { injector = new ModulesBuilder().add( new SettingsModule(settings), new ThreadPoolModule(new ThreadPool(settings)), - new IndicesModule(settings) { + new IndicesModule() { @Override public void configure() { // skip services diff --git a/core/src/test/java/org/elasticsearch/test/MockIndexEventListener.java b/core/src/test/java/org/elasticsearch/test/MockIndexEventListener.java new file mode 100644 index 00000000000..5ffbf874808 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/test/MockIndexEventListener.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test; + +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.Plugin; + +import java.util.Collection; +import java.util.Collections; + +/** + * This is a testing plugin that registers a generic {@link org.elasticsearch.test.MockIndexEventListener.TestEventListener} as a node level service as well as a listener + * on every index. Tests can access it like this: + *
+ *     TestEventListener listener = internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node1);
+ *     listener.setNewDelegate(new IndexEventListener() {
+ *        // do some stuff
+ *     });
+ * 
+ * This allows tests to use the listener without registering their own plugins. + */ +public final class MockIndexEventListener { + + public static class TestPlugin extends Plugin { + private final TestEventListener listener = new TestEventListener(); + @Override + public String name() { + return "mock-index-listener"; + } + @Override + public String description() { + return "a mock index listener for testing only"; + } + + public void onModule(IndexModule module) { + module.addIndexEventListener(listener); + } + + @Override + public Collection nodeModules() { + return Collections.singleton(binder -> binder.bind(TestEventListener.class).toInstance(listener)); + } + } + + public static class TestEventListener implements IndexEventListener { + private volatile IndexEventListener delegate = new IndexEventListener() {}; + + public void setNewDelegate(IndexEventListener listener) { + delegate = listener == null ? new IndexEventListener() {} : listener; + } + + @Override + public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { + delegate.shardRoutingChanged(indexShard, oldRouting, newRouting); + } + + @Override + public void afterIndexShardCreated(IndexShard indexShard) { + delegate.afterIndexShardCreated(indexShard); + } + + @Override + public void afterIndexShardStarted(IndexShard indexShard) { + delegate.afterIndexShardStarted(indexShard); + } + + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + delegate.beforeIndexShardClosed(shardId, indexShard, indexSettings); + } + + @Override + public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + delegate.afterIndexShardClosed(shardId, indexShard, indexSettings); + } + + @Override + public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { + delegate.indexShardStateChanged(indexShard, previousState, currentState, reason); + } + + @Override + public void onShardInactive(IndexShard indexShard) { + delegate.onShardInactive(indexShard); + } + + @Override + public void beforeIndexCreated(Index index, Settings indexSettings) { + delegate.beforeIndexCreated(index, indexSettings); + } + + @Override + public void afterIndexCreated(IndexService indexService) { + delegate.afterIndexCreated(indexService); + } + + @Override + public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) { + delegate.beforeIndexShardCreated(shardId, indexSettings); + } + + @Override + public void beforeIndexClosed(IndexService indexService) { + delegate.beforeIndexClosed(indexService); + } + + @Override + public void afterIndexClosed(Index index, Settings indexSettings) { + delegate.afterIndexClosed(index, indexSettings); + } + + @Override + public void beforeIndexShardDeleted(ShardId shardId, Settings indexSettings) { + delegate.beforeIndexShardDeleted(shardId, indexSettings); + } + + @Override + public void afterIndexShardDeleted(ShardId shardId, Settings indexSettings) { + delegate.afterIndexShardDeleted(shardId, indexSettings); + } + + @Override + public void afterIndexDeleted(Index index, Settings indexSettings) { + delegate.afterIndexDeleted(index, indexSettings); + } + + @Override + public void beforeIndexDeleted(IndexService indexService) { + delegate.beforeIndexDeleted(indexService); + } + + @Override + public void beforeIndexAddedToCluster(Index index, Settings indexSettings) { + delegate.beforeIndexAddedToCluster(index, indexSettings); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java b/core/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java index 43049672602..c897011d0e9 100644 --- a/core/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java +++ b/core/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java @@ -27,11 +27,11 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.*; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TestRuleMarkFailure; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; @@ -40,7 +40,6 @@ import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; @@ -54,18 +53,12 @@ import java.util.*; public class MockFSDirectoryService extends FsDirectoryService { - public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close"; public static final String RANDOM_IO_EXCEPTION_RATE_ON_OPEN = "index.store.mock.random.io_exception_rate_on_open"; public static final String RANDOM_PREVENT_DOUBLE_WRITE = "index.store.mock.random.prevent_double_write"; public static final String RANDOM_NO_DELETE_OPEN_FILE = "index.store.mock.random.no_delete_open_file"; public static final String CRASH_INDEX = "index.store.mock.random.crash_index"; - private static final EnumSet validCheckIndexStates = EnumSet.of( - IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY - ); - private final FsDirectoryService delegateService; - private final boolean checkIndexOnClose; private final Random random; private final double randomIOExceptionRate; private final double randomIOExceptionRateOnOpen; @@ -80,7 +73,7 @@ public class MockFSDirectoryService extends FsDirectoryService { super(indexSettings, indexStore, path); final long seed = indexSettings.getAsLong(ESIntegTestCase.SETTING_INDEX_SEED, 0l); this.random = new Random(seed); - checkIndexOnClose = indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, true); + randomIOExceptionRate = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE, 0.0d); randomIOExceptionRateOnOpen = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0.0d); preventDoubleWrite = indexSettings.getAsBoolean(RANDOM_PREVENT_DOUBLE_WRITE, true); // true is default in MDW @@ -95,33 +88,6 @@ public class MockFSDirectoryService extends FsDirectoryService { } this.indexSettings = indexSettings; delegateService = randomDirectorService(indexStore, path); - if (checkIndexOnClose) { - final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() { - - boolean canRun = false; - - @Override - public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard, - @IndexSettings Settings indexSettings) { - if (indexShard != null && shardId.equals(sid)) { - if (validCheckIndexStates.contains(indexShard.state()) && IndexMetaData.isOnSharedFilesystem(indexSettings) == false) { - canRun = true; - } - } - } - - @Override - public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard, - @IndexSettings Settings indexSettings) { - if (shardId.equals(sid) && indexShard != null && canRun) { - assert indexShard.state() == IndexShardState.CLOSED : "Current state must be closed"; - checkIndex(indexShard.store(), sid); - } - service.indicesLifecycle().removeListener(this); - } - }; - service.indicesLifecycle().addListener(listener); - } } @@ -135,7 +101,7 @@ public class MockFSDirectoryService extends FsDirectoryService { throw new UnsupportedOperationException(); } - public void checkIndex(Store store, ShardId shardId) { + public static void checkIndex(ESLogger logger, Store store, ShardId shardId) { if (store.tryIncRef()) { logger.info("start check index"); try { diff --git a/core/src/test/java/org/elasticsearch/test/store/MockFSIndexStore.java b/core/src/test/java/org/elasticsearch/test/store/MockFSIndexStore.java index 11a791c04f3..cc9cb696c58 100644 --- a/core/src/test/java/org/elasticsearch/test/store/MockFSIndexStore.java +++ b/core/src/test/java/org/elasticsearch/test/store/MockFSIndexStore.java @@ -19,22 +19,29 @@ package org.elasticsearch.test.store; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.shard.*; import org.elasticsearch.index.store.DirectoryService; -import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.plugins.Plugin; +import java.util.EnumSet; + public class MockFSIndexStore extends IndexStore { + public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close"; private final IndicesService indicesService; public static class TestPlugin extends Plugin { @@ -53,6 +60,15 @@ public class MockFSIndexStore extends IndexStore { public Settings additionalSettings() { return Settings.builder().put(IndexStoreModule.STORE_TYPE, "mock").build(); } + + public void onModule(IndexModule module) { + Settings indexSettings = module.getIndexSettings(); + if ("mock".equals(indexSettings.get(IndexStoreModule.STORE_TYPE))) { + if (indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, true)) { + module.addIndexEventListener(new Listener()); + } + } + } } @Inject @@ -66,4 +82,18 @@ public class MockFSIndexStore extends IndexStore { return new MockFSDirectoryService(indexSettings, this, indicesService, path); } + private static final EnumSet validCheckIndexStates = EnumSet.of( + IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY + ); + private static final class Listener implements IndexEventListener { + @Override + public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { + if (currentState == IndexShardState.CLOSED && validCheckIndexStates.contains(previousState) && IndexMetaData.isOnSharedFilesystem(indexShard.indexSettings()) == false) { + ESLogger logger = Loggers.getLogger(getClass(), indexShard.indexSettings(), indexShard.shardId()); + MockFSDirectoryService.checkIndex(logger, indexShard.store(), indexShard.shardId()); + } + + } + } + }