From 752ae6e2063096b7d2b4bbdf3ef7eb0c87174e0c Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 9 Mar 2012 22:41:09 +0200 Subject: [PATCH] optimize acquiring search handler to use a search manager, also, creating a ContextIndexSearcher can be optimized if it is created from a searcher --- .../lucene/index/ExtendedIndexSearcher.java | 4 +- .../common/lucene/ReaderSearcherHolder.java | 66 ------- .../lucene/manager/ReferenceManager.java | 181 ++++++++++++++++++ .../lucene/manager/SearcherFactory.java | 56 ++++++ .../lucene/manager/SearcherManager.java | 163 ++++++++++++++++ .../resource/AcquirableResource.java | 57 ------ .../resource/AcquirableResourceFactory.java | 36 ---- .../resource/BlockingAcquirableResource.java | 95 --------- .../NonBlockingAcquirableResource.java | 108 ----------- .../index/engine/robin/RobinEngine.java | 104 ++++------ .../AbstractAcquirableResourceTests.java | 113 ----------- .../BlockingAcquirableResourceTests.java | 35 ---- .../NonBlockingAcquirableResourceTests.java | 35 ---- 13 files changed, 440 insertions(+), 613 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/common/lucene/ReaderSearcherHolder.java create mode 100755 src/main/java/org/elasticsearch/common/lucene/manager/ReferenceManager.java create mode 100644 src/main/java/org/elasticsearch/common/lucene/manager/SearcherFactory.java create mode 100644 src/main/java/org/elasticsearch/common/lucene/manager/SearcherManager.java delete mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/resource/AcquirableResource.java delete mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/resource/AcquirableResourceFactory.java delete mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/resource/BlockingAcquirableResource.java delete mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/resource/NonBlockingAcquirableResource.java delete mode 100644 src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/AbstractAcquirableResourceTests.java delete mode 100644 src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/BlockingAcquirableResourceTests.java delete mode 100644 src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/NonBlockingAcquirableResourceTests.java diff --git a/src/main/java/org/apache/lucene/index/ExtendedIndexSearcher.java b/src/main/java/org/apache/lucene/index/ExtendedIndexSearcher.java index adb84ebd02d..a3bd3846771 100644 --- a/src/main/java/org/apache/lucene/index/ExtendedIndexSearcher.java +++ b/src/main/java/org/apache/lucene/index/ExtendedIndexSearcher.java @@ -26,8 +26,8 @@ import org.apache.lucene.search.IndexSearcher; */ public class ExtendedIndexSearcher extends IndexSearcher { - public ExtendedIndexSearcher(IndexSearcher searcher) { - super(searcher.getIndexReader()); + public ExtendedIndexSearcher(ExtendedIndexSearcher searcher) { + super(searcher.getIndexReader(), searcher.subReaders(), searcher.docStarts()); setSimilarity(searcher.getSimilarity()); } diff --git a/src/main/java/org/elasticsearch/common/lucene/ReaderSearcherHolder.java b/src/main/java/org/elasticsearch/common/lucene/ReaderSearcherHolder.java deleted file mode 100644 index 8d0756d856c..00000000000 --- a/src/main/java/org/elasticsearch/common/lucene/ReaderSearcherHolder.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.lucene; - -import org.apache.lucene.index.ExtendedIndexSearcher; -import org.apache.lucene.index.IndexReader; -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.common.lease.Releasable; - -/** - * A very simple holder for a tuple of reader and searcher. - * - * - */ -public class ReaderSearcherHolder implements Releasable { - - private final ExtendedIndexSearcher indexSearcher; - - public ReaderSearcherHolder(IndexReader indexReader) { - this(new ExtendedIndexSearcher(indexReader)); - } - - public ReaderSearcherHolder(ExtendedIndexSearcher indexSearcher) { - this.indexSearcher = indexSearcher; - } - - public IndexReader reader() { - return indexSearcher.getIndexReader(); - } - - public ExtendedIndexSearcher searcher() { - return indexSearcher; - } - - @Override - public boolean release() throws ElasticSearchException { - try { - indexSearcher.close(); - } catch (Exception e) { - // do nothing - } - try { - indexSearcher.getIndexReader().close(); - } catch (Exception e) { - // do nothing - } - return true; - } -} diff --git a/src/main/java/org/elasticsearch/common/lucene/manager/ReferenceManager.java b/src/main/java/org/elasticsearch/common/lucene/manager/ReferenceManager.java new file mode 100755 index 00000000000..f4e4df5852c --- /dev/null +++ b/src/main/java/org/elasticsearch/common/lucene/manager/ReferenceManager.java @@ -0,0 +1,181 @@ +package org.elasticsearch.common.lucene.manager; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.store.AlreadyClosedException; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.Semaphore; + +/** + * Utility class to safely share instances of a certain type across multiple + * threads, while periodically refreshing them. This class ensures each + * reference is closed only once all threads have finished using it. It is + * recommended to consult the documentation of {@link ReferenceManager} + * implementations for their {@link #maybeRefresh()} semantics. + * + * @param the concrete type that will be {@link #acquire() acquired} and + * {@link #release(Object) released}. + * @lucene.experimental + */ +// LUCENE MONITOR: 3.6 Remove this once 3.6 is out and use it +public abstract class ReferenceManager implements Closeable { + + private static final String REFERENCE_MANAGER_IS_CLOSED_MSG = "this ReferenceManager is closed"; + + protected volatile G current; + + private final Semaphore reopenLock = new Semaphore(1); + + private void ensureOpen() { + if (current == null) { + throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG); + } + } + + private synchronized void swapReference(G newReference) throws IOException { + ensureOpen(); + final G oldReference = current; + current = newReference; + release(oldReference); + } + + /** + * Decrement reference counting on the given reference. + */ + protected abstract void decRef(G reference) throws IOException; + + /** + * Refresh the given reference if needed. Returns {@code null} if no refresh + * was needed, otherwise a new refreshed reference. + */ + protected abstract G refreshIfNeeded(G referenceToRefresh) throws IOException; + + /** + * Try to increment reference counting on the given reference. Return true if + * the operation was successful. + */ + protected abstract boolean tryIncRef(G reference); + + /** + * Obtain the current reference. You must match every call to acquire with one + * call to {@link #release}; it's best to do so in a finally clause, and set + * the reference to {@code null} to prevent accidental usage after it has been + * released. + */ + public final G acquire() { + G ref; + do { + if ((ref = current) == null) { + throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG); + } + } while (!tryIncRef(ref)); + return ref; + } + + /** + * Close this ReferenceManager to future {@link #acquire() acquiring}. Any + * references that were previously {@link #acquire() acquired} won't be + * affected, and they should still be {@link #release released} when they are + * not needed anymore. + */ + public final synchronized void close() throws IOException { + if (current != null) { + // make sure we can call this more than once + // closeable javadoc says: + // if this is already closed then invoking this method has no effect. + swapReference(null); + afterClose(); + } + } + + /** + * Called after close(), so subclass can free any resources. + */ + protected void afterClose() throws IOException { + } + + /** + * You must call this, periodically, if you want that {@link #acquire()} will + * return refreshed instances. + *

+ *

+ * Threads: it's fine for more than one thread to call this at once. + * Only the first thread will attempt the refresh; subsequent threads will see + * that another thread is already handling refresh and will return + * immediately. Note that this means if another thread is already refreshing + * then subsequent threads will return right away without waiting for the + * refresh to complete. + *

+ *

+ * If this method returns true it means the calling thread either refreshed + * or that there were no changes to refresh. If it returns false it means another + * thread is currently refreshing. + */ + public final boolean maybeRefresh() throws IOException { + ensureOpen(); + + // Ensure only 1 thread does reopen at once; other threads just return immediately: + final boolean doTryRefresh = reopenLock.tryAcquire(); + if (doTryRefresh) { + try { + final G reference = acquire(); + try { + G newReference = refreshIfNeeded(reference); + if (newReference != null) { + assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed"; + boolean success = false; + try { + swapReference(newReference); + success = true; + } finally { + if (!success) { + release(newReference); + } + } + } + } finally { + release(reference); + } + afterRefresh(); + } finally { + reopenLock.release(); + } + } + + return doTryRefresh; + } + + /** + * Called after swapReference has installed a new + * instance. + */ + protected void afterRefresh() throws IOException { + } + + /** + * Release the refernce previously obtained via {@link #acquire()}. + *

+ * NOTE: it's safe to call this after {@link #close()}. + */ + public final void release(G reference) throws IOException { + assert reference != null; + decRef(reference); + } +} diff --git a/src/main/java/org/elasticsearch/common/lucene/manager/SearcherFactory.java b/src/main/java/org/elasticsearch/common/lucene/manager/SearcherFactory.java new file mode 100644 index 00000000000..c58249d19dc --- /dev/null +++ b/src/main/java/org/elasticsearch/common/lucene/manager/SearcherFactory.java @@ -0,0 +1,56 @@ +package org.elasticsearch.common.lucene.manager; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.IndexSearcher; + +import java.io.IOException; + +/** + * Factory class used by {@link org.apache.lucene.search.SearcherManager} and {@link org.apache.lucene.search.NRTManager} to + * create new IndexSearchers. The default implementation just creates + * an IndexSearcher with no custom behavior: + *

+ *

+ * public IndexSearcher newSearcher(IndexReader r) throws IOException {
+ * return new IndexSearcher(r);
+ * }
+ * 
+ *

+ * You can pass your own factory instead if you want custom behavior, such as: + *

    + *
  • Setting a custom scoring model: {@link org.apache.lucene.search.IndexSearcher#setSimilarity(org.apache.lucene.search.Similarity)} + *
  • Parallel per-segment search: {@link org.apache.lucene.search.IndexSearcher#IndexSearcher(org.apache.lucene.index.IndexReader, java.util.concurrent.ExecutorService)} + *
  • Return custom subclasses of IndexSearcher (for example that implement distributed scoring) + *
  • Run queries to warm your IndexSearcher before it is used. Note: when using near-realtime search + * you may want to also {@link org.apache.lucene.index.IndexWriterConfig#setMergedSegmentWarmer(org.apache.lucene.index.IndexWriter.IndexReaderWarmer)} to warm + * newly merged segments in the background, outside of the reopen path. + *
+ * + * @lucene.experimental + */ +// LUCENE MONITOR: 3.6 Remove this once 3.6 is out and use it +public class SearcherFactory { + /** + * Returns a new IndexSearcher over the given reader. + */ + public IndexSearcher newSearcher(IndexReader reader) throws IOException { + return new IndexSearcher(reader); + } +} diff --git a/src/main/java/org/elasticsearch/common/lucene/manager/SearcherManager.java b/src/main/java/org/elasticsearch/common/lucene/manager/SearcherManager.java new file mode 100644 index 00000000000..98ae8f3680f --- /dev/null +++ b/src/main/java/org/elasticsearch/common/lucene/manager/SearcherManager.java @@ -0,0 +1,163 @@ +package org.elasticsearch.common.lucene.manager; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; + +import java.io.IOException; + +/** + * Utility class to safely share {@link org.apache.lucene.search.IndexSearcher} instances across multiple + * threads, while periodically reopening. This class ensures each searcher is + * closed only once all threads have finished using it. + *

+ *

+ * Use {@link #acquire} to obtain the current searcher, and {@link #release} to + * release it, like this: + *

+ *

+ * IndexSearcher s = manager.acquire();
+ * try {
+ * // Do searching, doc retrieval, etc. with s
+ * } finally {
+ * manager.release(s);
+ * }
+ * // Do not use s after this!
+ * s = null;
+ * 
+ *

+ *

+ * In addition you should periodically call {@link #maybeRefresh}. While it's + * possible to call this just before running each query, this is discouraged + * since it penalizes the unlucky queries that do the reopen. It's better to use + * a separate background thread, that periodically calls maybeReopen. Finally, + * be sure to call {@link #close} once you are done. + * + * @lucene.experimental + * @see SearcherFactory + */ +// LUCENE MONITOR: 3.6 Remove this once 3.6 is out and use it +public final class SearcherManager extends ReferenceManager { + + private final SearcherFactory searcherFactory; + + /** + * Creates and returns a new SearcherManager from the given {@link org.apache.lucene.index.IndexWriter}. + * + * @param writer the IndexWriter to open the IndexReader from. + * @param applyAllDeletes If true, all buffered deletes will + * be applied (made visible) in the {@link org.apache.lucene.search.IndexSearcher} / {@link org.apache.lucene.index.IndexReader}. + * If false, the deletes may or may not be applied, but remain buffered + * (in IndexWriter) so that they will be applied in the future. + * Applying deletes can be costly, so if your app can tolerate deleted documents + * being returned you might gain some performance by passing false. + * See {@link org.apache.lucene.index.IndexReader#openIfChanged(org.apache.lucene.index.IndexReader, org.apache.lucene.index.IndexWriter, boolean)}. + * @param searcherFactory An optional {@link SearcherFactory}. Pass + * null if you don't require the searcher to be warmed + * before going live or other custom behavior. + * @throws java.io.IOException + */ + public SearcherManager(IndexWriter writer, boolean applyAllDeletes, SearcherFactory searcherFactory) throws IOException { + if (searcherFactory == null) { + searcherFactory = new SearcherFactory(); + } + this.searcherFactory = searcherFactory; + current = getSearcher(searcherFactory, IndexReader.open(writer, applyAllDeletes)); + } + + /** + * Creates and returns a new SearcherManager from the given {@link org.apache.lucene.store.Directory}. + * + * @param dir the directory to open the DirectoryReader on. + * @param searcherFactory An optional {@link SearcherFactory}. Pass + * null if you don't require the searcher to be warmed + * before going live or other custom behavior. + * @throws java.io.IOException + */ + public SearcherManager(Directory dir, SearcherFactory searcherFactory) throws IOException { + if (searcherFactory == null) { + searcherFactory = new SearcherFactory(); + } + this.searcherFactory = searcherFactory; + current = getSearcher(searcherFactory, IndexReader.open(dir)); + } + + @Override + protected void decRef(IndexSearcher reference) throws IOException { + reference.getIndexReader().decRef(); + } + + @Override + protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException { + final IndexReader newReader = IndexReader.openIfChanged(referenceToRefresh.getIndexReader()); + if (newReader == null) { + return null; + } else { + return getSearcher(searcherFactory, newReader); + } + } + + @Override + protected boolean tryIncRef(IndexSearcher reference) { + return reference.getIndexReader().tryIncRef(); + } + + /** + * @deprecated see {@link #maybeRefresh()}. + */ + @Deprecated + public boolean maybeReopen() throws IOException { + return maybeRefresh(); + } + + /** + * Returns true if no changes have occured since this searcher + * ie. reader was opened, otherwise false. + * + * @see org.apache.lucene.index.IndexReader#isCurrent() + */ + public boolean isSearcherCurrent() throws IOException { + final IndexSearcher searcher = acquire(); + try { + return searcher.getIndexReader().isCurrent(); + } finally { + release(searcher); + } + } + + // NOTE: decRefs incoming reader on throwing an exception + static IndexSearcher getSearcher(SearcherFactory searcherFactory, IndexReader reader) throws IOException { + boolean success = false; + final IndexSearcher searcher; + try { + searcher = searcherFactory.newSearcher(reader); + if (searcher.getIndexReader() != reader) { + throw new IllegalStateException("SearcherFactory must wrap exactly the provided reader (got " + searcher.getIndexReader() + " but expected " + reader + ")"); + } + success = true; + } finally { + if (!success) { + reader.decRef(); + } + } + return searcher; + } +} diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/resource/AcquirableResource.java b/src/main/java/org/elasticsearch/common/util/concurrent/resource/AcquirableResource.java deleted file mode 100644 index 5121d591564..00000000000 --- a/src/main/java/org/elasticsearch/common/util/concurrent/resource/AcquirableResource.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent.resource; - -import org.elasticsearch.common.lease.Releasable; - -/** - * A wrapper around a resource that can be released. Note, release should not be - * called directly on the resource itself. - *

- *

Yea, I now, the fact that the resouce itself is releasable basically means that - * users of this class should take care... . - * - * - */ -public interface AcquirableResource { - - T resource(); - - /** - * Acquires the resource, returning true if it was acquired. - */ - boolean acquire(); - - /** - * Releases the resource, will close it if there are no more acquirers and it is marked for close. - */ - void release(); - - /** - * Marks the resource to be closed. Will close it if there are no current - * acquires. - */ - void markForClose(); - - /** - * Forces the resource to be closed, regardless of the number of acquirers. - */ - void forceClose(); -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/resource/AcquirableResourceFactory.java b/src/main/java/org/elasticsearch/common/util/concurrent/resource/AcquirableResourceFactory.java deleted file mode 100644 index 6fcc2935316..00000000000 --- a/src/main/java/org/elasticsearch/common/util/concurrent/resource/AcquirableResourceFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent.resource; - -import org.elasticsearch.common.lease.Releasable; - -/** - * - */ -public final class AcquirableResourceFactory { - - public static AcquirableResource newAcquirableResource(T resource) { - return new BlockingAcquirableResource(resource); - } - - private AcquirableResourceFactory() { - - } -} diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/resource/BlockingAcquirableResource.java b/src/main/java/org/elasticsearch/common/util/concurrent/resource/BlockingAcquirableResource.java deleted file mode 100644 index e4ed7a459b2..00000000000 --- a/src/main/java/org/elasticsearch/common/util/concurrent/resource/BlockingAcquirableResource.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent.resource; - -import org.elasticsearch.common.lease.Releasable; - -/** - * A wrapper around a resource that can be released. Note, release should not be - * called directly on the resource itself. - *

- *

Yea, I now, the fact that the resource itself is releasable basically means that - * users of this class should take care... . - * - * - */ -public class BlockingAcquirableResource implements AcquirableResource { - - private final T resource; - - private int count = 0; - - private boolean markForClose = false; - - private boolean closed; - - public BlockingAcquirableResource(T resource) { - this.resource = resource; - } - - @Override - public T resource() { - return resource; - } - - /** - * Acquires the resource, returning true if it was acquired. - */ - @Override - public synchronized boolean acquire() { - if (markForClose) { - return false; - } - count++; - return true; - } - - /** - * Releases the resource, will close it if there are no more acquirers. - */ - @Override - public synchronized void release() { - count--; - checkIfCanClose(); - } - - /** - * Marks the resource to be closed. Will close it if there are no current - * acquires. - */ - @Override - public synchronized void markForClose() { - markForClose = true; - checkIfCanClose(); - } - - @Override - public void forceClose() { - count = 0; - markForClose(); - } - - private void checkIfCanClose() { - if (markForClose && count <= 0 && !closed) { - closed = true; - resource.release(); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/resource/NonBlockingAcquirableResource.java b/src/main/java/org/elasticsearch/common/util/concurrent/resource/NonBlockingAcquirableResource.java deleted file mode 100644 index 033b67533fc..00000000000 --- a/src/main/java/org/elasticsearch/common/util/concurrent/resource/NonBlockingAcquirableResource.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent.resource; - -import org.elasticsearch.common.lease.Releasable; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicStampedReference; - -/** - * A wrapper around a resource that can be released. Note, release should not be - * called directly on the resource itself. - *

- *

Yea, I now, the fact that the resource itself is releasable basically means that - * users of this class should take care... . - * - * - */ -public class NonBlockingAcquirableResource implements AcquirableResource { - - private final T resource; - - private AtomicStampedReference counter = new AtomicStampedReference(false, 0); - - private final AtomicBoolean closed = new AtomicBoolean(); - - public NonBlockingAcquirableResource(T resource) { - this.resource = resource; - } - - @Override - public T resource() { - return resource; - } - - @Override - public boolean acquire() { - while (true) { - int stamp = counter.getStamp(); - boolean result = counter.compareAndSet(false, false, stamp, stamp + 1); - if (result) { - return true; - } - if (counter.getReference()) { - return false; - } - } - } - - @Override - public void release() { - while (true) { - boolean currentReference = counter.getReference(); - int stamp = counter.getStamp(); - boolean result = counter.compareAndSet(currentReference, currentReference, stamp, stamp - 1); - if (result) { - if (currentReference && (stamp <= 1)) { - close(); - } - return; - } - } - } - - @Override - public void markForClose() { - while (true) { - int stamp = counter.getStamp(); - boolean result = counter.compareAndSet(false, true, stamp, stamp); - if (result) { - if (stamp <= 0) { - close(); - } - return; - } else if (counter.getReference()) { - return; - } - } - } - - @Override - public void forceClose() { - close(); - } - - private void close() { - if (closed.compareAndSet(false, true)) { - resource.release(); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index d6491aacb48..ffba421bc17 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.engine.robin; import org.apache.lucene.index.*; import org.apache.lucene.search.FilteredQuery; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.UnicodeUtil; @@ -32,13 +33,13 @@ import org.elasticsearch.common.bloom.BloomFilter; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.ReaderSearcherHolder; +import org.elasticsearch.common.lucene.manager.SearcherFactory; +import org.elasticsearch.common.lucene.manager.SearcherManager; import org.elasticsearch.common.lucene.uid.UidField; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.resource.AcquirableResource; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.bloom.BloomCache; @@ -71,7 +72,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.elasticsearch.common.lucene.Lucene.safeClose; -import static org.elasticsearch.common.util.concurrent.resource.AcquirableResourceFactory.newAcquirableResource; /** * @@ -118,10 +118,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private volatile IndexWriter indexWriter; - // TODO LUCENE MONITOR 3.6: Replace this with SearchManager (3.6) once its out, it will not allow for forceClose, but maybe its a good thing... - // in any case, if we want to retain forceClose, we can call release multiple times... - // we won't need AcquirableResource any more as well, and close will not need to replace it with a closeable one - private volatile AcquirableResource nrtResource; + // LUCENE MONITOR: 3.6 Remove using the custom SearchManager and use the Lucene 3.6 one + private final SearcherFactory searcherFactory = new RobinSearchFactory(); + private volatile SearcherManager searcherManager; private volatile boolean closed = false; @@ -263,7 +262,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { indexWriter.commit(MapBuilder.newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get())).map()); } translog.newTranslog(translogIdGenerator.get()); - this.nrtResource = buildNrtResource(indexWriter); + this.searcherManager = buildSearchManager(indexWriter); SegmentInfos infos = new SegmentInfos(); infos.read(store.directory()); lastCommittedSegmentInfos = infos; @@ -712,15 +711,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { @Override public Searcher searcher() throws EngineException { - AcquirableResource holder; - for (; ; ) { - holder = this.nrtResource; - if (holder.acquire()) { - break; - } - Thread.yield(); - } - return new RobinSearchResult(holder); + SearcherManager manager = this.searcherManager; + IndexSearcher searcher = manager.acquire(); + return new RobinSearchResult(searcher, manager); } @Override @@ -748,20 +741,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { throw new EngineClosedException(shardId, failedEngine); } try { - // we need to obtain a mutex here, to make sure we don't leave dangling readers - // we could have used an AtomicBoolean#compareAndSet, but, then we might miss refresh requests - // compared to on going ones + // maybeRefresh will only allow one refresh to execute, and the rest will "pass through", + // but, we want to make sure not to loose ant refresh calls, if one is taking time synchronized (refreshMutex) { if (dirty || refresh.force()) { dirty = false; - AcquirableResource current = nrtResource; - IndexReader newReader = IndexReader.openIfChanged(current.resource().reader(), true); - if (newReader != null) { - ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(newReader); - indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity()); - nrtResource = newAcquirableResource(new ReaderSearcherHolder(indexSearcher)); - current.markForClose(); - } + searcherManager.maybeRefresh(); } } } catch (AlreadyClosedException e) { @@ -831,9 +816,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { translog.newTranslog(translogId); } - AcquirableResource current = nrtResource; - nrtResource = buildNrtResource(indexWriter); - current.markForClose(); + SearcherManager current = this.searcherManager; + this.searcherManager = buildSearchManager(indexWriter); + current.close(); refreshVersioningTable(threadPool.estimatedTimeInMillis()); } catch (OutOfMemoryError e) { @@ -1229,10 +1214,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { this.versionMap.clear(); this.failedEngineListeners.clear(); try { - if (nrtResource != null) { - this.nrtResource.forceClose(); - // replace the NRT resource with a closed one, meaning that - this.nrtResource = new ClosedNrtResource(); + if (searcherManager != null) { + searcherManager.close(); } // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed if (indexWriter != null) { @@ -1366,35 +1349,38 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } - private AcquirableResource buildNrtResource(IndexWriter indexWriter) throws IOException { - IndexReader indexReader = IndexReader.open(indexWriter, true); - ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(indexReader); - indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity()); - return newAcquirableResource(new ReaderSearcherHolder(indexSearcher)); + private SearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException { + return new SearcherManager(indexWriter, true, searcherFactory); } static class RobinSearchResult implements Searcher { - private final AcquirableResource nrtHolder; + private final IndexSearcher searcher; + private final SearcherManager manager; - private RobinSearchResult(AcquirableResource nrtHolder) { - this.nrtHolder = nrtHolder; + private RobinSearchResult(IndexSearcher searcher, SearcherManager manager) { + this.searcher = searcher; + this.manager = manager; } @Override public IndexReader reader() { - return nrtHolder.resource().reader(); + return searcher.getIndexReader(); } @Override public ExtendedIndexSearcher searcher() { - return nrtHolder.resource().searcher(); + return (ExtendedIndexSearcher) searcher; } @Override public boolean release() throws ElasticSearchException { - nrtHolder.release(); - return true; + try { + manager.release(searcher); + return true; + } catch (IOException e) { + return false; + } } } @@ -1428,27 +1414,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } - class ClosedNrtResource implements AcquirableResource { - @Override - public ReaderSearcherHolder resource() { - return null; - } + class RobinSearchFactory extends SearcherFactory { @Override - public boolean acquire() { - throw new EngineClosedException(shardId); - } - - @Override - public void release() { - } - - @Override - public void markForClose() { - } - - @Override - public void forceClose() { + public IndexSearcher newSearcher(IndexReader reader) throws IOException { + ExtendedIndexSearcher searcher = new ExtendedIndexSearcher(reader); + searcher.setSimilarity(similarityService.defaultSearchSimilarity()); + return searcher; } } } diff --git a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/AbstractAcquirableResourceTests.java b/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/AbstractAcquirableResourceTests.java deleted file mode 100644 index 712f9d164aa..00000000000 --- a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/AbstractAcquirableResourceTests.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.test.unit.common.util.concurrent.resource; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.common.StopWatch; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.util.concurrent.resource.AcquirableResource; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.*; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; - -/** - * - */ -public abstract class AbstractAcquirableResourceTests { - - protected abstract AcquirableResource createInstance(T resource); - - @Test - public void testSimple() throws Exception { - ExecutorService executorService = Executors.newCachedThreadPool(); - - final AcquirableResource acquirableResource = createInstance(new Resource()); - - List results = new ArrayList(); - - - final int cycles = 50; - final int operationsWithinCycle = 100000; - final CyclicBarrier barrier1 = new CyclicBarrier(cycles * 2 + 1); - final CyclicBarrier barrier2 = new CyclicBarrier(cycles * 2 + 1); - - for (int i = 0; i < cycles; i++) { - results.add(executorService.submit(new Callable() { - @Override - public Object call() throws Exception { - barrier1.await(); - barrier2.await(); - for (int j = 0; j < operationsWithinCycle; j++) { - assertThat(acquirableResource.acquire(), equalTo(true)); - } - return null; - } - })); - results.add(executorService.submit(new Callable() { - @Override - public Object call() throws Exception { - barrier1.await(); - barrier2.await(); - for (int j = 0; j < operationsWithinCycle; j++) { - acquirableResource.release(); - } - return null; - } - })); - } - barrier1.await(); - - StopWatch stopWatch = new StopWatch("Acquirable"); - stopWatch.start(); - - barrier2.await(); - - for (Future f : results) { - f.get(); - } - - assertThat(acquirableResource.resource().isReleased(), equalTo(false)); - acquirableResource.markForClose(); - assertThat(acquirableResource.resource().isReleased(), equalTo(true)); - - stopWatch.stop(); - System.out.println("Took: " + stopWatch.shortSummary()); - } - - private static class Resource implements Releasable { - - private volatile boolean released = false; - - @Override - public boolean release() throws ElasticSearchException { - released = true; - return true; - } - - public boolean isReleased() { - return released; - } - } -} diff --git a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/BlockingAcquirableResourceTests.java b/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/BlockingAcquirableResourceTests.java deleted file mode 100644 index b97718c7a32..00000000000 --- a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/BlockingAcquirableResourceTests.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.test.unit.common.util.concurrent.resource; - -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.util.concurrent.resource.AcquirableResource; -import org.elasticsearch.common.util.concurrent.resource.BlockingAcquirableResource; - -/** - * - */ -public class BlockingAcquirableResourceTests extends AbstractAcquirableResourceTests { - - @Override - protected AcquirableResource createInstance(T resource) { - return new BlockingAcquirableResource(resource); - } -} diff --git a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/NonBlockingAcquirableResourceTests.java b/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/NonBlockingAcquirableResourceTests.java deleted file mode 100644 index b83bfeab2c0..00000000000 --- a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/resource/NonBlockingAcquirableResourceTests.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.test.unit.common.util.concurrent.resource; - -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.util.concurrent.resource.AcquirableResource; -import org.elasticsearch.common.util.concurrent.resource.NonBlockingAcquirableResource; - -/** - * - */ -public class NonBlockingAcquirableResourceTests extends AbstractAcquirableResourceTests { - - @Override - protected AcquirableResource createInstance(T resource) { - return new NonBlockingAcquirableResource(resource); - } -} \ No newline at end of file