diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index d70603e0370..c45517e9567 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -43,6 +43,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; +import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequest; @@ -663,7 +664,15 @@ public abstract class Engine implements Closeable { } Releasable releasable = store::decRef; try { - EngineSearcher engineSearcher = new EngineSearcher(source, getReferenceManager(scope), store, logger); + ReferenceManager referenceManager = getReferenceManager(scope); + Searcher engineSearcher = new Searcher(source, referenceManager.acquire(), + s -> { + try { + referenceManager.release(s); + } finally { + store.decRef(); + } + }, logger); releasable = null; // success - hand over the reference to the engine searcher return engineSearcher; } catch (AlreadyClosedException ex) { @@ -1167,40 +1176,67 @@ public abstract class Engine implements Closeable { } public static class Searcher implements Releasable { - private final String source; private final IndexSearcher searcher; + private final AtomicBoolean released = new AtomicBoolean(false); + private final Logger logger; + private final IOUtils.IOConsumer onClose; - public Searcher(String source, IndexSearcher searcher) { + public Searcher(String source, IndexSearcher searcher, Logger logger) { + this(source, searcher, s -> s.getIndexReader().close(), logger); + } + + public Searcher(String source, IndexSearcher searcher, IOUtils.IOConsumer onClose, Logger logger) { this.source = source; this.searcher = searcher; + this.onClose = onClose; + this.logger = logger; } /** * The source that caused this searcher to be acquired. */ - public String source() { + public final String source() { return source; } - public IndexReader reader() { + public final IndexReader reader() { return searcher.getIndexReader(); } - public DirectoryReader getDirectoryReader() { + public final DirectoryReader getDirectoryReader() { if (reader() instanceof DirectoryReader) { return (DirectoryReader) reader(); } throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader"); } - public IndexSearcher searcher() { + public final IndexSearcher searcher() { return searcher; } @Override public void close() { - // Nothing to close here + if (released.compareAndSet(false, true) == false) { + /* In general, searchers should never be released twice or this would break reference counting. There is one rare case + * when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount + * of time, this is why we only log a warning instead of throwing an exception. + */ + logger.warn("Searcher was released twice", new IllegalStateException("Double release")); + return; + } + try { + onClose.accept(searcher()); + } catch (IOException e) { + throw new IllegalStateException("Cannot close", e); + } catch (AlreadyClosedException e) { + // This means there's a bug somewhere: don't suppress it + throw new AssertionError(e); + } + } + + public final Logger getLogger() { + return logger; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java b/server/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java deleted file mode 100644 index 7fd0fe6cc39..00000000000 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.engine; - -import org.apache.logging.log4j.Logger; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.index.store.Store; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Searcher for an Engine - */ -final class EngineSearcher extends Engine.Searcher { - private final AtomicBoolean released = new AtomicBoolean(false); - private final Store store; - private final Logger logger; - private final ReferenceManager referenceManager; - - EngineSearcher(String source, ReferenceManager searcherReferenceManager, Store store, Logger logger) throws IOException { - super(source, searcherReferenceManager.acquire()); - this.store = store; - this.logger = logger; - this.referenceManager = searcherReferenceManager; - } - - @Override - public void close() { - if (!released.compareAndSet(false, true)) { - /* In general, searchers should never be released twice or this would break reference counting. There is one rare case - * when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount - * of time, this is why we only log a warning instead of throwing an exception. - */ - logger.warn("Searcher was released twice", new IllegalStateException("Double release")); - return; - } - try { - referenceManager.release(searcher()); - } catch (IOException e) { - throw new IllegalStateException("Cannot close", e); - } catch (AlreadyClosedException e) { - // This means there's a bug somewhere: don't suppress it - throw new AssertionError(e); - } finally { - store.decRef(); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5f770e0d93b..85e7cebcb88 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -606,7 +606,7 @@ public class InternalEngine extends Engine { // in the case of a already pruned translog generation we might get null here - yet very unlikely TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig .getIndexSettings().getIndexVersionCreated()); - return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)), + return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger), new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0)); } } catch (IOException e) { @@ -2085,7 +2085,7 @@ public class InternalEngine extends Engine { if (warmer != null) { try { assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass(); - warmer.warm(new Searcher("top_reader_warming", searcher)); + warmer.warm(new Searcher("top_reader_warming", searcher, s -> {}, logger)); } catch (Exception e) { if (isEngineClosed.get() == false) { logger.warn("failed to prepare/warm", e); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index a6949c05597..bc62f4067b9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -24,8 +24,8 @@ import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.search.IndexSearcher; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import java.io.IOException; @@ -97,21 +97,10 @@ public class IndexSearcherWrapper { if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) { return engineSearcher; } else { - return new Engine.Searcher(engineSearcher.source(), indexSearcher) { - @Override - public void close() throws ElasticsearchException { - try { - reader().close(); - // we close the reader to make sure wrappers can release resources if needed.... - // our NonClosingReaderWrapper makes sure that our reader is not closed - } catch (IOException e) { - throw new ElasticsearchException("failed to close reader", e); - } finally { - engineSearcher.close(); - } - - } - }; + // we close the reader to make sure wrappers can release resources if needed.... + // our NonClosingReaderWrapper makes sure that our reader is not closed + return new Engine.Searcher(engineSearcher.source(), indexSearcher, s -> IOUtils.close(s.getIndexReader(), engineSearcher), + engineSearcher.getLogger()); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java index e9f52d7c319..a0bf75ddb13 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java @@ -73,7 +73,7 @@ public class IndexSearcherWrapperTests extends ESTestCase { final int sourceRefCount = open.getRefCount(); final AtomicInteger count = new AtomicInteger(); final AtomicInteger outerCount = new AtomicInteger(); - try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) { final Engine.Searcher wrap = wrapper.wrap(engineSearcher); assertEquals(1, wrap.reader().getRefCount()); ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> { @@ -121,7 +121,7 @@ public class IndexSearcherWrapperTests extends ESTestCase { } }; final ConcurrentHashMap cache = new ConcurrentHashMap<>(); - try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) { try (Engine.Searcher wrap = wrapper.wrap(engineSearcher)) { ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> { cache.remove(key); @@ -151,7 +151,7 @@ public class IndexSearcherWrapperTests extends ESTestCase { assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits.value); searcher.setSimilarity(iwc.getSimilarity()); IndexSearcherWrapper wrapper = new IndexSearcherWrapper(); - try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) { + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, logger)) { final Engine.Searcher wrap = wrapper.wrap(engineSearcher); assertSame(wrap, engineSearcher); } diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index f59cc85c09c..22e16342b44 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -110,7 +110,7 @@ public class DefaultSearchContextTests extends ESTestCase { try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir); IndexReader reader = w.getReader(); - Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader))) { + Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), logger)) { DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT); diff --git a/server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java index ba58a79953b..b7a9c8cb69a 100644 --- a/server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java @@ -82,7 +82,7 @@ public class QueryProfilerTests extends ESTestCase { } reader = w.getReader(); w.close(); - Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader)); + Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), null); searcher = new ContextIndexSearcher(engineSearcher, IndexSearcher.getDefaultQueryCache(), MAYBE_CACHE_POLICY); } @@ -363,7 +363,7 @@ public class QueryProfilerTests extends ESTestCase { public void testApproximations() throws IOException { QueryProfiler profiler = new QueryProfiler(); - Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader)); + Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), logger); // disable query caching since we want to test approximations, which won't // be exposed on a cached entry ContextIndexSearcher searcher = new ContextIndexSearcher(engineSearcher, null, MAYBE_CACHE_POLICY); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 17202839a65..6f9c46b4dc4 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -240,7 +240,7 @@ public abstract class AggregatorTestCase extends ESTestCase { } protected SearchContext createSearchContext(IndexSearcher indexSearcher, IndexSettings indexSettings) { - Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher); + Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, logger); QueryCache queryCache = new DisabledQueryCache(indexSettings); QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() { @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java b/test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java index cf2d69e36d5..0dbdaa55e33 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java @@ -38,10 +38,8 @@ class AssertingSearcher extends Engine.Searcher { private final Logger logger; private final AtomicBoolean closed = new AtomicBoolean(false); - AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher, - ShardId shardId, - Logger logger) { - super(wrappedSearcher.source(), indexSearcher); + AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher, ShardId shardId, Logger logger) { + super(wrappedSearcher.source(), indexSearcher, s -> {throw new AssertionError();}, logger); // we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher // with a wrapped reader. this.wrappedSearcher = wrappedSearcher; @@ -52,11 +50,6 @@ class AssertingSearcher extends Engine.Searcher { "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed"; } - @Override - public String source() { - return wrappedSearcher.source(); - } - @Override public void close() { synchronized (lock) {