diff --git a/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java
index e81a4a7c86b..f4ddd75ba85 100644
--- a/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java
+++ b/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java
@@ -125,18 +125,6 @@ final class CompositeIndexEventListener implements IndexEventListener {
}
}
- @Override
- public void onShardActive(IndexShard indexShard) {
- for (IndexEventListener listener : listeners) {
- try {
- listener.onShardActive(indexShard);
- } catch (Throwable t) {
- logger.warn("[{}] failed to invoke on shard active callback", t, indexShard.shardId().getId());
- throw t;
- }
- }
- }
-
@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
for (IndexEventListener listener : listeners) {
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 410d2af8d0b..c3dfe6402dd 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -19,17 +19,6 @@
package org.elasticsearch.index.engine;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@@ -61,6 +50,17 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+
/**
*
*/
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index 98572c39f4e..beb93829d93 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -89,11 +89,6 @@ public final class EngineConfig {
*/
public static final String INDEX_CODEC_SETTING = "index.codec";
- /**
- * Index setting to control the index buffer size.
- * This setting is not realtime updateable.
- */
-
/** if set to true the engine will start even if the translog id in the commit point can not be found */
public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
@@ -128,7 +123,8 @@ public final class EngineConfig {
this.eventListener = eventListener;
this.compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = settings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
- // We give IndexWriter a huge buffer, so it won't flush on its own. Instead, IndexingMemoryController periodically checks
+ // We give IndexWriter a "huge" (256 MB) buffer, so it won't flush on its own unless the ES indexing buffer is also huge and/or
+ // there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks
// and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 486920dc750..098868e2b3f 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -103,6 +103,11 @@ public class InternalEngine extends Engine {
private final IndexThrottle throttle;
+ // How many callers are currently requesting index throttling. Currently there are only two times we do this: when merges are falling
+ // behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling incoming
+ // indexing ops to a single thread:
+ private final AtomicInteger throttleRequestCount = new AtomicInteger();
+
public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
super(engineConfig);
this.versionMap = new LiveVersionMap();
@@ -516,12 +521,11 @@ public class InternalEngine extends Engine {
long versionMapBytes = versionMap.ramBytesUsedForRefresh();
long indexingBufferBytes = indexWriter.ramBytesUsed();
- boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);
-
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
+ boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);
if (useRefresh) {
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
@@ -542,15 +546,6 @@ public class InternalEngine extends Engine {
failEngine("writeIndexingBuffer failed", t);
throw new RefreshFailedEngineException(shardId, t);
}
-
- // TODO: maybe we should just put a scheduled job in threadPool?
- // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
- // for a long time:
- if (useRefresh) {
- maybePruneDeletedTombstones();
- versionMapRefreshPending.set(false);
- mergeScheduler.refreshConfig();
- }
}
@Override
@@ -1051,8 +1046,6 @@ public class InternalEngine extends Engine {
}
}
- private final AtomicInteger throttleRequestCount = new AtomicInteger();
-
@Override
public void activateThrottling() {
int count = throttleRequestCount.incrementAndGet();
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java
index 9a55b9b6161..8d3523a18b1 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java
@@ -70,7 +70,6 @@ public interface IndexEventListener {
*/
default void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {}
-
/**
* Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes.
* The order of concurrent events is preserved. The execution must be lightweight.
@@ -89,13 +88,6 @@ public interface IndexEventListener {
*/
default void onShardInactive(IndexShard indexShard) {}
- /**
- * Called when a shard is marked as active ie. was previously inactive and is now active again.
- *
- * @param indexShard The shard that was marked active
- */
- default void onShardActive(IndexShard indexShard) {}
-
/**
* Called before the index gets created. Note that this is also called
* when the index is created on data nodes
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 017dcdd6969..4eed36d4424 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -488,7 +488,10 @@ public class IndexShard extends AbstractIndexShardComponent {
throw ex;
}
indexingService.postIndex(index);
+
+ // Notify IMC so that it can go and check heap used by all indexing buffers periodically:
indexingMemoryController.bytesWritten(index.getTranslogLocation().size);
+
return created;
}
@@ -525,6 +528,8 @@ public class IndexShard extends AbstractIndexShardComponent {
throw ex;
}
indexingService.postDelete(delete);
+
+ // Notify IMC so that it can go and check heap used by all indexing buffers periodically:
indexingMemoryController.bytesWritten(delete.getTranslogLocation().size);
}
@@ -533,13 +538,13 @@ public class IndexShard extends AbstractIndexShardComponent {
return getEngine().get(get, this::acquireSearcher);
}
+ /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
public void refresh(String source) {
verifyNotClosed();
- // nocommit OK to throw EngineClosedExc?
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
indexingMemoryController.addWritingBytes(this, ramBytesUsed);
try {
- logger.debug("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed);
+ logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(ramBytesUsed));
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
@@ -1019,14 +1024,6 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
- /**
- * Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
- * IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}.
- */
- public boolean getActive() {
- return active.get();
- }
-
public final boolean isFlushOnClose() {
return flushOnClose;
}
@@ -1226,7 +1223,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private void handleRefreshException(Exception e) {
if (e instanceof EngineClosedException) {
// ignore
- } else if (e instanceof RefreshFailedEngineException e) {
+ } else if (e instanceof RefreshFailedEngineException) {
RefreshFailedEngineException rfee = (RefreshFailedEngineException) e;
if (rfee.getCause() instanceof InterruptedException) {
// ignore, we are being shutdown
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 64ff6c74587..3728628f755 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -119,7 +119,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent implements IndexEventListener {
+public class IndexingMemoryController extends AbstractLifecycleComponent {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
@@ -136,9 +134,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent availableShards() {
- return Collections.singletonList(new ShardId("foo", 0));
- }
-
- @Override
- protected void refreshShardAsync(IndexShard shard) {
- engine.refresh("memory");
- }
-
- @Override
- protected long getIndexBufferRAMBytesUsed(ShardId shardId) {
- System.out.println("BYTES USED: " + engine.indexBufferRAMBytesUsed());
- return engine.indexBufferRAMBytesUsed();
- }
- };
-
- Searcher s = engine.acquireSearcher("test");
- final long version1 = ((DirectoryReader) s.reader()).getVersion();
- s.close();
- for (int i = 0; i < 100; i++) {
- String id = Integer.toString(i);
- engine.delete(new Engine.Delete("test", id, newUid(id), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
- }
-
- imc.forceCheck();
- try (Searcher s2 = engine.acquireSearcher("test")) {
- assertThat(((DirectoryReader) s2.reader()).getVersion(), greaterThan(version1));
- }
- }
- }
- */
-
public void testMissingTranslog() throws IOException {
// test that we can force start the engine , even if the translog is missing.
engine.close();
diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java
index 883e7522466..ae14c0d7a32 100644
--- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java
@@ -18,19 +18,23 @@
*/
package org.elasticsearch.indices.memory;
+import java.util.*;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESSingleNodeTestCase;
-import java.util.*;
-
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
@@ -183,4 +187,65 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
}
+
+ // #10312
+ public void testDeletesAloneCanTriggerRefresh() throws Exception {
+ createIndex("index",
+ Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1)
+ .put(SETTING_NUMBER_OF_REPLICAS, 0)
+ .put("index.refresh_interval", -1)
+ .build());
+ ensureGreen();
+
+ IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+ IndexService indexService = indicesService.indexService("index");
+ IndexShard shard = indexService.getShardOrNull(0);
+ assertNotNull(shard);
+
+ for (int i = 0; i < 100; i++) {
+ String id = Integer.toString(i);
+ client().prepareIndex("index", "type", id).setSource("field", "value").get();
+ }
+
+ // Force merge so we know all merges are done before we start deleting:
+ ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
+ assertNoFailures(r);
+
+ // Make a shell of an IMC to check up on indexing buffer usage:
+ Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
+
+ // TODO: would be cleaner if I could pass this 1kb setting to the single node this test created....
+ IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) {
+ @Override
+ protected List availableShards() {
+ return Collections.singletonList(shard);
+ }
+
+ @Override
+ protected long getIndexBufferRAMBytesUsed(IndexShard shard) {
+ return shard.getIndexBufferRAMBytesUsed();
+ }
+ };
+
+ for (int i = 0; i < 100; i++) {
+ String id = Integer.toString(i);
+ client().prepareDelete("index", "type", id).get();
+ }
+
+ final long indexingBufferBytes1 = shard.getIndexBufferRAMBytesUsed();
+
+ imc.forceCheck();
+
+ // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool:
+ assertBusy(new Runnable() {
+ @Override
+ public void run() {
+ try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
+ // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
+ final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
+ assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
+ }
+ }
+ });
+ }
}
diff --git a/docs/reference/modules/indices/indexing_buffer.asciidoc b/docs/reference/modules/indices/indexing_buffer.asciidoc
index e6485733147..d3aa436c5d4 100644
--- a/docs/reference/modules/indices/indexing_buffer.asciidoc
+++ b/docs/reference/modules/indices/indexing_buffer.asciidoc
@@ -12,7 +12,7 @@ in the cluster:
Accepts either a percentage or a byte size value. It defaults to `10%`,
meaning that `10%` of the total heap allocated to a node will be used as the
- indexing buffer size.
+ indexing buffer size shared across all shards.
`indices.memory.min_index_buffer_size`::
@@ -23,10 +23,3 @@ in the cluster:
If the `index_buffer_size` is specified as a percentage, then this
setting can be used to specify an absolute maximum. Defaults to unbounded.
-
-`indices.memory.min_shard_index_buffer_size`::
-
- Sets a hard lower limit for the memory allocated per shard for its own
- indexing buffer. Defaults to `4mb`.
-
-