From 59211927b6d9a41f1c89fa8f27aed1270da7c17d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 12 Jan 2016 17:40:54 +0100 Subject: [PATCH] Move RefreshTask into IndexService and use since task per index `refresh_interval` is a per index setting but we interpret and maintain it per shard. This change moves the refresh task outside of IndexShard to the IndexService where it logically belongs and reuses scheduling infrastructure used for translog fsync (async commit). This change will use the same task for all shards of an index while previously we used on thread/task per shard to refresh. This will also prevent too many concurrent refreshes if there are many indices and shards allocated on a single node. --- .../elasticsearch/cluster/ClusterModule.java | 2 +- .../org/elasticsearch/index/IndexService.java | 218 +++++++++++++++--- .../elasticsearch/index/IndexSettings.java | 19 +- .../index/engine/EngineConfig.java | 1 - .../elasticsearch/index/shard/IndexShard.java | 98 ++------ .../OldIndexBackwardsCompatibilityIT.java | 4 +- .../index/IndexServiceTests.java | 110 +++++++++ .../search/child/ParentFieldLoadingIT.java | 4 +- .../SharedClusterSnapshotRestoreIT.java | 2 +- 9 files changed, 334 insertions(+), 124 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index fea5b90b97a..b7a1f4ad6a5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -148,7 +148,7 @@ public class ClusterModule extends AbstractModule { registerIndexDynamicSetting(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, Validator.EMPTY); registerIndexDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER); registerIndexDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE, Validator.EMPTY); - registerIndexDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME); + registerIndexDynamicSetting(IndexSettings.INDEX_REFRESH_INTERVAL, Validator.TIME); registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY); registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 3c348ef8830..874216377c5 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -21,12 +21,14 @@ package org.elasticsearch.index; import java.io.Closeable; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.nio.file.Path; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,6 +44,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.NodeEnvironment; @@ -104,6 +108,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final IndexSettings indexSettings; private final IndexingSlowLog slowLog; private final IndexingOperationListener[] listeners; + private volatile RefreshTasks refreshTask; public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, SimilarityService similarityService, @@ -140,6 +145,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC this.listeners = new IndexingOperationListener[1+listenersIn.length]; this.listeners[0] = slowLog; System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length); + this.refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); } public int numberOfShards() { @@ -310,9 +316,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); indexShard.updateRoutingEntry(routing, true); - if (shards.isEmpty() && this.indexSettings.getTranslogSyncInterval().millis() != 0) { + if (shards.isEmpty()) { ThreadPool threadPool = nodeServicesProvider.getThreadPool(); - new AsyncTranslogFSync(this, threadPool).schedule(); // kick this off if we are the first shard in this service. + if (this.indexSettings.getTranslogSyncInterval().millis() != 0) { + new AsyncTranslogFSync(this, threadPool); // kick this off if we are the first shard in this service. + } + rescheduleRefreshTasks(); } shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); success = true; @@ -404,6 +413,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC return new QueryShardContext(indexSettings, nodeServicesProvider.getClient(), indexCache.bitsetFilterCache(), indexFieldData, mapperService(), similarityService(), nodeServicesProvider.getScriptService(), nodeServicesProvider.getIndicesQueriesRegistry()); } + ThreadPool getThreadPool() { + return nodeServicesProvider.getThreadPool(); + } + private class StoreCloseListener implements Store.OnClose { private final ShardId shardId; private final boolean ownsShard; @@ -567,9 +580,21 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } catch (Exception e) { logger.warn("failed to refresh slowlog settings", e); } + if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { + rescheduleRefreshTasks(); + } } } + private void rescheduleRefreshTasks() { + try { + refreshTask.close(); + } finally { + refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); + } + + } + public interface ShardStoreDeleter { void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException; @@ -605,40 +630,165 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } - - /** - * FSyncs the translog for all shards of this index in a defined interval. - */ - final static class AsyncTranslogFSync implements Runnable { - private final IndexService indexService; - private final ThreadPool threadPool; - - AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { - this.indexService = indexService; - this.threadPool = threadPool; - } - - boolean mustRun() { - // don't re-schedule if its closed or if we dont' have a single shard here..., we are done - return (indexService.closed.get() || indexService.shards.isEmpty()) == false; - } - - void schedule() { - threadPool.schedule(indexService.getIndexSettings().getTranslogSyncInterval(), ThreadPool.Names.SAME, AsyncTranslogFSync.this); - } - - @Override - public void run() { - if (mustRun()) { - threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> { - indexService.maybeFSyncTranslogs(); - if (mustRun()) { - schedule(); - } - }); + private void maybeRefreshEngine() { + if (indexSettings.getRefreshInterval().millis() > 0) { + for (IndexShard shard : this.shards.values()) { + switch (shard.state()) { + case CREATED: + case RECOVERING: + case CLOSED: + continue; + case POST_RECOVERY: + case STARTED: + case RELOCATED: + try { + shard.refresh("schedule"); + } catch (EngineClosedException | AlreadyClosedException ex) { + // fine - continue; + } + continue; + default: + throw new IllegalStateException("unknown state: " + shard.state()); + } } } } + static abstract class BaseAsyncTask implements Runnable, Closeable { + protected final IndexService indexService; + protected final ThreadPool threadPool; + private final TimeValue interval; + private ScheduledFuture scheduledFuture; + private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile Exception lastThrownException; + + BaseAsyncTask(IndexService indexService, ThreadPool threadPool, TimeValue interval) { + this.indexService = indexService; + this.threadPool = threadPool; + this.interval = interval; + onTaskCompletion(); + } + + boolean mustReschedule() { + // don't re-schedule if its closed or if we dont' have a single shard here..., we are done + return (indexService.closed.get() || indexService.shards.isEmpty()) == false + && closed.get() == false && interval.millis() > 0; + } + + private synchronized void onTaskCompletion() { + if (mustReschedule()) { + indexService.logger.debug("scheduling {} every {}", toString(), interval); + this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this); + } else { + indexService.logger.debug("scheduled {} disabled", toString()); + this.scheduledFuture = null; + } + } + + public final void run() { + try { + runInternal(); + } catch (Exception ex) { + if (lastThrownException == null || sameException(lastThrownException, ex) == false) { + // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs + indexService.logger.warn("failed to run task {} - supressing re-occuring exceptions unless the exception changes", ex, toString()); + lastThrownException = ex; + } + } finally { + onTaskCompletion(); + } + } + + private static boolean sameException(Exception left, Exception right) { + if (left.getClass() == right.getClass()) { + if ((left.getMessage() != null && left.getMessage().equals(right.getMessage())) + || left.getMessage() == right.getMessage()) { + StackTraceElement[] stackTraceLeft = left.getStackTrace(); + StackTraceElement[] stackTraceRight = right.getStackTrace(); + if (stackTraceLeft.length == stackTraceRight.length) { + for (int i = 0; i < stackTraceLeft.length; i++) { + if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) { + return false; + } + } + return true; + } + } + } + return false; + } + + protected abstract void runInternal(); + + protected String getThreadPool() { + return ThreadPool.Names.SAME; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + FutureUtils.cancel(scheduledFuture); + scheduledFuture = null; + } + } + + TimeValue getInterval() { + return interval; + } + + boolean isClosed() { + return this.closed.get(); + } + } + + /** + * FSyncs the translog for all shards of this index in a defined interval. + */ + final static class AsyncTranslogFSync extends BaseAsyncTask { + + AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { + super(indexService, threadPool, indexService.getIndexSettings().getTranslogSyncInterval()); + } + + protected String getThreadPool() { + return ThreadPool.Names.FLUSH; + } + @Override + protected void runInternal() { + indexService.maybeFSyncTranslogs(); + } + + @Override + public String toString() { + return "translog_sync"; + } + } + + final class RefreshTasks extends BaseAsyncTask { + + RefreshTasks(IndexService indexService, ThreadPool threadPool) { + super(indexService, threadPool, indexService.getIndexSettings().getRefreshInterval()); + } + + @Override + protected void runInternal() { + indexService.maybeRefreshEngine(); + } + + protected String getThreadPool() { + return ThreadPool.Names.REFRESH; + } + + @Override + public String toString() { + return "refresh"; + } + } + + RefreshTasks getRefreshTask() { // for tests + return refreshTask; + } + + } diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 772fb053cda..4c347f78dca 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -26,8 +26,10 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.internal.AllFieldMapper; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Arrays; @@ -35,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; @@ -54,6 +57,8 @@ public final class IndexSettings { public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields"; public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval"; public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability"; + public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; + public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS); private final String uuid; private final List> updateListeners; @@ -76,6 +81,9 @@ public final class IndexSettings { private final Predicate indexNameMatcher; private volatile Translog.Durability durability; private final TimeValue syncInterval; + private volatile TimeValue refreshInterval; + + /** * Returns the default search field for this index. @@ -156,7 +164,7 @@ public final class IndexSettings { final String value = settings.get(INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST.name()); this.durability = getFromSettings(settings, Translog.Durability.REQUEST); syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5)); - + refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, DEFAULT_REFRESH_INTERVAL); assert indexNameMatcher.test(indexMetaData.getIndex()); } @@ -346,10 +354,19 @@ public final class IndexSettings { logger.info("updating durability from [{}] to [{}]", this.durability, durability); this.durability = durability; } + + TimeValue refreshInterval = settings.getAsTime(IndexSettings.INDEX_REFRESH_INTERVAL, this.refreshInterval); + if (!refreshInterval.equals(this.refreshInterval)) { + logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval); + this.refreshInterval = refreshInterval; + } } public TimeValue getTranslogSyncInterval() { return syncInterval; } + public TimeValue getRefreshInterval() { + return refreshInterval; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index b06ecc367f6..5aae2b349c8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -84,7 +84,6 @@ public final class EngineConfig { /** if set to true the engine will start even if the translog id in the commit point can not be found */ public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog"; - public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS); public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60); private static final String DEFAULT_CODEC_NAME = "default"; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index de9bb7e10f1..3cb7b083933 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -173,8 +172,6 @@ public class IndexShard extends AbstractIndexShardComponent { * being indexed/deleted. */ private final AtomicLong writingBytes = new AtomicLong(); - private TimeValue refreshInterval; - private volatile ScheduledFuture refreshScheduledFuture; protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; @@ -200,7 +197,6 @@ public class IndexShard extends AbstractIndexShardComponent { */ public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close"; public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size"; - public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; private final ShardPath path; @@ -249,7 +245,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.indexFieldDataService = indexFieldDataService; this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); state = IndexShardState.CREATED; - this.refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); this.flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); this.path = path; this.mergePolicyConfig = new MergePolicyConfig(logger, settings); @@ -561,23 +556,25 @@ public class IndexShard extends AbstractIndexShardComponent { /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ public void refresh(String source) { verifyNotClosed(); - if (canIndex()) { - long bytes = getEngine().getIndexBufferRAMBytesUsed(); - writingBytes.addAndGet(bytes); - try { - logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes)); + if (getEngine().refreshNeeded()) { + if (canIndex()) { + long bytes = getEngine().getIndexBufferRAMBytesUsed(); + writingBytes.addAndGet(bytes); + try { + logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes)); + long time = System.nanoTime(); + getEngine().refresh(source); + refreshMetric.inc(System.nanoTime() - time); + } finally { + logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); + writingBytes.addAndGet(-bytes); + } + } else { + logger.debug("refresh with source [{}]", source); long time = System.nanoTime(); getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); - } finally { - logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); - writingBytes.addAndGet(-bytes); } - } else { - logger.debug("refresh with source [{}]", source); - long time = System.nanoTime(); - getEngine().refresh(source); - refreshMetric.inc(System.nanoTime() - time); } } @@ -954,7 +951,6 @@ public class IndexShard extends AbstractIndexShardComponent { public void finalizeRecovery() { recoveryState().setStage(RecoveryState.Stage.FINALIZE); getEngine().refresh("recovery_finalization"); - startScheduledTasksIfNeeded(); engineConfig.setEnableGcDeletes(true); } @@ -1022,15 +1018,6 @@ public class IndexShard extends AbstractIndexShardComponent { } } - private void startScheduledTasksIfNeeded() { - if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); - logger.debug("scheduling refresher every {}", refreshInterval); - } else { - logger.debug("scheduled refresher disabled"); - } - } - /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ public long getIndexBufferRAMBytesUsed() { Engine engine = getEngineOrNull(); @@ -1133,22 +1120,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.flushOnClose = flushOnClose; } - TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval); - if (!refreshInterval.equals(this.refreshInterval)) { - logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval); - if (refreshScheduledFuture != null) { - // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is - // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt - // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 - FutureUtils.cancel(refreshScheduledFuture); - refreshScheduledFuture = null; - } - this.refreshInterval = refreshInterval; - if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); - } - } - long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); if (gcDeletesInMillis != config.getGcDeletesInMillis()) { logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); @@ -1226,7 +1197,7 @@ public class IndexShard extends AbstractIndexShardComponent { } } - private void handleRefreshException(Exception e) { + void handleRefreshException(Exception e) { if (e instanceof EngineClosedException) { // ignore } else if (e instanceof RefreshFailedEngineException) { @@ -1284,43 +1255,6 @@ public class IndexShard extends AbstractIndexShardComponent { internalIndexingStats.noopUpdate(type); } - final class EngineRefresher implements Runnable { - @Override - public void run() { - // we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule - if (!getEngine().refreshNeeded()) { - reschedule(); - return; - } - threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { - @Override - public void run() { - try { - // TODO: now that we use refresh to clear the indexing buffer, we should check here if we did that "recently" and - // reschedule if so... - if (getEngine().refreshNeeded()) { - refresh("schedule"); - } - } catch (Exception e) { - handleRefreshException(e); - } - - reschedule(); - } - }); - } - - /** - * Schedules another (future) refresh, if refresh_interval is still enabled. - */ - private void reschedule() { - synchronized (mutex) { - if (state != IndexShardState.CLOSED && refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this); - } - } - } - } private void checkIndex() throws IOException { if (store.tryIncRef()) { diff --git a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java index 667716937da..f24c3b1e898 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java @@ -46,7 +46,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.MetaDataStateFormat; -import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests; import org.elasticsearch.index.query.QueryBuilders; @@ -383,7 +383,7 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase { assertThat(source, Matchers.hasKey("foo")); assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder() - .put("refresh_interval", EngineConfig.DEFAULT_REFRESH_INTERVAL) + .put("refresh_interval", IndexSettings.DEFAULT_REFRESH_INTERVAL) .build())); } diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 3544cb1a257..7a1076268b3 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -31,8 +32,13 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.hamcrest.Matchers.containsString; @@ -151,4 +157,108 @@ public class IndexServiceTests extends ESSingleNodeTestCase { IndexMetaData build = IndexMetaData.builder(service.getMetaData()).putAlias(AliasMetaData.builder(alias).filter(filter).build()).build(); service.updateMetaData(build); } + + public void testBaseAsyncTask() throws InterruptedException, IOException { + IndexService indexService = newIndexService(); + ThreadPool pool = indexService.getThreadPool(); + AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); + AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); + final AtomicInteger count = new AtomicInteger(); + IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1)) { + @Override + protected void runInternal() { + count.incrementAndGet(); + assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); + latch.get().countDown(); + try { + latch2.get().await(); + } catch (InterruptedException e) { + fail("interrupted"); + } + if (randomBoolean()) { // task can throw exceptions!! + if (randomBoolean()) { + throw new RuntimeException("foo"); + } else { + throw new RuntimeException("bar"); + } + } + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + }; + latch.get().await(); + latch.set(new CountDownLatch(1)); + assertEquals(1, count.get()); + latch2.get().countDown(); + latch2.set(new CountDownLatch(1)); + + latch.get().await(); + assertEquals(2, count.get()); + task.close(); + latch2.get().countDown(); + assertEquals(2, count.get()); + + + task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1000000)) { + @Override + protected void runInternal() { + + } + }; + assertTrue(task.mustReschedule()); + if (randomBoolean()) { + for (Integer id : indexService.shardIds()) { + indexService.removeShard(id, "simon says"); + } + } else { + indexService.close("simon says", false); + } + + assertFalse("no shards left", task.mustReschedule()); + } + + public void testRefreshTaskIsUpdated() { + IndexService indexService = newIndexService(); + IndexService.RefreshTasks refreshTask = indexService.getRefreshTask(); + assertEquals(1000, refreshTask.getInterval().millis()); + assertTrue(indexService.getRefreshTask().mustReschedule()); + + // now disable + IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + assertFalse(indexService.getRefreshTask().mustReschedule()); + + // set it to 100ms + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "100ms")).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + + refreshTask = indexService.getRefreshTask(); + assertTrue(refreshTask.mustReschedule()); + assertEquals(100, refreshTask.getInterval().millis()); + + // set it to 200ms + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + + refreshTask = indexService.getRefreshTask(); + assertTrue(refreshTask.mustReschedule()); + assertEquals(200, refreshTask.getInterval().millis()); + + // set it to 200ms again + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build(); + indexService.updateMetaData(metaData); + assertSame(refreshTask, indexService.getRefreshTask()); + assertTrue(indexService.getRefreshTask().mustReschedule()); + assertFalse(refreshTask.isClosed()); + assertEquals(200, refreshTask.getInterval().millis()); + } } diff --git a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java index 729b6acd8f8..3190145923e 100644 --- a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java +++ b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java @@ -27,10 +27,10 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; @@ -47,7 +47,7 @@ public class ParentFieldLoadingIT extends ESIntegTestCase { private final Settings indexSettings = Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexShard.INDEX_REFRESH_INTERVAL, -1) + .put(IndexSettings.INDEX_REFRESH_INTERVAL, -1) // We never want merges in this test to ensure we have two segments for the last validation .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .build(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index e8ff9674eed..bd94a974a0f 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -81,7 +81,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; -import static org.elasticsearch.index.shard.IndexShard.INDEX_REFRESH_INTERVAL; +import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;