Add ref-counting to SearchContext to prevent accessing already closed readers (#20095)

When a SearchContext is closed it's reader / searcher reference is closed too.
If this happens while a search is accessing it's reader reference it can lead
to an unexpected `AlreadyClosedException` or worst case, an already closed MMapDirectory
is access causing a `SIGSEV` like in #20008 (even though the window for this is very small).

SearchContext can be closed concurrently if:
 * an index is deleted / removed from the node
 * a search context is idle for too long and is cleaned by the reaper
 * an explicit freeContext message is received

This change adds reference counting to the SearchContext base class and it's used
inside SearchService each time the context is accessed.

Closes #20008
This commit is contained in:
Simon Willnauer 2016-08-22 15:41:05 +02:00 committed by GitHub
parent 3d7daa1a5a
commit 29336b231b
3 changed files with 139 additions and 22 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.search;
import com.carrotsearch.hppc.ObjectFloatHashMap;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
@ -107,9 +106,6 @@ import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
/**
*
*/
public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
@ -233,6 +229,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws IOException {
final SearchContext context = createAndPutContext(request);
context.incRef();
try {
contextProcessing(context);
dfsPhase.execute(context);
@ -262,6 +259,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws IOException {
final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
@ -295,6 +293,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) {
final SearchContext context = findContext(request.id());
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
@ -316,11 +315,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public QuerySearchResult executeQueryPhase(QuerySearchRequest request) {
final SearchContext context = findContext(request.id());
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());
IndexShard indexShard = context.indexShard();
SearchOperationListener operationListener = indexShard.getSearchOperationListener();
context.incRef();
try {
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
queryPhase.execute(context);
@ -354,8 +355,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws IOException {
final SearchContext context = createAndPutContext(request);
contextProcessing(context);
context.incRef();
try {
contextProcessing(context);
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
@ -393,9 +395,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) {
final SearchContext context = findContext(request.id());
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());
context.incRef();
try {
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
@ -433,8 +436,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) {
final SearchContext context = findContext(request.id());
contextProcessing(context);
context.incRef();
try {
contextProcessing(context);
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
processScroll(request, context);
operationListener.onPreQueryPhase(context);
@ -473,9 +477,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public FetchSearchResult executeFetchPhase(ShardFetchRequest request) {
final SearchContext context = findContext(request.id());
contextProcessing(context);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
contextProcessing(context);
if (request.lastEmittedDoc() != null) {
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
}
@ -593,6 +598,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public boolean freeContext(long id) {
final SearchContext context = removeContext(id);
if (context != null) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
try {
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
@ -624,9 +630,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
private void cleanContext(SearchContext context) {
assert context == SearchContext.current();
context.clearReleasables(Lifetime.PHASE);
SearchContext.removeCurrent();
try {
assert context == SearchContext.current();
context.clearReleasables(Lifetime.PHASE);
SearchContext.removeCurrent();
} finally {
context.decRef();
}
}
private void processFailure(SearchContext context, Exception e) {

View File

@ -22,7 +22,9 @@ package org.elasticsearch.search.internal;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.RefCount;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
@ -30,6 +32,8 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@ -67,7 +71,18 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class SearchContext implements Releasable {
/**
* This class encapsulates the state needed to execute a search. It holds a reference to the
* shards point in time snapshot (IndexReader / ContextIndexSearcher) and allows passing on
* state from one query / fetch phase to another.
*
* This class also implements {@link RefCounted} since in some situations like in {@link org.elasticsearch.search.SearchService}
* a SearchContext can be closed concurrently due to independent events ie. when an index gets removed. To prevent accessing closed
* IndexReader / IndexSearcher instances the SearchContext can be guarded by a reference count and fail if it's been closed by
* an external event.
*/
// For reference why we use RefCounted here see #20095
public abstract class SearchContext extends AbstractRefCounted implements Releasable {
private static ThreadLocal<SearchContext> current = new ThreadLocal<>();
public static final int DEFAULT_TERMINATE_AFTER = 0;
@ -91,6 +106,7 @@ public abstract class SearchContext implements Releasable {
protected final ParseFieldMatcher parseFieldMatcher;
protected SearchContext(ParseFieldMatcher parseFieldMatcher) {
super("search_context");
this.parseFieldMatcher = parseFieldMatcher;
}
@ -100,17 +116,27 @@ public abstract class SearchContext implements Releasable {
@Override
public final void close() {
if (closed.compareAndSet(false, true)) { // prevent double release
try {
clearReleasables(Lifetime.CONTEXT);
} finally {
doClose();
}
if (closed.compareAndSet(false, true)) { // prevent double closing
decRef();
}
}
private boolean nowInMillisUsed;
@Override
protected final void closeInternal() {
try {
clearReleasables(Lifetime.CONTEXT);
} finally {
doClose();
}
}
@Override
protected void alreadyClosed() {
throw new IllegalStateException("search context is already closed can't increment refCount current count [" + refCount() + "]");
}
protected abstract void doClose();
/**

View File

@ -18,10 +18,15 @@
*/
package org.elasticsearch.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -34,12 +39,19 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.singletonList;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
@ -112,6 +124,75 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
assertEquals(activeRefs, indexShard.store().refCount());
}
public void testSearchWhileIndexDeleted() throws IOException, InterruptedException {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchService service = getInstanceFromNode(SearchService.class);
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
IndexShard indexShard = indexService.getShard(0);
AtomicBoolean running = new AtomicBoolean(true);
CountDownLatch startGun = new CountDownLatch(1);
Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
final Thread thread = new Thread() {
@Override
public void run() {
startGun.countDown();
while(running.get()) {
service.afterIndexDeleted(indexService.index(), indexService.getIndexSettings().getSettings());
if (randomBoolean()) {
// here we trigger some refreshes to ensure the IR go out of scope such that we hit ACE if we access a search
// context in a non-sane way.
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
client().prepareIndex("index", "type").setSource("field", "value")
.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values())).execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
semaphore.release();
}
@Override
public void onFailure(Exception e) {
semaphore.release();
}
});
}
}
}
};
thread.start();
startGun.await();
try {
final int rounds = scaledRandomIntBetween(100, 10000);
for (int i = 0; i < rounds; i++) {
try {
QuerySearchResultProvider querySearchResultProvider = service.executeQueryPhase(
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false));
IntArrayList intCursors = new IntArrayList(1);
intCursors.add(0);
ShardFetchRequest req = new ShardFetchRequest(querySearchResultProvider.id(), intCursors, null /* not a scroll */);
service.executeFetchPhase(req);
} catch (AlreadyClosedException ex) {
throw ex;
} catch (IllegalStateException ex) {
assertEquals("search context is already closed can't increment refCount current count [0]", ex.getMessage());
} catch (SearchContextMissingException ex) {
// that's fine
}
}
} finally {
running.set(false);
thread.join();
semaphore.acquire(Integer.MAX_VALUE);
}
}
public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
@Override
public List<QuerySpec<?>> getQueries() {