diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index fea5b90b97a..b7a1f4ad6a5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -148,7 +148,7 @@ public class ClusterModule extends AbstractModule { registerIndexDynamicSetting(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, Validator.EMPTY); registerIndexDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER); registerIndexDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE, Validator.EMPTY); - registerIndexDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME); + registerIndexDynamicSetting(IndexSettings.INDEX_REFRESH_INTERVAL, Validator.TIME); registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY); registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 3c348ef8830..874216377c5 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -21,12 +21,14 @@ package org.elasticsearch.index; import java.io.Closeable; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.nio.file.Path; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,6 +44,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.NodeEnvironment; @@ -104,6 +108,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final IndexSettings indexSettings; private final IndexingSlowLog slowLog; private final IndexingOperationListener[] listeners; + private volatile RefreshTasks refreshTask; public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, SimilarityService similarityService, @@ -140,6 +145,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC this.listeners = new IndexingOperationListener[1+listenersIn.length]; this.listeners[0] = slowLog; System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length); + this.refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); } public int numberOfShards() { @@ -310,9 +316,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); indexShard.updateRoutingEntry(routing, true); - if (shards.isEmpty() && this.indexSettings.getTranslogSyncInterval().millis() != 0) { + if (shards.isEmpty()) { ThreadPool threadPool = nodeServicesProvider.getThreadPool(); - new AsyncTranslogFSync(this, threadPool).schedule(); // kick this off if we are the first shard in this service. + if (this.indexSettings.getTranslogSyncInterval().millis() != 0) { + new AsyncTranslogFSync(this, threadPool); // kick this off if we are the first shard in this service. + } + rescheduleRefreshTasks(); } shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); success = true; @@ -404,6 +413,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC return new QueryShardContext(indexSettings, nodeServicesProvider.getClient(), indexCache.bitsetFilterCache(), indexFieldData, mapperService(), similarityService(), nodeServicesProvider.getScriptService(), nodeServicesProvider.getIndicesQueriesRegistry()); } + ThreadPool getThreadPool() { + return nodeServicesProvider.getThreadPool(); + } + private class StoreCloseListener implements Store.OnClose { private final ShardId shardId; private final boolean ownsShard; @@ -567,9 +580,21 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } catch (Exception e) { logger.warn("failed to refresh slowlog settings", e); } + if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { + rescheduleRefreshTasks(); + } } } + private void rescheduleRefreshTasks() { + try { + refreshTask.close(); + } finally { + refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); + } + + } + public interface ShardStoreDeleter { void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException; @@ -605,40 +630,165 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } - - /** - * FSyncs the translog for all shards of this index in a defined interval. - */ - final static class AsyncTranslogFSync implements Runnable { - private final IndexService indexService; - private final ThreadPool threadPool; - - AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { - this.indexService = indexService; - this.threadPool = threadPool; - } - - boolean mustRun() { - // don't re-schedule if its closed or if we dont' have a single shard here..., we are done - return (indexService.closed.get() || indexService.shards.isEmpty()) == false; - } - - void schedule() { - threadPool.schedule(indexService.getIndexSettings().getTranslogSyncInterval(), ThreadPool.Names.SAME, AsyncTranslogFSync.this); - } - - @Override - public void run() { - if (mustRun()) { - threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> { - indexService.maybeFSyncTranslogs(); - if (mustRun()) { - schedule(); - } - }); + private void maybeRefreshEngine() { + if (indexSettings.getRefreshInterval().millis() > 0) { + for (IndexShard shard : this.shards.values()) { + switch (shard.state()) { + case CREATED: + case RECOVERING: + case CLOSED: + continue; + case POST_RECOVERY: + case STARTED: + case RELOCATED: + try { + shard.refresh("schedule"); + } catch (EngineClosedException | AlreadyClosedException ex) { + // fine - continue; + } + continue; + default: + throw new IllegalStateException("unknown state: " + shard.state()); + } } } } + static abstract class BaseAsyncTask implements Runnable, Closeable { + protected final IndexService indexService; + protected final ThreadPool threadPool; + private final TimeValue interval; + private ScheduledFuture scheduledFuture; + private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile Exception lastThrownException; + + BaseAsyncTask(IndexService indexService, ThreadPool threadPool, TimeValue interval) { + this.indexService = indexService; + this.threadPool = threadPool; + this.interval = interval; + onTaskCompletion(); + } + + boolean mustReschedule() { + // don't re-schedule if its closed or if we dont' have a single shard here..., we are done + return (indexService.closed.get() || indexService.shards.isEmpty()) == false + && closed.get() == false && interval.millis() > 0; + } + + private synchronized void onTaskCompletion() { + if (mustReschedule()) { + indexService.logger.debug("scheduling {} every {}", toString(), interval); + this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this); + } else { + indexService.logger.debug("scheduled {} disabled", toString()); + this.scheduledFuture = null; + } + } + + public final void run() { + try { + runInternal(); + } catch (Exception ex) { + if (lastThrownException == null || sameException(lastThrownException, ex) == false) { + // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs + indexService.logger.warn("failed to run task {} - supressing re-occuring exceptions unless the exception changes", ex, toString()); + lastThrownException = ex; + } + } finally { + onTaskCompletion(); + } + } + + private static boolean sameException(Exception left, Exception right) { + if (left.getClass() == right.getClass()) { + if ((left.getMessage() != null && left.getMessage().equals(right.getMessage())) + || left.getMessage() == right.getMessage()) { + StackTraceElement[] stackTraceLeft = left.getStackTrace(); + StackTraceElement[] stackTraceRight = right.getStackTrace(); + if (stackTraceLeft.length == stackTraceRight.length) { + for (int i = 0; i < stackTraceLeft.length; i++) { + if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) { + return false; + } + } + return true; + } + } + } + return false; + } + + protected abstract void runInternal(); + + protected String getThreadPool() { + return ThreadPool.Names.SAME; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + FutureUtils.cancel(scheduledFuture); + scheduledFuture = null; + } + } + + TimeValue getInterval() { + return interval; + } + + boolean isClosed() { + return this.closed.get(); + } + } + + /** + * FSyncs the translog for all shards of this index in a defined interval. + */ + final static class AsyncTranslogFSync extends BaseAsyncTask { + + AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { + super(indexService, threadPool, indexService.getIndexSettings().getTranslogSyncInterval()); + } + + protected String getThreadPool() { + return ThreadPool.Names.FLUSH; + } + @Override + protected void runInternal() { + indexService.maybeFSyncTranslogs(); + } + + @Override + public String toString() { + return "translog_sync"; + } + } + + final class RefreshTasks extends BaseAsyncTask { + + RefreshTasks(IndexService indexService, ThreadPool threadPool) { + super(indexService, threadPool, indexService.getIndexSettings().getRefreshInterval()); + } + + @Override + protected void runInternal() { + indexService.maybeRefreshEngine(); + } + + protected String getThreadPool() { + return ThreadPool.Names.REFRESH; + } + + @Override + public String toString() { + return "refresh"; + } + } + + RefreshTasks getRefreshTask() { // for tests + return refreshTask; + } + + } diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 772fb053cda..4c347f78dca 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -26,8 +26,10 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.internal.AllFieldMapper; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Arrays; @@ -35,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; @@ -54,6 +57,8 @@ public final class IndexSettings { public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields"; public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval"; public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability"; + public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; + public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS); private final String uuid; private final List> updateListeners; @@ -76,6 +81,9 @@ public final class IndexSettings { private final Predicate indexNameMatcher; private volatile Translog.Durability durability; private final TimeValue syncInterval; + private volatile TimeValue refreshInterval; + + /** * Returns the default search field for this index. @@ -156,7 +164,7 @@ public final class IndexSettings { final String value = settings.get(INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST.name()); this.durability = getFromSettings(settings, Translog.Durability.REQUEST); syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5)); - + refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, DEFAULT_REFRESH_INTERVAL); assert indexNameMatcher.test(indexMetaData.getIndex()); } @@ -346,10 +354,19 @@ public final class IndexSettings { logger.info("updating durability from [{}] to [{}]", this.durability, durability); this.durability = durability; } + + TimeValue refreshInterval = settings.getAsTime(IndexSettings.INDEX_REFRESH_INTERVAL, this.refreshInterval); + if (!refreshInterval.equals(this.refreshInterval)) { + logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval); + this.refreshInterval = refreshInterval; + } } public TimeValue getTranslogSyncInterval() { return syncInterval; } + public TimeValue getRefreshInterval() { + return refreshInterval; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index b06ecc367f6..5aae2b349c8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -84,7 +84,6 @@ public final class EngineConfig { /** if set to true the engine will start even if the translog id in the commit point can not be found */ public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog"; - public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS); public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60); private static final String DEFAULT_CODEC_NAME = "default"; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index de9bb7e10f1..3cb7b083933 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -173,8 +172,6 @@ public class IndexShard extends AbstractIndexShardComponent { * being indexed/deleted. */ private final AtomicLong writingBytes = new AtomicLong(); - private TimeValue refreshInterval; - private volatile ScheduledFuture refreshScheduledFuture; protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; @@ -200,7 +197,6 @@ public class IndexShard extends AbstractIndexShardComponent { */ public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close"; public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size"; - public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; private final ShardPath path; @@ -249,7 +245,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.indexFieldDataService = indexFieldDataService; this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); state = IndexShardState.CREATED; - this.refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); this.flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); this.path = path; this.mergePolicyConfig = new MergePolicyConfig(logger, settings); @@ -561,23 +556,25 @@ public class IndexShard extends AbstractIndexShardComponent { /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ public void refresh(String source) { verifyNotClosed(); - if (canIndex()) { - long bytes = getEngine().getIndexBufferRAMBytesUsed(); - writingBytes.addAndGet(bytes); - try { - logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes)); + if (getEngine().refreshNeeded()) { + if (canIndex()) { + long bytes = getEngine().getIndexBufferRAMBytesUsed(); + writingBytes.addAndGet(bytes); + try { + logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes)); + long time = System.nanoTime(); + getEngine().refresh(source); + refreshMetric.inc(System.nanoTime() - time); + } finally { + logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); + writingBytes.addAndGet(-bytes); + } + } else { + logger.debug("refresh with source [{}]", source); long time = System.nanoTime(); getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); - } finally { - logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); - writingBytes.addAndGet(-bytes); } - } else { - logger.debug("refresh with source [{}]", source); - long time = System.nanoTime(); - getEngine().refresh(source); - refreshMetric.inc(System.nanoTime() - time); } } @@ -954,7 +951,6 @@ public class IndexShard extends AbstractIndexShardComponent { public void finalizeRecovery() { recoveryState().setStage(RecoveryState.Stage.FINALIZE); getEngine().refresh("recovery_finalization"); - startScheduledTasksIfNeeded(); engineConfig.setEnableGcDeletes(true); } @@ -1022,15 +1018,6 @@ public class IndexShard extends AbstractIndexShardComponent { } } - private void startScheduledTasksIfNeeded() { - if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); - logger.debug("scheduling refresher every {}", refreshInterval); - } else { - logger.debug("scheduled refresher disabled"); - } - } - /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ public long getIndexBufferRAMBytesUsed() { Engine engine = getEngineOrNull(); @@ -1133,22 +1120,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.flushOnClose = flushOnClose; } - TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval); - if (!refreshInterval.equals(this.refreshInterval)) { - logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval); - if (refreshScheduledFuture != null) { - // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is - // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt - // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 - FutureUtils.cancel(refreshScheduledFuture); - refreshScheduledFuture = null; - } - this.refreshInterval = refreshInterval; - if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); - } - } - long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); if (gcDeletesInMillis != config.getGcDeletesInMillis()) { logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); @@ -1226,7 +1197,7 @@ public class IndexShard extends AbstractIndexShardComponent { } } - private void handleRefreshException(Exception e) { + void handleRefreshException(Exception e) { if (e instanceof EngineClosedException) { // ignore } else if (e instanceof RefreshFailedEngineException) { @@ -1284,43 +1255,6 @@ public class IndexShard extends AbstractIndexShardComponent { internalIndexingStats.noopUpdate(type); } - final class EngineRefresher implements Runnable { - @Override - public void run() { - // we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule - if (!getEngine().refreshNeeded()) { - reschedule(); - return; - } - threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { - @Override - public void run() { - try { - // TODO: now that we use refresh to clear the indexing buffer, we should check here if we did that "recently" and - // reschedule if so... - if (getEngine().refreshNeeded()) { - refresh("schedule"); - } - } catch (Exception e) { - handleRefreshException(e); - } - - reschedule(); - } - }); - } - - /** - * Schedules another (future) refresh, if refresh_interval is still enabled. - */ - private void reschedule() { - synchronized (mutex) { - if (state != IndexShardState.CLOSED && refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this); - } - } - } - } private void checkIndex() throws IOException { if (store.tryIncRef()) { diff --git a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java index 667716937da..f24c3b1e898 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java @@ -46,7 +46,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.MetaDataStateFormat; -import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests; import org.elasticsearch.index.query.QueryBuilders; @@ -383,7 +383,7 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase { assertThat(source, Matchers.hasKey("foo")); assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder() - .put("refresh_interval", EngineConfig.DEFAULT_REFRESH_INTERVAL) + .put("refresh_interval", IndexSettings.DEFAULT_REFRESH_INTERVAL) .build())); } diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 3544cb1a257..7a1076268b3 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -31,8 +32,13 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.hamcrest.Matchers.containsString; @@ -151,4 +157,108 @@ public class IndexServiceTests extends ESSingleNodeTestCase { IndexMetaData build = IndexMetaData.builder(service.getMetaData()).putAlias(AliasMetaData.builder(alias).filter(filter).build()).build(); service.updateMetaData(build); } + + public void testBaseAsyncTask() throws InterruptedException, IOException { + IndexService indexService = newIndexService(); + ThreadPool pool = indexService.getThreadPool(); + AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); + AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); + final AtomicInteger count = new AtomicInteger(); + IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1)) { + @Override + protected void runInternal() { + count.incrementAndGet(); + assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); + latch.get().countDown(); + try { + latch2.get().await(); + } catch (InterruptedException e) { + fail("interrupted"); + } + if (randomBoolean()) { // task can throw exceptions!! + if (randomBoolean()) { + throw new RuntimeException("foo"); + } else { + throw new RuntimeException("bar"); + } + } + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + }; + latch.get().await(); + latch.set(new CountDownLatch(1)); + assertEquals(1, count.get()); + latch2.get().countDown(); + latch2.set(new CountDownLatch(1)); + + latch.get().await(); + assertEquals(2, count.get()); + task.close(); + latch2.get().countDown(); + assertEquals(2, count.get()); + + + task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1000000)) { + @Override + protected void runInternal() { + + } + }; + assertTrue(task.mustReschedule()); + if (randomBoolean()) { + for (Integer id : indexService.shardIds()) { + indexService.removeShard(id, "simon says"); + } + } else { + indexService.close("simon says", false); + } + + assertFalse("no shards left", task.mustReschedule()); + } + + public void testRefreshTaskIsUpdated() { + IndexService indexService = newIndexService(); + IndexService.RefreshTasks refreshTask = indexService.getRefreshTask(); + assertEquals(1000, refreshTask.getInterval().millis()); + assertTrue(indexService.getRefreshTask().mustReschedule()); + + // now disable + IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + assertFalse(indexService.getRefreshTask().mustReschedule()); + + // set it to 100ms + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "100ms")).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + + refreshTask = indexService.getRefreshTask(); + assertTrue(refreshTask.mustReschedule()); + assertEquals(100, refreshTask.getInterval().millis()); + + // set it to 200ms + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + + refreshTask = indexService.getRefreshTask(); + assertTrue(refreshTask.mustReschedule()); + assertEquals(200, refreshTask.getInterval().millis()); + + // set it to 200ms again + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build(); + indexService.updateMetaData(metaData); + assertSame(refreshTask, indexService.getRefreshTask()); + assertTrue(indexService.getRefreshTask().mustReschedule()); + assertFalse(refreshTask.isClosed()); + assertEquals(200, refreshTask.getInterval().millis()); + } } diff --git a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java index 729b6acd8f8..3190145923e 100644 --- a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java +++ b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java @@ -27,10 +27,10 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; @@ -47,7 +47,7 @@ public class ParentFieldLoadingIT extends ESIntegTestCase { private final Settings indexSettings = Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexShard.INDEX_REFRESH_INTERVAL, -1) + .put(IndexSettings.INDEX_REFRESH_INTERVAL, -1) // We never want merges in this test to ensure we have two segments for the last validation .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .build(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index e8ff9674eed..bd94a974a0f 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -81,7 +81,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; -import static org.elasticsearch.index.shard.IndexShard.INDEX_REFRESH_INTERVAL; +import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;