From 59211927b6d9a41f1c89fa8f27aed1270da7c17d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 12 Jan 2016 17:40:54 +0100 Subject: [PATCH 1/4] Move RefreshTask into IndexService and use since task per index `refresh_interval` is a per index setting but we interpret and maintain it per shard. This change moves the refresh task outside of IndexShard to the IndexService where it logically belongs and reuses scheduling infrastructure used for translog fsync (async commit). This change will use the same task for all shards of an index while previously we used on thread/task per shard to refresh. This will also prevent too many concurrent refreshes if there are many indices and shards allocated on a single node. --- .../elasticsearch/cluster/ClusterModule.java | 2 +- .../org/elasticsearch/index/IndexService.java | 218 +++++++++++++++--- .../elasticsearch/index/IndexSettings.java | 19 +- .../index/engine/EngineConfig.java | 1 - .../elasticsearch/index/shard/IndexShard.java | 98 ++------ .../OldIndexBackwardsCompatibilityIT.java | 4 +- .../index/IndexServiceTests.java | 110 +++++++++ .../search/child/ParentFieldLoadingIT.java | 4 +- .../SharedClusterSnapshotRestoreIT.java | 2 +- 9 files changed, 334 insertions(+), 124 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index fea5b90b97a..b7a1f4ad6a5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -148,7 +148,7 @@ public class ClusterModule extends AbstractModule { registerIndexDynamicSetting(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, Validator.EMPTY); registerIndexDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER); registerIndexDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE, Validator.EMPTY); - registerIndexDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME); + registerIndexDynamicSetting(IndexSettings.INDEX_REFRESH_INTERVAL, Validator.TIME); registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY); registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 3c348ef8830..874216377c5 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -21,12 +21,14 @@ package org.elasticsearch.index; import java.io.Closeable; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.nio.file.Path; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,6 +44,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.NodeEnvironment; @@ -104,6 +108,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final IndexSettings indexSettings; private final IndexingSlowLog slowLog; private final IndexingOperationListener[] listeners; + private volatile RefreshTasks refreshTask; public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, SimilarityService similarityService, @@ -140,6 +145,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC this.listeners = new IndexingOperationListener[1+listenersIn.length]; this.listeners[0] = slowLog; System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length); + this.refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); } public int numberOfShards() { @@ -310,9 +316,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); indexShard.updateRoutingEntry(routing, true); - if (shards.isEmpty() && this.indexSettings.getTranslogSyncInterval().millis() != 0) { + if (shards.isEmpty()) { ThreadPool threadPool = nodeServicesProvider.getThreadPool(); - new AsyncTranslogFSync(this, threadPool).schedule(); // kick this off if we are the first shard in this service. + if (this.indexSettings.getTranslogSyncInterval().millis() != 0) { + new AsyncTranslogFSync(this, threadPool); // kick this off if we are the first shard in this service. + } + rescheduleRefreshTasks(); } shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); success = true; @@ -404,6 +413,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC return new QueryShardContext(indexSettings, nodeServicesProvider.getClient(), indexCache.bitsetFilterCache(), indexFieldData, mapperService(), similarityService(), nodeServicesProvider.getScriptService(), nodeServicesProvider.getIndicesQueriesRegistry()); } + ThreadPool getThreadPool() { + return nodeServicesProvider.getThreadPool(); + } + private class StoreCloseListener implements Store.OnClose { private final ShardId shardId; private final boolean ownsShard; @@ -567,9 +580,21 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } catch (Exception e) { logger.warn("failed to refresh slowlog settings", e); } + if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { + rescheduleRefreshTasks(); + } } } + private void rescheduleRefreshTasks() { + try { + refreshTask.close(); + } finally { + refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); + } + + } + public interface ShardStoreDeleter { void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException; @@ -605,40 +630,165 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } - - /** - * FSyncs the translog for all shards of this index in a defined interval. - */ - final static class AsyncTranslogFSync implements Runnable { - private final IndexService indexService; - private final ThreadPool threadPool; - - AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { - this.indexService = indexService; - this.threadPool = threadPool; - } - - boolean mustRun() { - // don't re-schedule if its closed or if we dont' have a single shard here..., we are done - return (indexService.closed.get() || indexService.shards.isEmpty()) == false; - } - - void schedule() { - threadPool.schedule(indexService.getIndexSettings().getTranslogSyncInterval(), ThreadPool.Names.SAME, AsyncTranslogFSync.this); - } - - @Override - public void run() { - if (mustRun()) { - threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> { - indexService.maybeFSyncTranslogs(); - if (mustRun()) { - schedule(); - } - }); + private void maybeRefreshEngine() { + if (indexSettings.getRefreshInterval().millis() > 0) { + for (IndexShard shard : this.shards.values()) { + switch (shard.state()) { + case CREATED: + case RECOVERING: + case CLOSED: + continue; + case POST_RECOVERY: + case STARTED: + case RELOCATED: + try { + shard.refresh("schedule"); + } catch (EngineClosedException | AlreadyClosedException ex) { + // fine - continue; + } + continue; + default: + throw new IllegalStateException("unknown state: " + shard.state()); + } } } } + static abstract class BaseAsyncTask implements Runnable, Closeable { + protected final IndexService indexService; + protected final ThreadPool threadPool; + private final TimeValue interval; + private ScheduledFuture scheduledFuture; + private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile Exception lastThrownException; + + BaseAsyncTask(IndexService indexService, ThreadPool threadPool, TimeValue interval) { + this.indexService = indexService; + this.threadPool = threadPool; + this.interval = interval; + onTaskCompletion(); + } + + boolean mustReschedule() { + // don't re-schedule if its closed or if we dont' have a single shard here..., we are done + return (indexService.closed.get() || indexService.shards.isEmpty()) == false + && closed.get() == false && interval.millis() > 0; + } + + private synchronized void onTaskCompletion() { + if (mustReschedule()) { + indexService.logger.debug("scheduling {} every {}", toString(), interval); + this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this); + } else { + indexService.logger.debug("scheduled {} disabled", toString()); + this.scheduledFuture = null; + } + } + + public final void run() { + try { + runInternal(); + } catch (Exception ex) { + if (lastThrownException == null || sameException(lastThrownException, ex) == false) { + // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs + indexService.logger.warn("failed to run task {} - supressing re-occuring exceptions unless the exception changes", ex, toString()); + lastThrownException = ex; + } + } finally { + onTaskCompletion(); + } + } + + private static boolean sameException(Exception left, Exception right) { + if (left.getClass() == right.getClass()) { + if ((left.getMessage() != null && left.getMessage().equals(right.getMessage())) + || left.getMessage() == right.getMessage()) { + StackTraceElement[] stackTraceLeft = left.getStackTrace(); + StackTraceElement[] stackTraceRight = right.getStackTrace(); + if (stackTraceLeft.length == stackTraceRight.length) { + for (int i = 0; i < stackTraceLeft.length; i++) { + if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) { + return false; + } + } + return true; + } + } + } + return false; + } + + protected abstract void runInternal(); + + protected String getThreadPool() { + return ThreadPool.Names.SAME; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + FutureUtils.cancel(scheduledFuture); + scheduledFuture = null; + } + } + + TimeValue getInterval() { + return interval; + } + + boolean isClosed() { + return this.closed.get(); + } + } + + /** + * FSyncs the translog for all shards of this index in a defined interval. + */ + final static class AsyncTranslogFSync extends BaseAsyncTask { + + AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { + super(indexService, threadPool, indexService.getIndexSettings().getTranslogSyncInterval()); + } + + protected String getThreadPool() { + return ThreadPool.Names.FLUSH; + } + @Override + protected void runInternal() { + indexService.maybeFSyncTranslogs(); + } + + @Override + public String toString() { + return "translog_sync"; + } + } + + final class RefreshTasks extends BaseAsyncTask { + + RefreshTasks(IndexService indexService, ThreadPool threadPool) { + super(indexService, threadPool, indexService.getIndexSettings().getRefreshInterval()); + } + + @Override + protected void runInternal() { + indexService.maybeRefreshEngine(); + } + + protected String getThreadPool() { + return ThreadPool.Names.REFRESH; + } + + @Override + public String toString() { + return "refresh"; + } + } + + RefreshTasks getRefreshTask() { // for tests + return refreshTask; + } + + } diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 772fb053cda..4c347f78dca 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -26,8 +26,10 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.internal.AllFieldMapper; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Arrays; @@ -35,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; @@ -54,6 +57,8 @@ public final class IndexSettings { public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields"; public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval"; public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability"; + public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; + public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS); private final String uuid; private final List> updateListeners; @@ -76,6 +81,9 @@ public final class IndexSettings { private final Predicate indexNameMatcher; private volatile Translog.Durability durability; private final TimeValue syncInterval; + private volatile TimeValue refreshInterval; + + /** * Returns the default search field for this index. @@ -156,7 +164,7 @@ public final class IndexSettings { final String value = settings.get(INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST.name()); this.durability = getFromSettings(settings, Translog.Durability.REQUEST); syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5)); - + refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, DEFAULT_REFRESH_INTERVAL); assert indexNameMatcher.test(indexMetaData.getIndex()); } @@ -346,10 +354,19 @@ public final class IndexSettings { logger.info("updating durability from [{}] to [{}]", this.durability, durability); this.durability = durability; } + + TimeValue refreshInterval = settings.getAsTime(IndexSettings.INDEX_REFRESH_INTERVAL, this.refreshInterval); + if (!refreshInterval.equals(this.refreshInterval)) { + logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval); + this.refreshInterval = refreshInterval; + } } public TimeValue getTranslogSyncInterval() { return syncInterval; } + public TimeValue getRefreshInterval() { + return refreshInterval; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index b06ecc367f6..5aae2b349c8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -84,7 +84,6 @@ public final class EngineConfig { /** if set to true the engine will start even if the translog id in the commit point can not be found */ public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog"; - public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS); public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60); private static final String DEFAULT_CODEC_NAME = "default"; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index de9bb7e10f1..3cb7b083933 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -173,8 +172,6 @@ public class IndexShard extends AbstractIndexShardComponent { * being indexed/deleted. */ private final AtomicLong writingBytes = new AtomicLong(); - private TimeValue refreshInterval; - private volatile ScheduledFuture refreshScheduledFuture; protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; @@ -200,7 +197,6 @@ public class IndexShard extends AbstractIndexShardComponent { */ public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close"; public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size"; - public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval"; private final ShardPath path; @@ -249,7 +245,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.indexFieldDataService = indexFieldDataService; this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); state = IndexShardState.CREATED; - this.refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); this.flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); this.path = path; this.mergePolicyConfig = new MergePolicyConfig(logger, settings); @@ -561,23 +556,25 @@ public class IndexShard extends AbstractIndexShardComponent { /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ public void refresh(String source) { verifyNotClosed(); - if (canIndex()) { - long bytes = getEngine().getIndexBufferRAMBytesUsed(); - writingBytes.addAndGet(bytes); - try { - logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes)); + if (getEngine().refreshNeeded()) { + if (canIndex()) { + long bytes = getEngine().getIndexBufferRAMBytesUsed(); + writingBytes.addAndGet(bytes); + try { + logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes)); + long time = System.nanoTime(); + getEngine().refresh(source); + refreshMetric.inc(System.nanoTime() - time); + } finally { + logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); + writingBytes.addAndGet(-bytes); + } + } else { + logger.debug("refresh with source [{}]", source); long time = System.nanoTime(); getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); - } finally { - logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); - writingBytes.addAndGet(-bytes); } - } else { - logger.debug("refresh with source [{}]", source); - long time = System.nanoTime(); - getEngine().refresh(source); - refreshMetric.inc(System.nanoTime() - time); } } @@ -954,7 +951,6 @@ public class IndexShard extends AbstractIndexShardComponent { public void finalizeRecovery() { recoveryState().setStage(RecoveryState.Stage.FINALIZE); getEngine().refresh("recovery_finalization"); - startScheduledTasksIfNeeded(); engineConfig.setEnableGcDeletes(true); } @@ -1022,15 +1018,6 @@ public class IndexShard extends AbstractIndexShardComponent { } } - private void startScheduledTasksIfNeeded() { - if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); - logger.debug("scheduling refresher every {}", refreshInterval); - } else { - logger.debug("scheduled refresher disabled"); - } - } - /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ public long getIndexBufferRAMBytesUsed() { Engine engine = getEngineOrNull(); @@ -1133,22 +1120,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.flushOnClose = flushOnClose; } - TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval); - if (!refreshInterval.equals(this.refreshInterval)) { - logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval); - if (refreshScheduledFuture != null) { - // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is - // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt - // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 - FutureUtils.cancel(refreshScheduledFuture); - refreshScheduledFuture = null; - } - this.refreshInterval = refreshInterval; - if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); - } - } - long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); if (gcDeletesInMillis != config.getGcDeletesInMillis()) { logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); @@ -1226,7 +1197,7 @@ public class IndexShard extends AbstractIndexShardComponent { } } - private void handleRefreshException(Exception e) { + void handleRefreshException(Exception e) { if (e instanceof EngineClosedException) { // ignore } else if (e instanceof RefreshFailedEngineException) { @@ -1284,43 +1255,6 @@ public class IndexShard extends AbstractIndexShardComponent { internalIndexingStats.noopUpdate(type); } - final class EngineRefresher implements Runnable { - @Override - public void run() { - // we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule - if (!getEngine().refreshNeeded()) { - reschedule(); - return; - } - threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { - @Override - public void run() { - try { - // TODO: now that we use refresh to clear the indexing buffer, we should check here if we did that "recently" and - // reschedule if so... - if (getEngine().refreshNeeded()) { - refresh("schedule"); - } - } catch (Exception e) { - handleRefreshException(e); - } - - reschedule(); - } - }); - } - - /** - * Schedules another (future) refresh, if refresh_interval is still enabled. - */ - private void reschedule() { - synchronized (mutex) { - if (state != IndexShardState.CLOSED && refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this); - } - } - } - } private void checkIndex() throws IOException { if (store.tryIncRef()) { diff --git a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java index 667716937da..f24c3b1e898 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java @@ -46,7 +46,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.MetaDataStateFormat; -import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests; import org.elasticsearch.index.query.QueryBuilders; @@ -383,7 +383,7 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase { assertThat(source, Matchers.hasKey("foo")); assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder() - .put("refresh_interval", EngineConfig.DEFAULT_REFRESH_INTERVAL) + .put("refresh_interval", IndexSettings.DEFAULT_REFRESH_INTERVAL) .build())); } diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 3544cb1a257..7a1076268b3 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -31,8 +32,13 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.hamcrest.Matchers.containsString; @@ -151,4 +157,108 @@ public class IndexServiceTests extends ESSingleNodeTestCase { IndexMetaData build = IndexMetaData.builder(service.getMetaData()).putAlias(AliasMetaData.builder(alias).filter(filter).build()).build(); service.updateMetaData(build); } + + public void testBaseAsyncTask() throws InterruptedException, IOException { + IndexService indexService = newIndexService(); + ThreadPool pool = indexService.getThreadPool(); + AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); + AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); + final AtomicInteger count = new AtomicInteger(); + IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1)) { + @Override + protected void runInternal() { + count.incrementAndGet(); + assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); + latch.get().countDown(); + try { + latch2.get().await(); + } catch (InterruptedException e) { + fail("interrupted"); + } + if (randomBoolean()) { // task can throw exceptions!! + if (randomBoolean()) { + throw new RuntimeException("foo"); + } else { + throw new RuntimeException("bar"); + } + } + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + }; + latch.get().await(); + latch.set(new CountDownLatch(1)); + assertEquals(1, count.get()); + latch2.get().countDown(); + latch2.set(new CountDownLatch(1)); + + latch.get().await(); + assertEquals(2, count.get()); + task.close(); + latch2.get().countDown(); + assertEquals(2, count.get()); + + + task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1000000)) { + @Override + protected void runInternal() { + + } + }; + assertTrue(task.mustReschedule()); + if (randomBoolean()) { + for (Integer id : indexService.shardIds()) { + indexService.removeShard(id, "simon says"); + } + } else { + indexService.close("simon says", false); + } + + assertFalse("no shards left", task.mustReschedule()); + } + + public void testRefreshTaskIsUpdated() { + IndexService indexService = newIndexService(); + IndexService.RefreshTasks refreshTask = indexService.getRefreshTask(); + assertEquals(1000, refreshTask.getInterval().millis()); + assertTrue(indexService.getRefreshTask().mustReschedule()); + + // now disable + IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + assertFalse(indexService.getRefreshTask().mustReschedule()); + + // set it to 100ms + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "100ms")).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + + refreshTask = indexService.getRefreshTask(); + assertTrue(refreshTask.mustReschedule()); + assertEquals(100, refreshTask.getInterval().millis()); + + // set it to 200ms + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build(); + indexService.updateMetaData(metaData); + assertNotSame(refreshTask, indexService.getRefreshTask()); + assertTrue(refreshTask.isClosed()); + + refreshTask = indexService.getRefreshTask(); + assertTrue(refreshTask.mustReschedule()); + assertEquals(200, refreshTask.getInterval().millis()); + + // set it to 200ms again + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build(); + indexService.updateMetaData(metaData); + assertSame(refreshTask, indexService.getRefreshTask()); + assertTrue(indexService.getRefreshTask().mustReschedule()); + assertFalse(refreshTask.isClosed()); + assertEquals(200, refreshTask.getInterval().millis()); + } } diff --git a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java index 729b6acd8f8..3190145923e 100644 --- a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java +++ b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java @@ -27,10 +27,10 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; @@ -47,7 +47,7 @@ public class ParentFieldLoadingIT extends ESIntegTestCase { private final Settings indexSettings = Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexShard.INDEX_REFRESH_INTERVAL, -1) + .put(IndexSettings.INDEX_REFRESH_INTERVAL, -1) // We never want merges in this test to ensure we have two segments for the last validation .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .build(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index e8ff9674eed..bd94a974a0f 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -81,7 +81,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; -import static org.elasticsearch.index.shard.IndexShard.INDEX_REFRESH_INTERVAL; +import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing; From 5204440471987d034ef613fda56cd3d55362ea3a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 12 Jan 2016 21:09:26 +0100 Subject: [PATCH 2/4] add more tests and apply feedback from @mikemccand --- .../org/elasticsearch/index/IndexService.java | 54 +++++------ .../elasticsearch/index/shard/IndexShard.java | 6 +- .../index/IndexServiceTests.java | 91 ++++++++++++++++--- 3 files changed, 108 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 874216377c5..d3c6a5339ba 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -21,8 +21,6 @@ package org.elasticsearch.index; import java.io.Closeable; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; @@ -108,7 +106,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final IndexSettings indexSettings; private final IndexingSlowLog slowLog; private final IndexingOperationListener[] listeners; - private volatile RefreshTasks refreshTask; + private volatile AsyncRefreshTask refreshTask; + private final AsyncTranslogFSync fsyncTask; public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, SimilarityService similarityService, @@ -145,7 +144,13 @@ public final class IndexService extends AbstractIndexComponent implements IndexC this.listeners = new IndexingOperationListener[1+listenersIn.length]; this.listeners[0] = slowLog; System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length); - this.refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); + // kick off async ops for the first shard in this index + if (this.indexSettings.getTranslogSyncInterval().millis() != 0) { + this.fsyncTask = new AsyncTranslogFSync(this); + } else { + this.fsyncTask = null; + } + this.refreshTask = new AsyncRefreshTask(this); } public int numberOfShards() { @@ -221,7 +226,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } } finally { - IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService); + IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService, refreshTask, fsyncTask); } } } @@ -312,17 +317,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } else { indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, listeners); } - eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); indexShard.updateRoutingEntry(routing, true); - if (shards.isEmpty()) { - ThreadPool threadPool = nodeServicesProvider.getThreadPool(); - if (this.indexSettings.getTranslogSyncInterval().millis() != 0) { - new AsyncTranslogFSync(this, threadPool); // kick this off if we are the first shard in this service. - } - rescheduleRefreshTasks(); - } shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); success = true; return indexShard; @@ -590,7 +587,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC try { refreshTask.close(); } finally { - refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); + refreshTask = new AsyncRefreshTask(this); } } @@ -662,16 +659,16 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final AtomicBoolean closed = new AtomicBoolean(false); private volatile Exception lastThrownException; - BaseAsyncTask(IndexService indexService, ThreadPool threadPool, TimeValue interval) { + BaseAsyncTask(IndexService indexService, TimeValue interval) { this.indexService = indexService; - this.threadPool = threadPool; + this.threadPool = indexService.getThreadPool(); this.interval = interval; onTaskCompletion(); } boolean mustReschedule() { // don't re-schedule if its closed or if we dont' have a single shard here..., we are done - return (indexService.closed.get() || indexService.shards.isEmpty()) == false + return indexService.closed.get() == false && closed.get() == false && interval.millis() > 0; } @@ -685,13 +682,17 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } + boolean isScheduled() { + return scheduledFuture != null; + } + public final void run() { try { runInternal(); } catch (Exception ex) { if (lastThrownException == null || sameException(lastThrownException, ex) == false) { // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs - indexService.logger.warn("failed to run task {} - supressing re-occuring exceptions unless the exception changes", ex, toString()); + indexService.logger.warn("failed to run task {} - suppressing re-occurring exceptions unless the exception changes", ex, toString()); lastThrownException = ex; } } finally { @@ -746,8 +747,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC */ final static class AsyncTranslogFSync extends BaseAsyncTask { - AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { - super(indexService, threadPool, indexService.getIndexSettings().getTranslogSyncInterval()); + AsyncTranslogFSync(IndexService indexService) { + super(indexService, indexService.getIndexSettings().getTranslogSyncInterval()); } protected String getThreadPool() { @@ -764,10 +765,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } - final class RefreshTasks extends BaseAsyncTask { + final class AsyncRefreshTask extends BaseAsyncTask { - RefreshTasks(IndexService indexService, ThreadPool threadPool) { - super(indexService, threadPool, indexService.getIndexSettings().getRefreshInterval()); + AsyncRefreshTask(IndexService indexService) { + super(indexService, indexService.getIndexSettings().getRefreshInterval()); } @Override @@ -785,10 +786,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } - RefreshTasks getRefreshTask() { // for tests + AsyncRefreshTask getRefreshTask() { // for tests return refreshTask; } - - + AsyncTranslogFSync getFsyncTask() { // for tests + return fsyncTask; + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3cb7b083933..5098df49720 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -556,7 +556,7 @@ public class IndexShard extends AbstractIndexShardComponent { /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ public void refresh(String source) { verifyNotClosed(); - if (getEngine().refreshNeeded()) { +// if (getEngine().refreshNeeded()) { if (canIndex()) { long bytes = getEngine().getIndexBufferRAMBytesUsed(); writingBytes.addAndGet(bytes); @@ -575,7 +575,7 @@ public class IndexShard extends AbstractIndexShardComponent { getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); } - } +// } } /** Returns how many bytes we are currently moving from heap to disk */ @@ -1197,7 +1197,7 @@ public class IndexShard extends AbstractIndexShardComponent { } } - void handleRefreshException(Exception e) { + private void handleRefreshException(Exception e) { if (e instanceof EngineClosedException) { // ignore } else if (e instanceof RefreshFailedEngineException) { diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 7a1076268b3..9c52ec9a060 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.index; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.TopDocs; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; @@ -28,14 +30,15 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -160,11 +163,10 @@ public class IndexServiceTests extends ESSingleNodeTestCase { public void testBaseAsyncTask() throws InterruptedException, IOException { IndexService indexService = newIndexService(); - ThreadPool pool = indexService.getThreadPool(); AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); final AtomicInteger count = new AtomicInteger(); - IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1)) { + IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1)) { @Override protected void runInternal() { count.incrementAndGet(); @@ -202,27 +204,23 @@ public class IndexServiceTests extends ESSingleNodeTestCase { assertEquals(2, count.get()); - task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1000000)) { + task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) { @Override protected void runInternal() { } }; assertTrue(task.mustReschedule()); - if (randomBoolean()) { - for (Integer id : indexService.shardIds()) { - indexService.removeShard(id, "simon says"); - } - } else { - indexService.close("simon says", false); - } - + indexService.close("simon says", false); assertFalse("no shards left", task.mustReschedule()); + assertTrue(task.isScheduled()); + task.close(); + assertFalse(task.isScheduled()); } - public void testRefreshTaskIsUpdated() { + public void testRefreshTaskIsUpdated() throws IOException { IndexService indexService = newIndexService(); - IndexService.RefreshTasks refreshTask = indexService.getRefreshTask(); + IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask(); assertEquals(1000, refreshTask.getInterval().millis()); assertTrue(indexService.getRefreshTask().mustReschedule()); @@ -231,6 +229,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase { indexService.updateMetaData(metaData); assertNotSame(refreshTask, indexService.getRefreshTask()); assertTrue(refreshTask.isClosed()); + assertFalse(refreshTask.isScheduled()); assertFalse(indexService.getRefreshTask().mustReschedule()); // set it to 100ms @@ -241,6 +240,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase { refreshTask = indexService.getRefreshTask(); assertTrue(refreshTask.mustReschedule()); + assertTrue(refreshTask.isScheduled()); assertEquals(100, refreshTask.getInterval().millis()); // set it to 200ms @@ -251,6 +251,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase { refreshTask = indexService.getRefreshTask(); assertTrue(refreshTask.mustReschedule()); + assertTrue(refreshTask.isScheduled()); assertEquals(200, refreshTask.getInterval().millis()); // set it to 200ms again @@ -258,7 +259,69 @@ public class IndexServiceTests extends ESSingleNodeTestCase { indexService.updateMetaData(metaData); assertSame(refreshTask, indexService.getRefreshTask()); assertTrue(indexService.getRefreshTask().mustReschedule()); + assertTrue(refreshTask.isScheduled()); assertFalse(refreshTask.isClosed()); assertEquals(200, refreshTask.getInterval().millis()); + indexService.close("simon says", false); + assertFalse(refreshTask.isScheduled()); + assertTrue(refreshTask.isClosed()); + } + + public void testFsyncTaskIsRunning() throws IOException { + IndexService indexService = newIndexService(); + IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask(); + assertNotNull(fsyncTask); + assertEquals(5000, fsyncTask.getInterval().millis()); + assertTrue(fsyncTask.mustReschedule()); + assertTrue(fsyncTask.isScheduled()); + + indexService.close("simon says", false); + assertFalse(fsyncTask.isScheduled()); + assertTrue(fsyncTask.isClosed()); + } + + public void testRefreshActuallyWorks() throws Exception { + IndexService indexService = newIndexService(); + ensureGreen("test"); + IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask(); + assertEquals(1000, refreshTask.getInterval().millis()); + assertTrue(indexService.getRefreshTask().mustReschedule()); + + // now disable + IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build(); + indexService.updateMetaData(metaData); + client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get(); + IndexShard shard = indexService.getShard(0); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(0, search.totalHits); + } + // refresh every millisecond + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "1ms")).build(); + indexService.updateMetaData(metaData); + assertBusy(() -> { + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(1, search.totalHits); + } catch (IOException e) { + fail(e.getMessage()); + } + }); + } + + public void testAsyncFsyncActuallyWorks() throws Exception { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, "10ms") // very often :) + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC) + .build(); + IndexService indexService = createIndex("test", settings); + ensureGreen("test"); + assertTrue(indexService.getRefreshTask().mustReschedule()); + client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get(); + IndexShard shard = indexService.getShard(0); + assertBusy(() -> { + assertFalse(shard.getTranslog().syncNeeded()); + }); + } } From 050afe91713aa62ece388f2ed41042f1336db5ad Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 12 Jan 2016 21:17:14 +0100 Subject: [PATCH 3/4] add yet another test --- .../java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- .../java/org/elasticsearch/index/IndexServiceTests.java | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5098df49720..87609c2fae0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -556,7 +556,7 @@ public class IndexShard extends AbstractIndexShardComponent { /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ public void refresh(String source) { verifyNotClosed(); -// if (getEngine().refreshNeeded()) { + if (getEngine().refreshNeeded()) { if (canIndex()) { long bytes = getEngine().getIndexBufferRAMBytesUsed(); writingBytes.addAndGet(bytes); @@ -575,7 +575,7 @@ public class IndexShard extends AbstractIndexShardComponent { getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); } -// } + } } /** Returns how many bytes we are currently moving from heap to disk */ diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 9c52ec9a060..3e118f4fd4a 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -322,6 +322,13 @@ public class IndexServiceTests extends ESSingleNodeTestCase { assertBusy(() -> { assertFalse(shard.getTranslog().syncNeeded()); }); + } + public void testNoFsyncTaskIfDisabled() { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, "0ms") // disable + .build(); + IndexService indexService = createIndex("test", settings); + assertNull(indexService.getFsyncTask()); } } From 2c978941f56476d0cbe925c116e3cd0840a11497 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 13 Jan 2016 10:03:20 +0100 Subject: [PATCH 4/4] fix test to wipe lenient index first --- .../org/elasticsearch/cluster/settings/ClusterSettingsIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/org/elasticsearch/cluster/settings/ClusterSettingsIT.java b/core/src/test/java/org/elasticsearch/cluster/settings/ClusterSettingsIT.java index fba6d8127b8..6cc8bf8bc67 100644 --- a/core/src/test/java/org/elasticsearch/cluster/settings/ClusterSettingsIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/settings/ClusterSettingsIT.java @@ -335,6 +335,7 @@ public class ClusterSettingsIT extends ESIntegTestCase { assertAcked(prepareCreate("test")); ensureGreen(); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.refresh_interval", "10")).execute().actionGet(); + client().admin().indices().prepareDelete("test").get(); } finally { // Restore the default so subsequent tests require units: assertFalse(Settings.getSettingsRequireUnits());