diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d9e3ff74529..352fa4dda09 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1639,9 +1639,7 @@ public class InternalEngine extends Engine { @Override public void writeIndexingBuffer() throws EngineException { - // we obtain a read lock here, since we don't want a flush to happen while we are writing - // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) - refresh("write indexing buffer", SearcherScope.INTERNAL, true); + refresh("write indexing buffer", SearcherScope.INTERNAL, false); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java new file mode 100644 index 00000000000..6eb39b31fee --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.indices; + +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.refresh.RefreshStats; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.greaterThan; + +public class IndexingMemoryControllerIT extends ESSingleNodeTestCase { + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()) + // small indexing buffer so that we can trigger refresh after buffering 100 deletes + .put("indices.memory.index_buffer_size", "1kb").build(); + } + + @Override + protected Collection<Class<? extends Plugin>> getPlugins() { + final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins()); + plugins.add(TestEnginePlugin.class); + return plugins; + } + + public static class TestEnginePlugin extends Plugin implements EnginePlugin { + + EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { + // We need to set a larger buffer for the IndexWriter; otherwise, it will flush before the IndexingMemoryController.
+ Settings settings = Settings.builder().put(config.getIndexSettings().getSettings()) + .put("indices.memory.index_buffer_size", "10mb").build(); + IndexSettings indexSettings = new IndexSettings(config.getIndexSettings().getIndexMetaData(), settings); + return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), + indexSettings, config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), + config.getSimilarity(), new CodecService(null, LogManager.getLogger(IndexingMemoryControllerIT.class)), + config.getEventListener(), config.getQueryCache(), + config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + } + + @Override + public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) { + return Optional.of(config -> new InternalEngine(engineConfigWithLargerIndexingMemory(config))); + } + } + + // #10312 + public void testDeletesAloneCanTriggerRefresh() throws Exception { + IndexService indexService = createIndex("index", Settings.builder().put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0).put("index.refresh_interval", -1).build()); + IndexShard shard = indexService.getShard(0); + for (int i = 0; i < 100; i++) { + client().prepareIndex("index", "_doc").setId(Integer.toString(i)).setSource("field", "value").get(); + } + // Force merge so we know all merges are done before we start deleting: + ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); + assertNoFailures(r); + final RefreshStats refreshStats = shard.refreshStats(); + for (int i = 0; i < 100; i++) { + client().prepareDelete("index", "_doc", Integer.toString(i)).get(); + } + // we need to assert busily because the IndexingMemoryController refreshes in the background + assertBusy(() -> assertThat(shard.refreshStats().getTotal(), greaterThan(refreshStats.getTotal() + 1))); +} diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 8fa58bd05e5..1431937245e 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -18,26 +18,24 @@ */ package org.elasticsearch.indices; -import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; -import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.InternalEngine; +import
org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardIT; import org.elasticsearch.index.shard.IndexShardTestCase; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import java.io.IOException; import java.util.ArrayList; @@ -47,15 +45,16 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; -public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { +public class IndexingMemoryControllerTests extends IndexShardTestCase { static class MockController extends IndexingMemoryController { @@ -168,19 +167,16 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { } } - public void testShardAdditionAndRemoval() { - createIndex("test", Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build()); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); + public void testShardAdditionAndRemoval() throws IOException { MockController controller = new MockController(Settings.builder() .put("indices.memory.index_buffer_size", "4mb").build()); - IndexShard shard0 = test.getShard(0); + IndexShard shard0 = newStartedShard(); controller.simulateIndexing(shard0); controller.assertBuffer(shard0, 1); // add another shard - IndexShard shard1 = test.getShard(1); + IndexShard shard1 = newStartedShard(); controller.simulateIndexing(shard1); controller.assertBuffer(shard0, 1); controller.assertBuffer(shard1, 1); @@ -195,24 +191,21 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { controller.forceCheck(); // add a new one - IndexShard shard2 = test.getShard(2); + IndexShard shard2 = newStartedShard(); controller.simulateIndexing(shard2); controller.assertBuffer(shard2, 1); + closeShards(shard0, shard1, shard2); } - public void testActiveInactive() { - - createIndex("test", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); + public void testActiveInactive() throws IOException { MockController controller = new MockController(Settings.builder() .put("indices.memory.index_buffer_size", "5mb") .build()); - IndexShard shard0 = test.getShard(0); + IndexShard shard0 = newStartedShard(); controller.simulateIndexing(shard0); - IndexShard shard1 = test.getShard(1); + IndexShard shard1 = newStartedShard(); controller.simulateIndexing(shard1); controller.assertBuffer(shard0, 1); @@ -237,6 +230,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { controller.simulateIndexing(shard1); // 
shard1 crossed 5 mb and is now cleared: controller.assertBuffer(shard1, 0); + closeShards(shard0, shard1); } public void testMinBufferSizes() { @@ -288,14 +282,11 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { } public void testThrottling() throws Exception { - createIndex("test", Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build()); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); MockController controller = new MockController(Settings.builder() .put("indices.memory.index_buffer_size", "4mb").build()); - IndexShard shard0 = test.getShard(0); - IndexShard shard1 = test.getShard(1); + IndexShard shard0 = newStartedShard(); + IndexShard shard1 = newStartedShard(); controller.simulateIndexing(shard0); controller.simulateIndexing(shard0); controller.simulateIndexing(shard0); @@ -346,95 +337,21 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { controller.forceCheck(); controller.assertNotThrottled(shard0); controller.assertNotThrottled(shard1); - } - - // #10312 - public void testDeletesAloneCanTriggerRefresh() throws Exception { - createIndex("index", - Settings.builder().put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .put("index.refresh_interval", -1) - .build()); - ensureGreen(); - - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("index")); - IndexShard shard = indexService.getShardOrNull(0); - assertNotNull(shard); - - for (int i = 0; i < 100; i++) { - String id = Integer.toString(i); - client().prepareIndex("index", "type", id).setSource("field", "value").get(); - } - - // Force merge so we know all merges are done before we start deleting: - ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); - assertNoFailures(r); - - // Make a shell of an IMC to check up on indexing buffer usage: - Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "1kb").build(); - - // TODO: would be cleaner if I could pass this 1kb setting to the single node this test created.... 
- IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) { - @Override - protected List<IndexShard> availableShards() { - return Collections.singletonList(shard); - } - - @Override - protected long getIndexBufferRAMBytesUsed(IndexShard shard) { - return shard.getIndexBufferRAMBytesUsed(); - } - - @Override - protected void writeIndexingBufferAsync(IndexShard shard) { - // just do it sync'd for this test - shard.writeIndexingBuffer(); - } - - @Override - protected Cancellable scheduleTask(ThreadPool threadPool) { - return null; - } - }; - - for (int i = 0; i < 100; i++) { - String id = Integer.toString(i); - client().prepareDelete("index", "type", id).get(); - } - - final long indexingBufferBytes1 = shard.getIndexBufferRAMBytesUsed(); - - imc.forceCheck(); - - // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool: - assertBusy(() -> { - try (Engine.Searcher s2 = shard.acquireSearcher("index")) { - // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write: - final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed(); - assertTrue(indexingBufferBytes2 < indexingBufferBytes1); - } - }); + closeShards(shard0, shard1); } public void testTranslogRecoveryWorksWithIMC() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShardOrNull(0); + IndexShard shard = newStartedShard(true); for (int i = 0; i < 100; i++) { - client().prepareIndex("test", "test", Integer.toString(i)).setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + indexDoc(shard, "_doc", Integer.toString(i), "{\"foo\" : \"bar\"}", XContentType.JSON, null); } - - CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper = directoryReader -> directoryReader; shard.close("simon says", false); AtomicReference<IndexShard> shardRef = new AtomicReference<>(); Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build(); - Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ? Collections.emptyList().iterator() + Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ?
Collections.emptyIterator() : Collections.singleton(shardRef.get()).iterator(); AtomicInteger flushes = new AtomicInteger(); - IndexingMemoryController imc = new IndexingMemoryController(settings, client().threadPool(), iterable) { + IndexingMemoryController imc = new IndexingMemoryController(settings, threadPool, iterable) { @Override protected void writeIndexingBufferAsync(IndexShard shard) { assertEquals(shard, shardRef.get()); @@ -442,21 +359,94 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { shard.writeIndexingBuffer(); } }; - final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, new NoneCircuitBreakerService(), imc); - shardRef.set(newShard); - try { - assertEquals(0, imc.availableShards().size()); - ShardRouting routing = newShard.routingEntry(); - DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + shard = reinitShard(shard, imc); + shardRef.set(shard); + assertEquals(0, imc.availableShards().size()); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + shard.markAsRecovering("store", new RecoveryState(shard.routingEntry(), localNode, null)); - assertEquals(1, imc.availableShards().size()); - assertTrue(IndexShardTestCase.recoverFromStore(newShard)); - assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); - IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted()); - } finally { - newShard.close("simon says", false); - } + assertEquals(1, imc.availableShards().size()); + assertTrue(recoverFromStore(shard)); + assertThat("we should have flushed in IMC at least once", flushes.get(), greaterThanOrEqualTo(1)); + closeShards(shard); } + EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.RefreshListener listener) { + final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>(config.getInternalRefreshListener()); + internalRefreshListener.add(listener); + return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), + config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), + config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), + config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + } + + ThreadPoolStats.Stats getRefreshThreadPoolStats() { + final ThreadPoolStats stats = threadPool.stats(); + for (ThreadPoolStats.Stats s : stats) { + if (s.getName().equals(ThreadPool.Names.REFRESH)) { + return s; + } + } + throw new AssertionError("refresh thread pool stats not found [" + stats + "]"); + } + + public void testSkipRefreshIfShardIsRefreshingAlready() throws Exception { + SetOnce<CountDownLatch> refreshLatch = new SetOnce<>(); + ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() { + if (refreshLatch.get() != null) { + try { + refreshLatch.get().await(); + } catch (InterruptedException e) { + throw new
AssertionError(e); + } + } + } + + @Override + public void afterRefresh(boolean didRefresh) { + + } + }; + IndexShard shard = newStartedShard(randomBoolean(), Settings.EMPTY, + config -> new InternalEngine(configWithRefreshListener(config, refreshListener))); + refreshLatch.set(new CountDownLatch(1)); // block refresh + final RefreshStats refreshStats = shard.refreshStats(); + final IndexingMemoryController controller = new IndexingMemoryController( + Settings.builder().put("indices.memory.interval", "200h") // disable it + .put("indices.memory.index_buffer_size", "1024b").build(), + threadPool, + Collections.singleton(shard)) { + @Override + protected long getIndexBufferRAMBytesUsed(IndexShard shard) { + return randomLongBetween(1025, 10 * 1024 * 1024); + } + + @Override + protected long getShardWritingBytes(IndexShard shard) { + return 0L; + } + }; + int iterations = randomIntBetween(10, 100); + for (int i = 0; i < iterations; i++) { + controller.forceCheck(); + } + assertBusy(() -> { + ThreadPoolStats.Stats stats = getRefreshThreadPoolStats(); + assertThat(stats.getQueue(), equalTo(0)); + assertThat(stats.getActive(), equalTo(1)); + }); + refreshLatch.get().countDown(); // allow refresh + assertBusy(() -> { + ThreadPoolStats.Stats stats = getRefreshThreadPoolStats(); + assertThat(stats.getQueue(), equalTo(0)); + assertThat(stats.getActive(), equalTo(0)); + }); + assertThat(shard.refreshStats().getTotal(), equalTo(refreshStats.getTotal() + 1)); + closeShards(shard); + } }
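For context on the production change in InternalEngine#writeIndexingBuffer() above: the boolean passed to refresh(source, scope, ...) effectively selects between Lucene's blocking and non-blocking refresh. The sketch below (not part of the patch; written against Lucene's ReferenceManager/SearcherManager API with illustrative method names) shows the difference the change relies on: with the non-blocking path, a caller that finds another thread already refreshing returns immediately instead of queuing behind the refresh lock, which is what testSkipRefreshIfShardIsRefreshingAlready asserts by checking that repeated forceCheck() calls leave the refresh thread pool with one active task and an empty queue.

    import org.apache.lucene.search.SearcherManager;

    import java.io.IOException;

    class RefreshSketch {
        // Old behavior (block == true): always wait until a refresh has completed, even when
        // another thread already holds the refresh lock, so concurrent callers pile up.
        static void writeIndexingBufferBlocking(SearcherManager searcherManager) throws IOException {
            searcherManager.maybeRefreshBlocking();
        }

        // New behavior (block == false): attempt the refresh, but give up immediately when another
        // thread is already refreshing; that in-flight refresh frees the indexing buffer anyway.
        static boolean writeIndexingBufferNonBlocking(SearcherManager searcherManager) throws IOException {
            return searcherManager.maybeRefresh(); // false means a refresh was already in flight
        }
    }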