diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 460501c8b52..26808330986 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -43,7 +43,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; -import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequest; @@ -84,6 +83,7 @@ import org.elasticsearch.search.suggest.completion.CompletionStats; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Base64; @@ -665,14 +665,23 @@ public abstract class Engine implements Closeable { Releasable releasable = store::decRef; try { ReferenceManager referenceManager = getReferenceManager(scope); - Searcher engineSearcher = new Searcher(source, referenceManager.acquire(), - s -> { - try { - referenceManager.release(s); - } finally { - store.decRef(); - } - }, logger); + IndexSearcher acquire = referenceManager.acquire(); + AtomicBoolean released = new AtomicBoolean(false); + Searcher engineSearcher = new Searcher(source, acquire, + () -> { + if (released.compareAndSet(false, true)) { + try { + referenceManager.release(acquire); + } finally { + store.decRef(); + } + } else { + /* In general, searchers should never be released twice or this would break reference counting. There is one rare case + * when it might happen though: when the request and the Reaper thread would both try to release it in a very short + * amount of time, this is why we only log a warning instead of throwing an exception. */ + logger.warn("Searcher was released twice", new IllegalStateException("Double release")); + } + }); releasable = null; // success - hand over the reference to the engine searcher return engineSearcher; } catch (AlreadyClosedException ex) { @@ -1175,69 +1184,51 @@ public abstract class Engine implements Closeable { } } - public static class Searcher implements Releasable { + public static final class Searcher implements Releasable { private final String source; private final IndexSearcher searcher; - private final AtomicBoolean released = new AtomicBoolean(false); - private final Logger logger; - private final IOUtils.IOConsumer onClose; + private final Closeable onClose; - public Searcher(String source, IndexSearcher searcher, Logger logger) { - this(source, searcher, s -> s.getIndexReader().close(), logger); - } - - public Searcher(String source, IndexSearcher searcher, IOUtils.IOConsumer onClose, Logger logger) { + public Searcher(String source, IndexSearcher searcher, Closeable onClose) { this.source = source; this.searcher = searcher; this.onClose = onClose; - this.logger = logger; } /** * The source that caused this searcher to be acquired. */ - public final String source() { + public String source() { return source; } - public final IndexReader reader() { + public IndexReader reader() { return searcher.getIndexReader(); } - public final DirectoryReader getDirectoryReader() { + public DirectoryReader getDirectoryReader() { if (reader() instanceof DirectoryReader) { return (DirectoryReader) reader(); } throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader"); } - public final IndexSearcher searcher() { + public IndexSearcher searcher() { return searcher; } @Override public void close() { - if (released.compareAndSet(false, true) == false) { - /* In general, searchers should never be released twice or this would break reference counting. There is one rare case - * when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount - * of time, this is why we only log a warning instead of throwing an exception. - */ - logger.warn("Searcher was released twice", new IllegalStateException("Double release")); - return; - } try { - onClose.accept(searcher()); + onClose.close(); } catch (IOException e) { - throw new IllegalStateException("Cannot close", e); + throw new UncheckedIOException("failed to close", e); } catch (AlreadyClosedException e) { // This means there's a bug somewhere: don't suppress it throw new AssertionError(e); } } - public final Logger getLogger() { - return logger; - } } public abstract static class Operation { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 55d93203abe..38ba0257491 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -606,7 +606,7 @@ public class InternalEngine extends Engine { // in the case of a already pruned translog generation we might get null here - yet very unlikely TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig .getIndexSettings().getIndexVersionCreated()); - return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger), + return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader::close), new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0)); } } catch (IOException e) { @@ -2086,7 +2086,7 @@ public class InternalEngine extends Engine { if (warmer != null) { try { assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass(); - warmer.warm(new Searcher("top_reader_warming", searcher, s -> {}, logger)); + warmer.warm(new Searcher("top_reader_warming", searcher, () -> {})); } catch (Exception e) { if (isEngineClosed.get() == false) { logger.warn("failed to prepare/warm", e); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java b/server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java index bc62f4067b9..1a13586d4da 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java @@ -99,8 +99,9 @@ public class IndexSearcherWrapper { } else { // we close the reader to make sure wrappers can release resources if needed.... // our NonClosingReaderWrapper makes sure that our reader is not closed - return new Engine.Searcher(engineSearcher.source(), indexSearcher, s -> IOUtils.close(s.getIndexReader(), engineSearcher), - engineSearcher.getLogger()); + return new Engine.Searcher(engineSearcher.source(), indexSearcher, () -> + IOUtils.close(indexSearcher.getIndexReader(), // this will close the wrappers excluding the NonClosingReaderWrapper + engineSearcher)); // this will run the closeable on the wrapped engine searcher } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java index a0bf75ddb13..3ba62647b6b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class IndexSearcherWrapperTests extends ESTestCase { @@ -73,20 +74,20 @@ public class IndexSearcherWrapperTests extends ESTestCase { final int sourceRefCount = open.getRefCount(); final AtomicInteger count = new AtomicInteger(); final AtomicInteger outerCount = new AtomicInteger(); - try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) { - final Engine.Searcher wrap = wrapper.wrap(engineSearcher); - assertEquals(1, wrap.reader().getRefCount()); - ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> { - if (key == open.getReaderCacheHelper().getKey()) { - count.incrementAndGet(); - } - outerCount.incrementAndGet(); - }); - assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits.value); - wrap.close(); - assertFalse("wrapped reader is closed", wrap.reader().tryIncRef()); - assertEquals(sourceRefCount, open.getRefCount()); - } + final AtomicBoolean closeCalled = new AtomicBoolean(false); + final Engine.Searcher wrap = wrapper.wrap(new Engine.Searcher("foo", searcher, () -> closeCalled.set(true))); + assertEquals(1, wrap.reader().getRefCount()); + ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> { + if (key == open.getReaderCacheHelper().getKey()) { + count.incrementAndGet(); + } + outerCount.incrementAndGet(); + }); + assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits.value); + wrap.close(); + assertFalse("wrapped reader is closed", wrap.reader().tryIncRef()); + assertEquals(sourceRefCount, open.getRefCount()); + assertTrue(closeCalled.get()); assertEquals(1, closeCalls.get()); IOUtils.close(open, writer, dir); @@ -121,15 +122,15 @@ public class IndexSearcherWrapperTests extends ESTestCase { } }; final ConcurrentHashMap cache = new ConcurrentHashMap<>(); - try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) { - try (Engine.Searcher wrap = wrapper.wrap(engineSearcher)) { - ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> { - cache.remove(key); - }); - TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1); - cache.put(wrap.reader().getReaderCacheHelper().getKey(), search); - } + AtomicBoolean closeCalled = new AtomicBoolean(false); + try (Engine.Searcher wrap = wrapper.wrap(new Engine.Searcher("foo", searcher, () -> closeCalled.set(true)))) { + ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> { + cache.remove(key); + }); + TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1); + cache.put(wrap.reader().getReaderCacheHelper().getKey(), search); } + assertTrue(closeCalled.get()); assertEquals(1, closeCalls.get()); assertEquals(1, cache.size()); @@ -151,11 +152,11 @@ public class IndexSearcherWrapperTests extends ESTestCase { assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits.value); searcher.setSimilarity(iwc.getSimilarity()); IndexSearcherWrapper wrapper = new IndexSearcherWrapper(); - try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, logger)) { + try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, open::close)) { final Engine.Searcher wrap = wrapper.wrap(engineSearcher); assertSame(wrap, engineSearcher); } - IOUtils.close(open, writer, dir); + IOUtils.close(writer, dir); } private static class FieldMaskingReader extends FilterDirectoryReader { diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 22e16342b44..4b86ff668c0 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -110,7 +110,7 @@ public class DefaultSearchContextTests extends ESTestCase { try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir); IndexReader reader = w.getReader(); - Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), logger)) { + Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close)) { DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT); diff --git a/server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java index b7a9c8cb69a..78a8049d741 100644 --- a/server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java @@ -363,7 +363,7 @@ public class QueryProfilerTests extends ESTestCase { public void testApproximations() throws IOException { QueryProfiler profiler = new QueryProfiler(); - Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), logger); + Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close); // disable query caching since we want to test approximations, which won't // be exposed on a cached entry ContextIndexSearcher searcher = new ContextIndexSearcher(engineSearcher, null, MAYBE_CACHE_POLICY); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 6f9c46b4dc4..9ed246ae276 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -240,7 +240,7 @@ public abstract class AggregatorTestCase extends ESTestCase { } protected SearchContext createSearchContext(IndexSearcher indexSearcher, IndexSettings indexSettings) { - Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, logger); + Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, () -> indexSearcher.getIndexReader().close()); QueryCache queryCache = new DisabledQueryCache(indexSettings); QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() { @Override @@ -248,7 +248,7 @@ public abstract class AggregatorTestCase extends ESTestCase { } @Override - public boolean shouldCache(Query query) throws IOException { + public boolean shouldCache(Query query) { // never cache a query return false; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java b/test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java deleted file mode 100644 index 0dbdaa55e33..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.test.engine; - -import org.apache.logging.log4j.Logger; -import org.apache.lucene.search.IndexSearcher; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.ShardId; - -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * A searcher that asserts the IndexReader's refcount on close - */ -class AssertingSearcher extends Engine.Searcher { - private final Engine.Searcher wrappedSearcher; - private final ShardId shardId; - private RuntimeException firstReleaseStack; - private final Object lock = new Object(); - private final int initialRefCount; - private final Logger logger; - private final AtomicBoolean closed = new AtomicBoolean(false); - - AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher, ShardId shardId, Logger logger) { - super(wrappedSearcher.source(), indexSearcher, s -> {throw new AssertionError();}, logger); - // we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher - // with a wrapped reader. - this.wrappedSearcher = wrappedSearcher; - this.logger = logger; - this.shardId = shardId; - initialRefCount = wrappedSearcher.reader().getRefCount(); - assert initialRefCount > 0 : - "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed"; - } - - @Override - public void close() { - synchronized (lock) { - if (closed.compareAndSet(false, true)) { - firstReleaseStack = new RuntimeException(); - final int refCount = wrappedSearcher.reader().getRefCount(); - /* - * this assert seems to be paranoid but given LUCENE-5362 we - * better add some assertions here to make sure we catch any - * potential problems. - */ - assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already " - + " closed. Initial refCount was: [" + initialRefCount + "]"; - try { - wrappedSearcher.close(); - } catch (RuntimeException ex) { - logger.debug("Failed to release searcher", ex); - throw ex; - } - } else { - AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + "]"); - error.initCause(firstReleaseStack); - throw error; - } - } - } - - public ShardId shardId() { - return shardId; - } - - public boolean isOpen() { - return closed.get() == false; - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java index fbe64ca6eb9..182038d5b0a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java @@ -37,7 +37,6 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.engine.MockInternalEngine; import java.io.Closeable; import java.io.IOException; @@ -71,7 +70,7 @@ public final class MockEngineSupport { private final ShardId shardId; private final QueryCache filterCache; private final QueryCachingPolicy filterCachingPolicy; - private final SearcherCloseable searcherCloseable; + private final InFlightSearchers inFlightSearchers; private final MockContext mockContext; private final boolean disableFlushOnClose; @@ -107,8 +106,8 @@ public final class MockEngineSupport { logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader); } mockContext = new MockContext(random, wrapReader, wrapper, settings); - this.searcherCloseable = new SearcherCloseable(); - LuceneTestCase.closeAfterSuite(searcherCloseable); // only one suite closeable per Engine + this.inFlightSearchers = new InFlightSearchers(); + LuceneTestCase.closeAfterSuite(inFlightSearchers); // only one suite closeable per Engine this.disableFlushOnClose = DISABLE_FLUSH_ON_CLOSE.get(settings); } @@ -188,7 +187,7 @@ public final class MockEngineSupport { } - public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher) { + public Engine.Searcher wrapSearcher(Engine.Searcher engineSearcher) { final AssertingIndexSearcher assertingIndexSearcher = newSearcher(engineSearcher); assertingIndexSearcher.setSimilarity(engineSearcher.searcher().getSimilarity()); /* @@ -199,26 +198,16 @@ public final class MockEngineSupport { * early. - good news, stuff will fail all over the place if we don't * get this right here */ - AssertingSearcher assertingSearcher = new AssertingSearcher(assertingIndexSearcher, engineSearcher, shardId, logger) { - @Override - public void close() { - try { - searcherCloseable.remove(this); - } finally { - super.close(); - } - } - }; - searcherCloseable.add(assertingSearcher, engineSearcher.source()); - return assertingSearcher; + SearcherCloseable closeable = new SearcherCloseable(engineSearcher, logger, inFlightSearchers); + return new Engine.Searcher(engineSearcher.source(), assertingIndexSearcher, closeable); } - private static final class SearcherCloseable implements Closeable { + private static final class InFlightSearchers implements Closeable { - private final IdentityHashMap openSearchers = new IdentityHashMap<>(); + private final IdentityHashMap openSearchers = new IdentityHashMap<>(); @Override - public synchronized void close() throws IOException { + public synchronized void close() { if (openSearchers.isEmpty() == false) { AssertionError error = new AssertionError("Unreleased searchers found"); for (RuntimeException ex : openSearchers.values()) { @@ -228,15 +217,66 @@ public final class MockEngineSupport { } } - void add(AssertingSearcher searcher, String source) { + void add(Object key, String source) { final RuntimeException ex = new RuntimeException("Unreleased Searcher, source [" + source+ "]"); synchronized (this) { - openSearchers.put(searcher, ex); + openSearchers.put(key, ex); } } - synchronized void remove(AssertingSearcher searcher) { - openSearchers.remove(searcher); + synchronized void remove(Object key) { + openSearchers.remove(key); + } + } + + private static final class SearcherCloseable implements Closeable { + private final Engine.Searcher wrappedSearcher; + private final InFlightSearchers inFlightSearchers; + private RuntimeException firstReleaseStack; + private final Object lock = new Object(); + private final int initialRefCount; + private final Logger logger; + private final AtomicBoolean closed = new AtomicBoolean(false); + + SearcherCloseable(final Engine.Searcher wrappedSearcher, Logger logger, InFlightSearchers inFlightSearchers) { + // we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher + // with a wrapped reader. + this.wrappedSearcher = wrappedSearcher; + this.logger = logger; + initialRefCount = wrappedSearcher.reader().getRefCount(); + this.inFlightSearchers = inFlightSearchers; + assert initialRefCount > 0 : + "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed"; + inFlightSearchers.add(this, wrappedSearcher.source()); + } + + @Override + public void close() { + synchronized (lock) { + if (closed.compareAndSet(false, true)) { + inFlightSearchers.remove(this); + firstReleaseStack = new RuntimeException(); + final int refCount = wrappedSearcher.reader().getRefCount(); + /* + * this assert seems to be paranoid but given LUCENE-5362 we + * better add some assertions here to make sure we catch any + * potential problems. + */ + assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already " + + " closed. Initial refCount was: [" + initialRefCount + "]"; + try { + wrappedSearcher.close(); + } catch (RuntimeException ex) { + logger.debug("Failed to release searcher", ex); + throw ex; + } + } else { + AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + + "]"); + error.initCause(firstReleaseStack); + throw error; + } + } } } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index 35aedb67acc..b255f3bc6ab 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -78,6 +78,6 @@ final class MockInternalEngine extends InternalEngine { @Override public Searcher acquireSearcher(String source, SearcherScope scope) { final Searcher engineSearcher = super.acquireSearcher(source, scope); - return support().wrapSearcher(source, engineSearcher); + return support().wrapSearcher(engineSearcher); } }