From 7119ffa7bcb46e043afce45c137fa93c4ff91e72 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 1 Jul 2014 17:34:28 +0200 Subject: [PATCH] IndexingMemoryController should only update buffer settings of recovered shards At the moment the IndexingMemoryController can try to update the index buffer memory of shards at any give moment. This update involves a flush, which may cause a FlushNotAllowedEngineException to be thrown in a concurrently finalizing recovery. Closes #6642, closes #6667 --- .../index/engine/internal/InternalEngine.java | 48 +++++------ .../memory/IndexingMemoryController.java | 37 ++++++--- .../memory/IndexMemoryControllerTests.java | 80 +++++++++++++++++++ 3 files changed, 132 insertions(+), 33 deletions(-) create mode 100644 src/test/java/org/elasticsearch/indices/memory/IndexMemoryControllerTests.java diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 5de9212d52b..c8f0cfd9d87 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -19,19 +19,7 @@ package org.elasticsearch.index.engine.internal; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - +import com.google.common.collect.Lists; import org.apache.lucene.index.*; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.search.IndexSearcher; @@ -40,7 +28,6 @@ import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.LockObtainFailedException; -import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; @@ -53,7 +40,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.SegmentReaderUtils; @@ -64,7 +50,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.codec.CodecService; @@ -89,7 +74,18 @@ import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.indices.warmer.IndicesWarmer; import org.elasticsearch.indices.warmer.InternalIndicesWarmer; import org.elasticsearch.threadpool.ThreadPool; -import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @@ -314,6 +310,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin return new TimeValue(1, TimeUnit.SECONDS); } + /** return the current indexing buffer size setting * */ + public ByteSizeValue indexingBufferSize() { + return indexingBufferSize; + } + @Override public void enableGcDeletes(boolean enableGcDeletes) { this.enableGcDeletes = enableGcDeletes; @@ -1566,11 +1567,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public void beforeMerge(OnGoingMerge merge) { - if (numMergesInFlight.incrementAndGet() > maxNumMerges) { - if (isThrottling.getAndSet(true) == false) { - logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - } - lock = lockReference; + if (numMergesInFlight.incrementAndGet() > maxNumMerges) { + if (isThrottling.getAndSet(true) == false) { + logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); + } + lock = lockReference; } } @@ -1588,7 +1589,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin private static final class NoOpLock implements Lock { @Override - public void lock() {} + public void lock() { + } @Override public void lockInterruptibly() throws InterruptedException { diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 5f2da8ccd35..e081e31e636 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; @@ -41,6 +42,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; @@ -64,7 +66,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); + @Inject public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { super(settings); @@ -151,6 +155,14 @@ public class IndexingMemoryController extends AbstractLifecycleComponent() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expectedShardSize && + ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expectedShardSize; + } + }); + + if (!success) { + fail("failed to update shard indexing buffer size. expected [" + expectedShardSize + "] shard1 [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + + } +}