From 5204440471987d034ef613fda56cd3d55362ea3a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 12 Jan 2016 21:09:26 +0100 Subject: [PATCH] add more tests and apply feedback from @mikemccand --- .../org/elasticsearch/index/IndexService.java | 54 +++++------ .../elasticsearch/index/shard/IndexShard.java | 6 +- .../index/IndexServiceTests.java | 91 ++++++++++++++++--- 3 files changed, 108 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 874216377c5..d3c6a5339ba 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -21,8 +21,6 @@ package org.elasticsearch.index; import java.io.Closeable; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; @@ -108,7 +106,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final IndexSettings indexSettings; private final IndexingSlowLog slowLog; private final IndexingOperationListener[] listeners; - private volatile RefreshTasks refreshTask; + private volatile AsyncRefreshTask refreshTask; + private final AsyncTranslogFSync fsyncTask; public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, SimilarityService similarityService, @@ -145,7 +144,13 @@ public final class IndexService extends AbstractIndexComponent implements IndexC this.listeners = new IndexingOperationListener[1+listenersIn.length]; this.listeners[0] = slowLog; System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length); - this.refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); + // kick off async ops for the first shard in this index + if (this.indexSettings.getTranslogSyncInterval().millis() != 0) { + this.fsyncTask = new AsyncTranslogFSync(this); + } else { + this.fsyncTask = null; + } + this.refreshTask = new AsyncRefreshTask(this); } public int numberOfShards() { @@ -221,7 +226,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } } finally { - IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService); + IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService, refreshTask, fsyncTask); } } } @@ -312,17 +317,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } else { indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, listeners); } - eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); indexShard.updateRoutingEntry(routing, true); - if (shards.isEmpty()) { - ThreadPool threadPool = nodeServicesProvider.getThreadPool(); - if (this.indexSettings.getTranslogSyncInterval().millis() != 0) { - new AsyncTranslogFSync(this, threadPool); // kick this off if we are the first shard in this service. - } - rescheduleRefreshTasks(); - } shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); success = true; return indexShard; @@ -590,7 +587,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC try { refreshTask.close(); } finally { - refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool()); + refreshTask = new AsyncRefreshTask(this); } } @@ -662,16 +659,16 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final AtomicBoolean closed = new AtomicBoolean(false); private volatile Exception lastThrownException; - BaseAsyncTask(IndexService indexService, ThreadPool threadPool, TimeValue interval) { + BaseAsyncTask(IndexService indexService, TimeValue interval) { this.indexService = indexService; - this.threadPool = threadPool; + this.threadPool = indexService.getThreadPool(); this.interval = interval; onTaskCompletion(); } boolean mustReschedule() { // don't re-schedule if its closed or if we dont' have a single shard here..., we are done - return (indexService.closed.get() || indexService.shards.isEmpty()) == false + return indexService.closed.get() == false && closed.get() == false && interval.millis() > 0; } @@ -685,13 +682,17 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } + boolean isScheduled() { + return scheduledFuture != null; + } + public final void run() { try { runInternal(); } catch (Exception ex) { if (lastThrownException == null || sameException(lastThrownException, ex) == false) { // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs - indexService.logger.warn("failed to run task {} - supressing re-occuring exceptions unless the exception changes", ex, toString()); + indexService.logger.warn("failed to run task {} - suppressing re-occurring exceptions unless the exception changes", ex, toString()); lastThrownException = ex; } } finally { @@ -746,8 +747,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC */ final static class AsyncTranslogFSync extends BaseAsyncTask { - AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { - super(indexService, threadPool, indexService.getIndexSettings().getTranslogSyncInterval()); + AsyncTranslogFSync(IndexService indexService) { + super(indexService, indexService.getIndexSettings().getTranslogSyncInterval()); } protected String getThreadPool() { @@ -764,10 +765,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } - final class RefreshTasks extends BaseAsyncTask { + final class AsyncRefreshTask extends BaseAsyncTask { - RefreshTasks(IndexService indexService, ThreadPool threadPool) { - super(indexService, threadPool, indexService.getIndexSettings().getRefreshInterval()); + AsyncRefreshTask(IndexService indexService) { + super(indexService, indexService.getIndexSettings().getRefreshInterval()); } @Override @@ -785,10 +786,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } - RefreshTasks getRefreshTask() { // for tests + AsyncRefreshTask getRefreshTask() { // for tests return refreshTask; } - - + AsyncTranslogFSync getFsyncTask() { // for tests + return fsyncTask; + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3cb7b083933..5098df49720 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -556,7 +556,7 @@ public class IndexShard extends AbstractIndexShardComponent { /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ public void refresh(String source) { verifyNotClosed(); - if (getEngine().refreshNeeded()) { +// if (getEngine().refreshNeeded()) { if (canIndex()) { long bytes = getEngine().getIndexBufferRAMBytesUsed(); writingBytes.addAndGet(bytes); @@ -575,7 +575,7 @@ public class IndexShard extends AbstractIndexShardComponent { getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); } - } +// } } /** Returns how many bytes we are currently moving from heap to disk */ @@ -1197,7 +1197,7 @@ public class IndexShard extends AbstractIndexShardComponent { } } - void handleRefreshException(Exception e) { + private void handleRefreshException(Exception e) { if (e instanceof EngineClosedException) { // ignore } else if (e instanceof RefreshFailedEngineException) { diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 7a1076268b3..9c52ec9a060 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.index; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.TopDocs; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; @@ -28,14 +30,15 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -160,11 +163,10 @@ public class IndexServiceTests extends ESSingleNodeTestCase { public void testBaseAsyncTask() throws InterruptedException, IOException { IndexService indexService = newIndexService(); - ThreadPool pool = indexService.getThreadPool(); AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); final AtomicInteger count = new AtomicInteger(); - IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1)) { + IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1)) { @Override protected void runInternal() { count.incrementAndGet(); @@ -202,27 +204,23 @@ public class IndexServiceTests extends ESSingleNodeTestCase { assertEquals(2, count.get()); - task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1000000)) { + task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) { @Override protected void runInternal() { } }; assertTrue(task.mustReschedule()); - if (randomBoolean()) { - for (Integer id : indexService.shardIds()) { - indexService.removeShard(id, "simon says"); - } - } else { - indexService.close("simon says", false); - } - + indexService.close("simon says", false); assertFalse("no shards left", task.mustReschedule()); + assertTrue(task.isScheduled()); + task.close(); + assertFalse(task.isScheduled()); } - public void testRefreshTaskIsUpdated() { + public void testRefreshTaskIsUpdated() throws IOException { IndexService indexService = newIndexService(); - IndexService.RefreshTasks refreshTask = indexService.getRefreshTask(); + IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask(); assertEquals(1000, refreshTask.getInterval().millis()); assertTrue(indexService.getRefreshTask().mustReschedule()); @@ -231,6 +229,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase { indexService.updateMetaData(metaData); assertNotSame(refreshTask, indexService.getRefreshTask()); assertTrue(refreshTask.isClosed()); + assertFalse(refreshTask.isScheduled()); assertFalse(indexService.getRefreshTask().mustReschedule()); // set it to 100ms @@ -241,6 +240,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase { refreshTask = indexService.getRefreshTask(); assertTrue(refreshTask.mustReschedule()); + assertTrue(refreshTask.isScheduled()); assertEquals(100, refreshTask.getInterval().millis()); // set it to 200ms @@ -251,6 +251,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase { refreshTask = indexService.getRefreshTask(); assertTrue(refreshTask.mustReschedule()); + assertTrue(refreshTask.isScheduled()); assertEquals(200, refreshTask.getInterval().millis()); // set it to 200ms again @@ -258,7 +259,69 @@ public class IndexServiceTests extends ESSingleNodeTestCase { indexService.updateMetaData(metaData); assertSame(refreshTask, indexService.getRefreshTask()); assertTrue(indexService.getRefreshTask().mustReschedule()); + assertTrue(refreshTask.isScheduled()); assertFalse(refreshTask.isClosed()); assertEquals(200, refreshTask.getInterval().millis()); + indexService.close("simon says", false); + assertFalse(refreshTask.isScheduled()); + assertTrue(refreshTask.isClosed()); + } + + public void testFsyncTaskIsRunning() throws IOException { + IndexService indexService = newIndexService(); + IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask(); + assertNotNull(fsyncTask); + assertEquals(5000, fsyncTask.getInterval().millis()); + assertTrue(fsyncTask.mustReschedule()); + assertTrue(fsyncTask.isScheduled()); + + indexService.close("simon says", false); + assertFalse(fsyncTask.isScheduled()); + assertTrue(fsyncTask.isClosed()); + } + + public void testRefreshActuallyWorks() throws Exception { + IndexService indexService = newIndexService(); + ensureGreen("test"); + IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask(); + assertEquals(1000, refreshTask.getInterval().millis()); + assertTrue(indexService.getRefreshTask().mustReschedule()); + + // now disable + IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build(); + indexService.updateMetaData(metaData); + client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get(); + IndexShard shard = indexService.getShard(0); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(0, search.totalHits); + } + // refresh every millisecond + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "1ms")).build(); + indexService.updateMetaData(metaData); + assertBusy(() -> { + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(1, search.totalHits); + } catch (IOException e) { + fail(e.getMessage()); + } + }); + } + + public void testAsyncFsyncActuallyWorks() throws Exception { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, "10ms") // very often :) + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC) + .build(); + IndexService indexService = createIndex("test", settings); + ensureGreen("test"); + assertTrue(indexService.getRefreshTask().mustReschedule()); + client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get(); + IndexShard shard = indexService.getShard(0); + assertBusy(() -> { + assertFalse(shard.getTranslog().syncNeeded()); + }); + } }