diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 4cd786a353b..ee3664a8ae5 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -689,12 +689,23 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public final Searcher acquireSearcher(String source) throws EngineException { boolean success = false; + /* Acquire order here is store -> manager since we need + * to make sure that the store is not closed before + * the searcher is acquired. */ + store.incRef(); try { - /* Acquire order here is store -> manager since we need - * to make sure that the store is not closed before - * the searcher is acquired. */ - store.incRef(); - final SearcherManager manager = this.searcherManager; + SearcherManager manager = this.searcherManager; + if (manager == null) { + ensureOpen(); + try (InternalLock _ = this.readLock.acquire()) { + // we might start up right now and the searcherManager is not initialized + // we take the read lock and retry again since write lock is taken + // while start() is called and otherwise the ensureOpen() call will + // barf. + manager = this.searcherManager; + assert manager != null : "SearcherManager is null but shouldn't"; + } + } /* This might throw NPE but that's fine we will run ensureOpen() * in the catch block and throw the right exception */ final IndexSearcher searcher = manager.acquire(); @@ -707,6 +718,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin manager.release(searcher); } } + } catch (EngineClosedException ex) { + throw ex; } catch (Throwable ex) { ensureOpen(); // throw EngineCloseException here if we are already closed logger.error("failed to acquire searcher, source {}", ex, source); diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 612274ba3b3..fa4adff1707 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -31,6 +31,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesArray; @@ -80,6 +81,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; @@ -321,6 +323,33 @@ public class InternalEngineTests extends ElasticsearchTestCase { assertThat(segments.get(2).isCompound(), equalTo(true)); } + public void testStartAndAcquireConcurrently() { + ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)); + final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider); + final AtomicBoolean startPending = new AtomicBoolean(true); + Thread thread = new Thread() { + public void run() { + try { + Thread.yield(); + engine.start(); + } finally { + startPending.set(false); + } + + } + }; + thread.start(); + while(startPending.get()) { + try { + engine.acquireSearcher("foobar").close(); + break; + } catch (EngineClosedException ex) { + // all good + } + } + engine.close(); + } + @Test public void testSegmentsWithMergeFlag() throws Exception {