diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java
index 29a3dcaeb9f..cdde49170d4 100644
--- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java
@@ -125,8 +125,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
         "index.store.throttle.max_bytes_per_sec",
         "index.translog.flush_threshold_size",
         "index.translog.fs.buffer_size",
-        "index.version_map_size",
-        "index.buffer_size"));
+        "index.version_map_size"));
 
     /** All known time settings for an index. */
     public static final Set<String> INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet(
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index e76aa1d38b9..abca9cc875e 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -97,7 +97,6 @@ public final class EngineConfig {
      * Index setting to control the index buffer size.
      * This setting is not realtime updateable.
      */
-    public static final String INDEX_BUFFER_SIZE_SETTING = "index.buffer_size";
 
     /** if set to true the engine will start even if the translog id in the commit point can not be found */
     public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
@@ -132,9 +131,9 @@ public final class EngineConfig {
         this.failedEngineListener = failedEngineListener;
         this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
         codecName = indexSettings.get(INDEX_CODEC_SETTING, DEFAULT_CODEC_NAME);
-        // We tell IndexWriter to use large heap, but IndexingMemoryController checks periodically and refreshes the most heap-consuming
-        // shards when total indexing heap usage is too high:
-        indexingBufferSize = indexSettings.getAsBytesSize(INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.MB));
+        // We give IndexWriter a huge buffer, so it won't flush on its own. Instead, IndexingMemoryController periodically checks
+        // and refreshes the most heap-consuming shards when total indexing heap usage is too high:
+        indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
         gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, DEFAULT_GC_DELETES).millis();
         this.translogRecoveryPerformer = translogRecoveryPerformer;
         this.forceNewTranslog = indexSettings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false);
diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
index 821311ad636..6073aa54626 100644
--- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
+++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
@@ -170,7 +170,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
             if (totalBytesUsed > indexingBuffer.bytes()) {
                 // OK we are using too much; make a queue and ask largest shard(s) to refresh:
                 logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer);
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 2b929f76ad3..9715498a90c 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -1565,10 +1565,9 @@ public class InternalEngineTests extends ESTestCase {
     public void testDeletesAloneCanTriggerRefresh() throws Exception {
         Settings settings = Settings.builder()
             .put(defaultSettings)
-            .put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb")
-            .put(IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING, "100ms").build();
+            .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
         try (Store store = createStore();
-             Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
+             Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
             for (int i = 0; i < 100; i++) {
                 String id = Integer.toString(i);
                 ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null);
@@ -1578,6 +1577,30 @@
 
             // Force merge so we know all merges are done before we start deleting:
             engine.forceMerge(true, 1, false, false, false);
+            // Make a shell of an IMC to check up on indexing buffer usage:
+            IndexingMemoryController imc = new IndexingMemoryController(settings, threadPool, null) {
+                @Override
+                protected IndexShard getShard(ShardId shardId) {
+                    return null;
+                }
+
+                @Override
+                protected List<ShardId> availableShards() {
+                    return Collections.singletonList(new ShardId("foo", 0));
+                }
+
+                @Override
+                protected void refreshShardAsync(ShardId shardId) {
+                    engine.refresh("memory");
+                }
+
+                @Override
+                protected long getIndexBufferRAMBytesUsed(ShardId shardId) {
+                    System.out.println("BYTES USED: " + engine.indexBufferRAMBytesUsed());
+                    return engine.indexBufferRAMBytesUsed();
+                }
+            };
+
             Searcher s = engine.acquireSearcher("test");
             final long version1 = ((DirectoryReader) s.reader()).getVersion();
             s.close();
@@ -1586,18 +1609,10 @@
                 engine.delete(new Engine.Delete("test", id, newUid(id), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
             }
 
-            // We must assertBusy because refresh due to version map being full is done in background (REFRESH) thread pool:
-            assertBusy(new Runnable() {
-                @Override
-                public void run() {
-                    Searcher s2 = engine.acquireSearcher("test");
-                    long version2 = ((DirectoryReader) s2.reader()).getVersion();
-                    s2.close();
-
-                    // 100 buffered deletes will easily exceed 25% of our 1 KB indexing buffer so it should have forced a refresh:
-                    assertThat(version2, greaterThan(version1));
-                }
-            });
+            imc.forceCheck();
+            try (Searcher s2 = engine.acquireSearcher("test")) {
+                assertThat(((DirectoryReader) s2.reader()).getVersion(), greaterThan(version1));
+            }
         }
     }
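
For readers following the patch: the new EngineConfig comment describes a control loop that this standalone sketch illustrates. It is not part of the commit and not the Elasticsearch API; all class and method names here (IndexingBufferControlSketch, FakeShard, checkIndexingBuffers) are hypothetical. IndexWriter gets a buffer so large it never flushes on its own, and a periodic check sums per-shard indexing-buffer usage, then asks the largest consumer(s) to refresh once the total exceeds a global budget.

// Sketch of the memory-control flow (hypothetical names, not the ES API).
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class IndexingBufferControlSketch {

    /** Hypothetical stand-in for a shard that reports its indexing-buffer usage. */
    static final class FakeShard {
        final String name;
        long indexBufferBytes;
        FakeShard(String name, long bytes) { this.name = name; this.indexBufferBytes = bytes; }
        void refresh() {
            // A refresh writes the buffered segments, freeing the indexing buffer:
            System.out.println("refreshing " + name + " (" + indexBufferBytes + " bytes)");
            indexBufferBytes = 0;
        }
    }

    /** One pass of the periodic check: refresh the biggest users until under budget. */
    static void checkIndexingBuffers(List<FakeShard> shards, long budgetBytes) {
        long totalBytesUsed = 0;
        for (FakeShard shard : shards) {
            totalBytesUsed += shard.indexBufferBytes;
        }
        if (totalBytesUsed <= budgetBytes) {
            return; // under budget: let IndexWriter keep buffering
        }
        // OK we are using too much; make a queue and ask largest shard(s) to refresh:
        PriorityQueue<FakeShard> queue = new PriorityQueue<>(
            Comparator.comparingLong((FakeShard s) -> s.indexBufferBytes).reversed());
        queue.addAll(shards);
        while (totalBytesUsed > budgetBytes && !queue.isEmpty()) {
            FakeShard biggest = queue.poll();
            totalBytesUsed -= biggest.indexBufferBytes;
            biggest.refresh();
        }
    }

    public static void main(String[] args) {
        List<FakeShard> shards = new ArrayList<>();
        shards.add(new FakeShard("shard-0", 400));
        shards.add(new FakeShard("shard-1", 900));
        shards.add(new FakeShard("shard-2", 100));
        checkIndexingBuffers(shards, 1000); // total 1400 > 1000, so shard-1 is refreshed
    }
}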
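The test change is worth the same treatment: it swaps assertBusy polling of a background REFRESH-pool thread for a synchronous imc.forceCheck(). Below is a minimal sketch of that pattern, under the assumption that the controller factors its periodic body into an overridable method that forceCheck() invokes directly; the names (ForceCheckSketch, Controller, runCheck) are invented for illustration.

// Sketch of the deterministic-test pattern (hypothetical names).
import java.util.concurrent.atomic.AtomicBoolean;

public class ForceCheckSketch {

    /** Hypothetical controller; a real scheduler would call runCheck() periodically. */
    static class Controller {
        final void forceCheck() { runCheck(); } // synchronous entry point for tests
        protected void runCheck() { }
    }

    public static void main(String[] args) {
        AtomicBoolean refreshed = new AtomicBoolean(false);
        Controller controller = new Controller() {
            @Override
            protected void runCheck() {
                refreshed.set(true); // stand-in for engine.refresh("memory")
            }
        };
        controller.forceCheck(); // no sleeps, no polling
        if (!refreshed.get()) {
            throw new AssertionError("forceCheck should have run the check synchronously");
        }
        System.out.println("refresh triggered deterministically");
    }
}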