From 6fff824402256b31c48c3be3869219ff1cd29c0a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 29 Sep 2015 17:42:55 +0200 Subject: [PATCH] Remove ClusterSerivce and IndexSettingsService dependency from IndexShard We have two unneded heavy dependencies on IndexShard that are unneeded and only cause trouble if you try to mock index shard. This commit removes IndexSettingsService as well as ClusterSerivce from IndexShard to simplify future mocking and construction. --- .../org/elasticsearch/index/IndexService.java | 15 +- .../index/settings/IndexSettingsService.java | 6 + .../elasticsearch/index/shard/IndexShard.java | 236 ++++++++---------- .../index/shard/ShadowIndexShard.java | 10 +- .../index/shard/StoreRecovery.java | 14 +- .../cluster/IndicesClusterStateService.java | 5 +- .../indices/recovery/RecoveryTarget.java | 3 +- .../snapshots/RestoreService.java | 3 +- .../index/shard/IndexShardTests.java | 26 +- 9 files changed, 157 insertions(+), 161 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index f141d9473ef..3c40b02a4b9 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -24,6 +24,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -382,6 +383,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone indicesLifecycle.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), new IndexShardInjectorPair(indexShard, shardInjector)).immutableMap(); + settingsService.addListener(indexShard); success = true; return indexShard; } catch (IOException e) { @@ -433,6 +435,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone // this logic is tricky, we want to close the engine so we rollback the changes done to it // and close the shard so no operations are allowed to it if (indexShard != null) { + settingsService.removeListener(indexShard); try { final boolean flushEngine = deleted.get() == false && closed.get(); // only flush we are we closed (closed index or shutdown) and if we are not deleted indexShard.close(reason, flushEngine); @@ -453,18 +456,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } - /** - * This method gets an instance for each of the given classes passed and calls #close() on the returned instance. - * NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log - */ - private void closeInjectorResource(ShardId shardId, Injector shardInjector, Class... toClose) { - for (Class closeable : toClose) { - if (closeInjectorOptionalResource(shardId, shardInjector, closeable) == false) { - logger.warn("[{}] no instance available for [{}], ignoring... ", shardId, closeable.getSimpleName()); - } - } - } - /** * Closes an optional resource. Returns true if the resource was found; * NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log diff --git a/core/src/main/java/org/elasticsearch/index/settings/IndexSettingsService.java b/core/src/main/java/org/elasticsearch/index/settings/IndexSettingsService.java index 39d1437983f..d76540e1e85 100644 --- a/core/src/main/java/org/elasticsearch/index/settings/IndexSettingsService.java +++ b/core/src/main/java/org/elasticsearch/index/settings/IndexSettingsService.java @@ -73,6 +73,12 @@ public class IndexSettingsService extends AbstractIndexComponent { this.listeners.remove(listener); } + /** + * Returns true iff the given listener is already registered otherwise false + */ + public boolean isRegistered(Listener listener) { + return listeners.contains(listener); + } public interface Listener { void onRefreshSettings(Settings settings); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e2e3720735b..b9dd7853812 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -37,10 +37,8 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.action.termvectors.TermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsResponse; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; @@ -85,6 +83,7 @@ import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; +import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardRepository; @@ -121,10 +120,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -public class IndexShard extends AbstractIndexShardComponent { +public class IndexShard extends AbstractIndexShardComponent implements IndexSettingsService.Listener { private final ThreadPool threadPool; - private final IndexSettingsService indexSettingsService; private final MapperService mapperService; private final IndexQueryParserService queryParserService; private final IndexCache indexCache; @@ -144,8 +142,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final IndexFieldDataService indexFieldDataService; private final ShardSuggestMetric shardSuggestMetric = new ShardSuggestMetric(); private final ShardBitsetFilterCache shardBitsetFilterCache; - private final DiscoveryNode localNode; - private final Object mutex = new Object(); private final String checkIndexOnStartup; private final CodecService codecService; @@ -171,9 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent { private RecoveryState recoveryState; private final RecoveryStats recoveryStats = new RecoveryStats(); - - private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings(); - private final MeanMetric refreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); @@ -200,13 +193,13 @@ public class IndexShard extends AbstractIndexShardComponent { private EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); @Inject - public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, + public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, - ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) { - super(shardId, indexSettingsService.getSettings()); + ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) { + super(shardId, indexSettings); this.codecService = codecService; this.warmer = warmer; this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); @@ -215,7 +208,6 @@ public class IndexShard extends AbstractIndexShardComponent { Objects.requireNonNull(store, "Store must be provided to the index shard"); this.engineFactory = factory; this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; - this.indexSettingsService = indexSettingsService; this.store = store; this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings); this.threadPool = threadPool; @@ -235,12 +227,9 @@ public class IndexShard extends AbstractIndexShardComponent { this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService); this.indexFieldDataService = indexFieldDataService; this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); - assert clusterService.localNode() != null : "Local node is null lifecycle state is: " + clusterService.lifecycleState(); - this.localNode = clusterService.localNode(); state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); - indexSettingsService.addListener(applyRefreshSettings); this.path = path; this.mergePolicyConfig = new MergePolicyConfig(logger, indexSettings); /* create engine config */ @@ -385,21 +374,9 @@ public class IndexShard extends AbstractIndexShardComponent { } /** - * Marks the shard as recovering based on a remote or local node, fails with exception is recovering is not allowed to be set. + * Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set. */ - public IndexShardState recovering(String reason, RecoveryState.Type type, DiscoveryNode sourceNode) throws IndexShardStartedException, - IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { - return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, sourceNode, localNode)); - } - - /** - * Marks the shard as recovering based on a restore, fails with exception is recovering is not allowed to be set. - */ - public IndexShardState recovering(String reason, RecoveryState.Type type, RestoreSource restoreSource) throws IndexShardStartedException { - return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, restoreSource, localNode)); - } - - private IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException, + public IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { synchronized (mutex) { if (state == IndexShardState.CLOSED) { @@ -776,7 +753,6 @@ public class IndexShard extends AbstractIndexShardComponent { public void close(String reason, boolean flushEngine) throws IOException { synchronized (mutex) { try { - indexSettingsService.removeListener(applyRefreshSettings); if (state != IndexShardState.CLOSED) { FutureUtils.cancel(refreshScheduledFuture); refreshScheduledFuture = null; @@ -1060,7 +1036,7 @@ public class IndexShard extends AbstractIndexShardComponent { return path; } - public boolean recoverFromStore(ShardRouting shard) { + public boolean recoverFromStore(ShardRouting shard, DiscoveryNode localNode) { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shard.primary() : "recover from store only makes sense if the shard is a primary shard"; @@ -1069,10 +1045,10 @@ public class IndexShard extends AbstractIndexShardComponent { return storeRecovery.recoverFromStore(this, shouldExist, localNode); } - public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository) { + public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository, DiscoveryNode locaNode) { assert shard.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromRepository(this, repository); + return storeRecovery.recoverFromRepository(this, repository, locaNode); } /** @@ -1094,110 +1070,108 @@ public class IndexShard extends AbstractIndexShardComponent { return false; } - private class ApplyRefreshSettings implements IndexSettingsService.Listener { - @Override - public void onRefreshSettings(Settings settings) { - boolean change = false; - synchronized (mutex) { - if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed - return; - } - int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, IndexShard.this.flushThresholdOperations); - if (flushThresholdOperations != IndexShard.this.flushThresholdOperations) { - logger.info("updating flush_threshold_ops from [{}] to [{}]", IndexShard.this.flushThresholdOperations, flushThresholdOperations); - IndexShard.this.flushThresholdOperations = flushThresholdOperations; - } - ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, IndexShard.this.flushThresholdSize); - if (!flushThresholdSize.equals(IndexShard.this.flushThresholdSize)) { - logger.info("updating flush_threshold_size from [{}] to [{}]", IndexShard.this.flushThresholdSize, flushThresholdSize); - IndexShard.this.flushThresholdSize = flushThresholdSize; - } - boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, IndexShard.this.disableFlush); - if (disableFlush != IndexShard.this.disableFlush) { - logger.info("updating disable_flush from [{}] to [{}]", IndexShard.this.disableFlush, disableFlush); - IndexShard.this.disableFlush = disableFlush; - } + @Override + public void onRefreshSettings(Settings settings) { + boolean change = false; + synchronized (mutex) { + if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed + return; + } + int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.flushThresholdOperations); + if (flushThresholdOperations != this.flushThresholdOperations) { + logger.info("updating flush_threshold_ops from [{}] to [{}]", this.flushThresholdOperations, flushThresholdOperations); + this.flushThresholdOperations = flushThresholdOperations; + } + ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize); + if (!flushThresholdSize.equals(this.flushThresholdSize)) { + logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize); + this.flushThresholdSize = flushThresholdSize; + } + boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, this.disableFlush); + if (disableFlush != this.disableFlush) { + logger.info("updating disable_flush from [{}] to [{}]", this.disableFlush, disableFlush); + this.disableFlush = disableFlush; + } - final EngineConfig config = engineConfig; - final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose); - if (flushOnClose != IndexShard.this.flushOnClose) { - logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose, flushOnClose); - IndexShard.this.flushOnClose = flushOnClose; - } + final EngineConfig config = engineConfig; + final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose); + if (flushOnClose != this.flushOnClose) { + logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, this.flushOnClose, flushOnClose); + this.flushOnClose = flushOnClose; + } - TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name())); - if (type != translogConfig.getType()) { - logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type); - translogConfig.setType(type); - } + TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name())); + if (type != translogConfig.getType()) { + logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type); + translogConfig.setType(type); + } - final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty()); - if (durabilty != translogConfig.getDurabilty()) { - logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty); - translogConfig.setDurabilty(durabilty); - } + final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty()); + if (durabilty != translogConfig.getDurabilty()) { + logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty); + translogConfig.setDurabilty(durabilty); + } - TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, IndexShard.this.refreshInterval); - if (!refreshInterval.equals(IndexShard.this.refreshInterval)) { - logger.info("updating refresh_interval from [{}] to [{}]", IndexShard.this.refreshInterval, refreshInterval); - if (refreshScheduledFuture != null) { - // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is - // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt - // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 - FutureUtils.cancel(refreshScheduledFuture); - refreshScheduledFuture = null; - } - IndexShard.this.refreshInterval = refreshInterval; - if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); - } + TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval); + if (!refreshInterval.equals(this.refreshInterval)) { + logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval); + if (refreshScheduledFuture != null) { + // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is + // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt + // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 + FutureUtils.cancel(refreshScheduledFuture); + refreshScheduledFuture = null; } - - long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); - if (gcDeletesInMillis != config.getGcDeletesInMillis()) { - logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); - config.setGcDeletesInMillis(gcDeletesInMillis); - change = true; - } - - final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush()); - if (compoundOnFlush != config.isCompoundOnFlush()) { - logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush); - config.setCompoundOnFlush(compoundOnFlush); - change = true; - } - final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting()); - if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) { - config.setVersionMapSizeSetting(versionMapSize); - } - - final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount()); - if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) { - logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount); - mergeSchedulerConfig.setMaxThreadCount(maxThreadCount); - change = true; - } - - final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount()); - if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) { - logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount); - mergeSchedulerConfig.setMaxMergeCount(maxMergeCount); - change = true; - } - - final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle()); - if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) { - logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle); - mergeSchedulerConfig.setAutoThrottle(autoThrottle); - change = true; + this.refreshInterval = refreshInterval; + if (refreshInterval.millis() > 0) { + refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); } } - mergePolicyConfig.onRefreshSettings(settings); - searchService.onRefreshSettings(settings); - indexingService.onRefreshSettings(settings); - if (change) { - engine().onSettingsChanged(); + + long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); + if (gcDeletesInMillis != config.getGcDeletesInMillis()) { + logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); + config.setGcDeletesInMillis(gcDeletesInMillis); + change = true; } + + final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush()); + if (compoundOnFlush != config.isCompoundOnFlush()) { + logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush); + config.setCompoundOnFlush(compoundOnFlush); + change = true; + } + final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting()); + if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) { + config.setVersionMapSizeSetting(versionMapSize); + } + + final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount()); + if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) { + logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount); + mergeSchedulerConfig.setMaxThreadCount(maxThreadCount); + change = true; + } + + final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount()); + if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) { + logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount); + mergeSchedulerConfig.setMaxMergeCount(maxMergeCount); + change = true; + } + + final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle()); + if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) { + logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle); + mergeSchedulerConfig.setAutoThrottle(autoThrottle); + change = true; + } + } + mergePolicyConfig.onRefreshSettings(settings); + searchService.onRefreshSettings(settings); + indexingService.onRefreshSettings(settings); + if (change) { + engine().onSettingsChanged(); } } @@ -1428,7 +1402,7 @@ public class IndexShard extends AbstractIndexShardComponent { } }; return new EngineConfig(shardId, - threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, + threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index d323f1d2f59..62fa928faf1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; @@ -34,6 +35,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.query.IndexQueryParserService; +import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; @@ -54,7 +56,7 @@ import java.io.IOException; public final class ShadowIndexShard extends IndexShard { @Inject - public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsService, + public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, @@ -62,14 +64,14 @@ public final class ShadowIndexShard extends IndexShard { CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, - EngineFactory factory, ClusterService clusterService, + EngineFactory factory, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException { - super(shardId, indexSettingsService, indicesLifecycle, store, + super(shardId, indexSettings, indicesLifecycle, store, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, indicesQueryCache, codecService, termVectorsService, indexFieldDataService, warmer, similarityService, - factory, clusterService, path, bigArrays, wrappingService); + factory, path, bigArrays, wrappingService); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 50a087898cc..9059c162680 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -26,6 +26,7 @@ import org.apache.lucene.store.Directory; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RestoreSource; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; @@ -72,7 +73,8 @@ final class StoreRecovery { throw new IllegalStateException("can't recover - restore source is not null"); } try { - indexShard.recovering("from store", RecoveryState.Type.STORE, localNode); + final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), indexShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode); + indexShard.recovering("from store", recoveryState); } catch (IllegalIndexShardStateException e) { // that's fine, since we might be called concurrently, just ignore this, we are already recovering return false; @@ -93,19 +95,21 @@ final class StoreRecovery { * @return true if the the shard has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. */ - boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) { + boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository, DiscoveryNode localNode) { if (canRecover(indexShard)) { - if (indexShard.routingEntry().restoreSource() == null) { + final ShardRouting shardRouting = indexShard.routingEntry(); + if (shardRouting.restoreSource() == null) { throw new IllegalStateException("can't restore - restore source is null"); } try { - indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource()); + final RecoveryState recoveryState = new RecoveryState(shardId, shardRouting.primary(), RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode); + indexShard.recovering("from snapshot", recoveryState); } catch (IllegalIndexShardStateException e) { // that's fine, since we might be called concurrently, just ignore this, we are already recovering return false; } return executeRecovery(indexShard, () -> { - logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource()); + logger.debug("restoring from {} ...", shardRouting.restoreSource()); restore(indexShard, repository); }); } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 4f59428eede..8331a3d1a27 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -677,14 +677,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent * Individual shards are getting restored as part of normal recovery process in - * {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository)} + * {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository, DiscoveryNode)} )} * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. *

diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 08c2f8773fa..25a8bf2b40e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -48,6 +49,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -787,7 +789,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { ShardRoutingHelper.reinit(routing); IndexShard newShard = test.createShard(0, routing); newShard.updateRoutingEntry(routing, false); - assertTrue(newShard.recoverFromStore(routing)); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); + assertTrue(newShard.recoverFromStore(routing, localNode)); routing = new ShardRouting(routing); ShardRoutingHelper.moveToStarted(routing); newShard.updateRoutingEntry(routing, true); @@ -799,6 +802,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); IndexService test = indicesService.indexService("test"); final IndexShard shard = test.shard(0); @@ -817,7 +821,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { IndexShard newShard = test.createShard(0, routing); newShard.updateRoutingEntry(routing, false); try { - newShard.recoverFromStore(routing); + newShard.recoverFromStore(routing, localNode); fail("index not there!"); } catch (IndexShardRecoveryException ex) { assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over")); @@ -826,11 +830,11 @@ public class IndexShardTests extends ESSingleNodeTestCase { ShardRoutingHelper.moveToUnassigned(routing, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so")); ShardRoutingHelper.initialize(routing, origRouting.currentNodeId()); - assertFalse("it's already recovering", newShard.recoverFromStore(routing)); + assertFalse("it's already recovering", newShard.recoverFromStore(routing, localNode)); test.removeShard(0, "I broken it"); newShard = test.createShard(0, routing); newShard.updateRoutingEntry(routing, false); - assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing)); + assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing, localNode)); routing = new ShardRouting(routing); ShardRoutingHelper.moveToStarted(routing); @@ -865,6 +869,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { Store targetStore = test_target_shard.store(); test_target_shard.updateRoutingEntry(routing, false); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); assertTrue(test_target_shard.restoreFromRepository(routing, new IndexShardRepository() { @Override public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { @@ -893,7 +898,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { @Override public void verify(String verificationToken) { } - })); + }, localNode)); routing = new ShardRouting(routing); ShardRoutingHelper.moveToStarted(routing); @@ -902,4 +907,15 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertSearchHits(client().prepareSearch("test_target").get(), "0"); } + public void testListenersAreRemoved() { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService("test"); + IndexShard shard = indexService.shard(0); + IndexSettingsService settingsService = indexService.settingsService(); + assertTrue(settingsService.isRegistered(shard)); + indexService.removeShard(0, "simon says so"); + assertFalse(settingsService.isRegistered(shard)); + } }