[ENGINE] Wait until engine is started up when acquireing searcher

Today we have a small window where a searcher can be acquired but the
engine is in the state of starting up. This causes a NPE triggering a
shard failure if we are fast enough. This commit fixes this situation
gracefully.

Closes #7455
This commit is contained in:
Simon Willnauer 2014-08-26 13:54:11 +02:00
parent d7b8d1728e
commit 0676869e6d
2 changed files with 47 additions and 5 deletions

View File

@ -689,12 +689,23 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
@Override @Override
public final Searcher acquireSearcher(String source) throws EngineException { public final Searcher acquireSearcher(String source) throws EngineException {
boolean success = false; boolean success = false;
try {
/* Acquire order here is store -> manager since we need /* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before * to make sure that the store is not closed before
* the searcher is acquired. */ * the searcher is acquired. */
store.incRef(); store.incRef();
final SearcherManager manager = this.searcherManager; try {
SearcherManager manager = this.searcherManager;
if (manager == null) {
ensureOpen();
try (InternalLock _ = this.readLock.acquire()) {
// we might start up right now and the searcherManager is not initialized
// we take the read lock and retry again since write lock is taken
// while start() is called and otherwise the ensureOpen() call will
// barf.
manager = this.searcherManager;
assert manager != null : "SearcherManager is null but shouldn't";
}
}
/* This might throw NPE but that's fine we will run ensureOpen() /* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */ * in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire(); final IndexSearcher searcher = manager.acquire();
@ -707,6 +718,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
manager.release(searcher); manager.release(searcher);
} }
} }
} catch (EngineClosedException ex) {
throw ex;
} catch (Throwable ex) { } catch (Throwable ex) {
ensureOpen(); // throw EngineCloseException here if we are already closed ensureOpen(); // throw EngineCloseException here if we are already closed
logger.error("failed to acquire searcher, source {}", ex, source); logger.error("failed to acquire searcher, source {}", ex, source);

View File

@ -31,6 +31,7 @@ import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
@ -80,6 +81,7 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
@ -321,6 +323,33 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(segments.get(2).isCompound(), equalTo(true)); assertThat(segments.get(2).isCompound(), equalTo(true));
} }
public void testStartAndAcquireConcurrently() {
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
final AtomicBoolean startPending = new AtomicBoolean(true);
Thread thread = new Thread() {
public void run() {
try {
Thread.yield();
engine.start();
} finally {
startPending.set(false);
}
}
};
thread.start();
while(startPending.get()) {
try {
engine.acquireSearcher("foobar").close();
break;
} catch (EngineClosedException ex) {
// all good
}
}
engine.close();
}
@Test @Test
public void testSegmentsWithMergeFlag() throws Exception { public void testSegmentsWithMergeFlag() throws Exception {