diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index f141d9473ef..3c40b02a4b9 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -24,6 +24,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -382,6 +383,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone indicesLifecycle.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), new IndexShardInjectorPair(indexShard, shardInjector)).immutableMap(); + settingsService.addListener(indexShard); success = true; return indexShard; } catch (IOException e) { @@ -433,6 +435,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone // this logic is tricky, we want to close the engine so we rollback the changes done to it // and close the shard so no operations are allowed to it if (indexShard != null) { + settingsService.removeListener(indexShard); try { final boolean flushEngine = deleted.get() == false && closed.get(); // only flush we are we closed (closed index or shutdown) and if we are not deleted indexShard.close(reason, flushEngine); @@ -453,18 +456,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } - /** - * This method gets an instance for each of the given classes passed and calls #close() on the returned instance. - * NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log - */ - private void closeInjectorResource(ShardId shardId, Injector shardInjector, Class... toClose) { - for (Class closeable : toClose) { - if (closeInjectorOptionalResource(shardId, shardInjector, closeable) == false) { - logger.warn("[{}] no instance available for [{}], ignoring... ", shardId, closeable.getSimpleName()); - } - } - } - /** * Closes an optional resource. Returns true if the resource was found; * NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log diff --git a/core/src/main/java/org/elasticsearch/index/settings/IndexSettingsService.java b/core/src/main/java/org/elasticsearch/index/settings/IndexSettingsService.java index 39d1437983f..d76540e1e85 100644 --- a/core/src/main/java/org/elasticsearch/index/settings/IndexSettingsService.java +++ b/core/src/main/java/org/elasticsearch/index/settings/IndexSettingsService.java @@ -73,6 +73,12 @@ public class IndexSettingsService extends AbstractIndexComponent { this.listeners.remove(listener); } + /** + * Returns true iff the given listener is already registered otherwise false + */ + public boolean isRegistered(Listener listener) { + return listeners.contains(listener); + } public interface Listener { void onRefreshSettings(Settings settings); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e2e3720735b..b9dd7853812 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -37,10 +37,8 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.action.termvectors.TermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsResponse; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; @@ -85,6 +83,7 @@ import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; +import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardRepository; @@ -121,10 +120,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -public class IndexShard extends AbstractIndexShardComponent { +public class IndexShard extends AbstractIndexShardComponent implements IndexSettingsService.Listener { private final ThreadPool threadPool; - private final IndexSettingsService indexSettingsService; private final MapperService mapperService; private final IndexQueryParserService queryParserService; private final IndexCache indexCache; @@ -144,8 +142,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final IndexFieldDataService indexFieldDataService; private final ShardSuggestMetric shardSuggestMetric = new ShardSuggestMetric(); private final ShardBitsetFilterCache shardBitsetFilterCache; - private final DiscoveryNode localNode; - private final Object mutex = new Object(); private final String checkIndexOnStartup; private final CodecService codecService; @@ -171,9 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent { private RecoveryState recoveryState; private final RecoveryStats recoveryStats = new RecoveryStats(); - - private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings(); - private final MeanMetric refreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); @@ -200,13 +193,13 @@ public class IndexShard extends AbstractIndexShardComponent { private EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); @Inject - public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, + public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, - ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) { - super(shardId, indexSettingsService.getSettings()); + ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) { + super(shardId, indexSettings); this.codecService = codecService; this.warmer = warmer; this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); @@ -215,7 +208,6 @@ public class IndexShard extends AbstractIndexShardComponent { Objects.requireNonNull(store, "Store must be provided to the index shard"); this.engineFactory = factory; this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; - this.indexSettingsService = indexSettingsService; this.store = store; this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings); this.threadPool = threadPool; @@ -235,12 +227,9 @@ public class IndexShard extends AbstractIndexShardComponent { this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService); this.indexFieldDataService = indexFieldDataService; this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); - assert clusterService.localNode() != null : "Local node is null lifecycle state is: " + clusterService.lifecycleState(); - this.localNode = clusterService.localNode(); state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); - indexSettingsService.addListener(applyRefreshSettings); this.path = path; this.mergePolicyConfig = new MergePolicyConfig(logger, indexSettings); /* create engine config */ @@ -385,21 +374,9 @@ public class IndexShard extends AbstractIndexShardComponent { } /** - * Marks the shard as recovering based on a remote or local node, fails with exception is recovering is not allowed to be set. + * Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set. */ - public IndexShardState recovering(String reason, RecoveryState.Type type, DiscoveryNode sourceNode) throws IndexShardStartedException, - IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { - return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, sourceNode, localNode)); - } - - /** - * Marks the shard as recovering based on a restore, fails with exception is recovering is not allowed to be set. - */ - public IndexShardState recovering(String reason, RecoveryState.Type type, RestoreSource restoreSource) throws IndexShardStartedException { - return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, restoreSource, localNode)); - } - - private IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException, + public IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { synchronized (mutex) { if (state == IndexShardState.CLOSED) { @@ -776,7 +753,6 @@ public class IndexShard extends AbstractIndexShardComponent { public void close(String reason, boolean flushEngine) throws IOException { synchronized (mutex) { try { - indexSettingsService.removeListener(applyRefreshSettings); if (state != IndexShardState.CLOSED) { FutureUtils.cancel(refreshScheduledFuture); refreshScheduledFuture = null; @@ -1060,7 +1036,7 @@ public class IndexShard extends AbstractIndexShardComponent { return path; } - public boolean recoverFromStore(ShardRouting shard) { + public boolean recoverFromStore(ShardRouting shard, DiscoveryNode localNode) { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shard.primary() : "recover from store only makes sense if the shard is a primary shard"; @@ -1069,10 +1045,10 @@ public class IndexShard extends AbstractIndexShardComponent { return storeRecovery.recoverFromStore(this, shouldExist, localNode); } - public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository) { + public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository, DiscoveryNode locaNode) { assert shard.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromRepository(this, repository); + return storeRecovery.recoverFromRepository(this, repository, locaNode); } /** @@ -1094,110 +1070,108 @@ public class IndexShard extends AbstractIndexShardComponent { return false; } - private class ApplyRefreshSettings implements IndexSettingsService.Listener { - @Override - public void onRefreshSettings(Settings settings) { - boolean change = false; - synchronized (mutex) { - if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed - return; - } - int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, IndexShard.this.flushThresholdOperations); - if (flushThresholdOperations != IndexShard.this.flushThresholdOperations) { - logger.info("updating flush_threshold_ops from [{}] to [{}]", IndexShard.this.flushThresholdOperations, flushThresholdOperations); - IndexShard.this.flushThresholdOperations = flushThresholdOperations; - } - ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, IndexShard.this.flushThresholdSize); - if (!flushThresholdSize.equals(IndexShard.this.flushThresholdSize)) { - logger.info("updating flush_threshold_size from [{}] to [{}]", IndexShard.this.flushThresholdSize, flushThresholdSize); - IndexShard.this.flushThresholdSize = flushThresholdSize; - } - boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, IndexShard.this.disableFlush); - if (disableFlush != IndexShard.this.disableFlush) { - logger.info("updating disable_flush from [{}] to [{}]", IndexShard.this.disableFlush, disableFlush); - IndexShard.this.disableFlush = disableFlush; - } + @Override + public void onRefreshSettings(Settings settings) { + boolean change = false; + synchronized (mutex) { + if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed + return; + } + int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.flushThresholdOperations); + if (flushThresholdOperations != this.flushThresholdOperations) { + logger.info("updating flush_threshold_ops from [{}] to [{}]", this.flushThresholdOperations, flushThresholdOperations); + this.flushThresholdOperations = flushThresholdOperations; + } + ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize); + if (!flushThresholdSize.equals(this.flushThresholdSize)) { + logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize); + this.flushThresholdSize = flushThresholdSize; + } + boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, this.disableFlush); + if (disableFlush != this.disableFlush) { + logger.info("updating disable_flush from [{}] to [{}]", this.disableFlush, disableFlush); + this.disableFlush = disableFlush; + } - final EngineConfig config = engineConfig; - final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose); - if (flushOnClose != IndexShard.this.flushOnClose) { - logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose, flushOnClose); - IndexShard.this.flushOnClose = flushOnClose; - } + final EngineConfig config = engineConfig; + final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose); + if (flushOnClose != this.flushOnClose) { + logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, this.flushOnClose, flushOnClose); + this.flushOnClose = flushOnClose; + } - TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name())); - if (type != translogConfig.getType()) { - logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type); - translogConfig.setType(type); - } + TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name())); + if (type != translogConfig.getType()) { + logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type); + translogConfig.setType(type); + } - final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty()); - if (durabilty != translogConfig.getDurabilty()) { - logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty); - translogConfig.setDurabilty(durabilty); - } + final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty()); + if (durabilty != translogConfig.getDurabilty()) { + logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty); + translogConfig.setDurabilty(durabilty); + } - TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, IndexShard.this.refreshInterval); - if (!refreshInterval.equals(IndexShard.this.refreshInterval)) { - logger.info("updating refresh_interval from [{}] to [{}]", IndexShard.this.refreshInterval, refreshInterval); - if (refreshScheduledFuture != null) { - // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is - // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt - // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 - FutureUtils.cancel(refreshScheduledFuture); - refreshScheduledFuture = null; - } - IndexShard.this.refreshInterval = refreshInterval; - if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); - } + TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval); + if (!refreshInterval.equals(this.refreshInterval)) { + logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval); + if (refreshScheduledFuture != null) { + // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is + // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt + // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 + FutureUtils.cancel(refreshScheduledFuture); + refreshScheduledFuture = null; } - - long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); - if (gcDeletesInMillis != config.getGcDeletesInMillis()) { - logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); - config.setGcDeletesInMillis(gcDeletesInMillis); - change = true; - } - - final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush()); - if (compoundOnFlush != config.isCompoundOnFlush()) { - logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush); - config.setCompoundOnFlush(compoundOnFlush); - change = true; - } - final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting()); - if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) { - config.setVersionMapSizeSetting(versionMapSize); - } - - final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount()); - if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) { - logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount); - mergeSchedulerConfig.setMaxThreadCount(maxThreadCount); - change = true; - } - - final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount()); - if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) { - logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount); - mergeSchedulerConfig.setMaxMergeCount(maxMergeCount); - change = true; - } - - final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle()); - if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) { - logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle); - mergeSchedulerConfig.setAutoThrottle(autoThrottle); - change = true; + this.refreshInterval = refreshInterval; + if (refreshInterval.millis() > 0) { + refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); } } - mergePolicyConfig.onRefreshSettings(settings); - searchService.onRefreshSettings(settings); - indexingService.onRefreshSettings(settings); - if (change) { - engine().onSettingsChanged(); + + long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); + if (gcDeletesInMillis != config.getGcDeletesInMillis()) { + logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); + config.setGcDeletesInMillis(gcDeletesInMillis); + change = true; } + + final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush()); + if (compoundOnFlush != config.isCompoundOnFlush()) { + logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush); + config.setCompoundOnFlush(compoundOnFlush); + change = true; + } + final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting()); + if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) { + config.setVersionMapSizeSetting(versionMapSize); + } + + final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount()); + if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) { + logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount); + mergeSchedulerConfig.setMaxThreadCount(maxThreadCount); + change = true; + } + + final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount()); + if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) { + logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount); + mergeSchedulerConfig.setMaxMergeCount(maxMergeCount); + change = true; + } + + final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle()); + if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) { + logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle); + mergeSchedulerConfig.setAutoThrottle(autoThrottle); + change = true; + } + } + mergePolicyConfig.onRefreshSettings(settings); + searchService.onRefreshSettings(settings); + indexingService.onRefreshSettings(settings); + if (change) { + engine().onSettingsChanged(); } } @@ -1428,7 +1402,7 @@ public class IndexShard extends AbstractIndexShardComponent { } }; return new EngineConfig(shardId, - threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, + threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index d323f1d2f59..62fa928faf1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; @@ -34,6 +35,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.query.IndexQueryParserService; +import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; @@ -54,7 +56,7 @@ import java.io.IOException; public final class ShadowIndexShard extends IndexShard { @Inject - public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsService, + public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, @@ -62,14 +64,14 @@ public final class ShadowIndexShard extends IndexShard { CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, - EngineFactory factory, ClusterService clusterService, + EngineFactory factory, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException { - super(shardId, indexSettingsService, indicesLifecycle, store, + super(shardId, indexSettings, indicesLifecycle, store, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, indicesQueryCache, codecService, termVectorsService, indexFieldDataService, warmer, similarityService, - factory, clusterService, path, bigArrays, wrappingService); + factory, path, bigArrays, wrappingService); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 50a087898cc..9059c162680 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -26,6 +26,7 @@ import org.apache.lucene.store.Directory; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RestoreSource; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; @@ -72,7 +73,8 @@ final class StoreRecovery { throw new IllegalStateException("can't recover - restore source is not null"); } try { - indexShard.recovering("from store", RecoveryState.Type.STORE, localNode); + final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), indexShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode); + indexShard.recovering("from store", recoveryState); } catch (IllegalIndexShardStateException e) { // that's fine, since we might be called concurrently, just ignore this, we are already recovering return false; @@ -93,19 +95,21 @@ final class StoreRecovery { * @return true if the the shard has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. */ - boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) { + boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository, DiscoveryNode localNode) { if (canRecover(indexShard)) { - if (indexShard.routingEntry().restoreSource() == null) { + final ShardRouting shardRouting = indexShard.routingEntry(); + if (shardRouting.restoreSource() == null) { throw new IllegalStateException("can't restore - restore source is null"); } try { - indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource()); + final RecoveryState recoveryState = new RecoveryState(shardId, shardRouting.primary(), RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode); + indexShard.recovering("from snapshot", recoveryState); } catch (IllegalIndexShardStateException e) { // that's fine, since we might be called concurrently, just ignore this, we are already recovering return false; } return executeRecovery(indexShard, () -> { - logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource()); + logger.debug("restoring from {} ...", shardRouting.restoreSource()); restore(indexShard, repository); }); } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 4f59428eede..8331a3d1a27 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -677,14 +677,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent * Individual shards are getting restored as part of normal recovery process in - * {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository)} + * {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository, DiscoveryNode)} )} * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. *

diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 08c2f8773fa..25a8bf2b40e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -48,6 +49,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -787,7 +789,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { ShardRoutingHelper.reinit(routing); IndexShard newShard = test.createShard(0, routing); newShard.updateRoutingEntry(routing, false); - assertTrue(newShard.recoverFromStore(routing)); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); + assertTrue(newShard.recoverFromStore(routing, localNode)); routing = new ShardRouting(routing); ShardRoutingHelper.moveToStarted(routing); newShard.updateRoutingEntry(routing, true); @@ -799,6 +802,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); IndexService test = indicesService.indexService("test"); final IndexShard shard = test.shard(0); @@ -817,7 +821,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { IndexShard newShard = test.createShard(0, routing); newShard.updateRoutingEntry(routing, false); try { - newShard.recoverFromStore(routing); + newShard.recoverFromStore(routing, localNode); fail("index not there!"); } catch (IndexShardRecoveryException ex) { assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over")); @@ -826,11 +830,11 @@ public class IndexShardTests extends ESSingleNodeTestCase { ShardRoutingHelper.moveToUnassigned(routing, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so")); ShardRoutingHelper.initialize(routing, origRouting.currentNodeId()); - assertFalse("it's already recovering", newShard.recoverFromStore(routing)); + assertFalse("it's already recovering", newShard.recoverFromStore(routing, localNode)); test.removeShard(0, "I broken it"); newShard = test.createShard(0, routing); newShard.updateRoutingEntry(routing, false); - assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing)); + assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing, localNode)); routing = new ShardRouting(routing); ShardRoutingHelper.moveToStarted(routing); @@ -865,6 +869,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { Store targetStore = test_target_shard.store(); test_target_shard.updateRoutingEntry(routing, false); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); assertTrue(test_target_shard.restoreFromRepository(routing, new IndexShardRepository() { @Override public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { @@ -893,7 +898,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { @Override public void verify(String verificationToken) { } - })); + }, localNode)); routing = new ShardRouting(routing); ShardRoutingHelper.moveToStarted(routing); @@ -902,4 +907,15 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertSearchHits(client().prepareSearch("test_target").get(), "0"); } + public void testListenersAreRemoved() { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService("test"); + IndexShard shard = indexService.shard(0); + IndexSettingsService settingsService = indexService.settingsService(); + assertTrue(settingsService.isRegistered(shard)); + indexService.removeShard(0, "simon says so"); + assertFalse(settingsService.isRegistered(shard)); + } }