optimize acquiring search handler to use a search manager, also, creating a ContextIndexSearcher can be optimized if it is created from a searcher

This commit is contained in:
Shay Banon 2012-03-09 22:41:09 +02:00
parent 0badf3d92a
commit 752ae6e206
13 changed files with 440 additions and 613 deletions

View File

@ -26,8 +26,8 @@ import org.apache.lucene.search.IndexSearcher;
*/ */
public class ExtendedIndexSearcher extends IndexSearcher { public class ExtendedIndexSearcher extends IndexSearcher {
public ExtendedIndexSearcher(IndexSearcher searcher) { public ExtendedIndexSearcher(ExtendedIndexSearcher searcher) {
super(searcher.getIndexReader()); super(searcher.getIndexReader(), searcher.subReaders(), searcher.docStarts());
setSimilarity(searcher.getSimilarity()); setSimilarity(searcher.getSimilarity());
} }

View File

@ -1,66 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene;
import org.apache.lucene.index.ExtendedIndexSearcher;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.lease.Releasable;
/**
* A very simple holder for a tuple of reader and searcher.
*
*
*/
public class ReaderSearcherHolder implements Releasable {
private final ExtendedIndexSearcher indexSearcher;
public ReaderSearcherHolder(IndexReader indexReader) {
this(new ExtendedIndexSearcher(indexReader));
}
public ReaderSearcherHolder(ExtendedIndexSearcher indexSearcher) {
this.indexSearcher = indexSearcher;
}
public IndexReader reader() {
return indexSearcher.getIndexReader();
}
public ExtendedIndexSearcher searcher() {
return indexSearcher;
}
@Override
public boolean release() throws ElasticSearchException {
try {
indexSearcher.close();
} catch (Exception e) {
// do nothing
}
try {
indexSearcher.getIndexReader().close();
} catch (Exception e) {
// do nothing
}
return true;
}
}

View File

@ -0,0 +1,181 @@
package org.elasticsearch.common.lucene.manager;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.store.AlreadyClosedException;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
/**
* Utility class to safely share instances of a certain type across multiple
* threads, while periodically refreshing them. This class ensures each
* reference is closed only once all threads have finished using it. It is
* recommended to consult the documentation of {@link ReferenceManager}
* implementations for their {@link #maybeRefresh()} semantics.
*
* @param <G> the concrete type that will be {@link #acquire() acquired} and
* {@link #release(Object) released}.
* @lucene.experimental
*/
// LUCENE MONITOR: 3.6 Remove this once 3.6 is out and use it
public abstract class ReferenceManager<G> implements Closeable {
private static final String REFERENCE_MANAGER_IS_CLOSED_MSG = "this ReferenceManager is closed";
protected volatile G current;
private final Semaphore reopenLock = new Semaphore(1);
private void ensureOpen() {
if (current == null) {
throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
}
}
private synchronized void swapReference(G newReference) throws IOException {
ensureOpen();
final G oldReference = current;
current = newReference;
release(oldReference);
}
/**
* Decrement reference counting on the given reference.
*/
protected abstract void decRef(G reference) throws IOException;
/**
* Refresh the given reference if needed. Returns {@code null} if no refresh
* was needed, otherwise a new refreshed reference.
*/
protected abstract G refreshIfNeeded(G referenceToRefresh) throws IOException;
/**
* Try to increment reference counting on the given reference. Return true if
* the operation was successful.
*/
protected abstract boolean tryIncRef(G reference);
/**
* Obtain the current reference. You must match every call to acquire with one
* call to {@link #release}; it's best to do so in a finally clause, and set
* the reference to {@code null} to prevent accidental usage after it has been
* released.
*/
public final G acquire() {
G ref;
do {
if ((ref = current) == null) {
throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
}
} while (!tryIncRef(ref));
return ref;
}
/**
* Close this ReferenceManager to future {@link #acquire() acquiring}. Any
* references that were previously {@link #acquire() acquired} won't be
* affected, and they should still be {@link #release released} when they are
* not needed anymore.
*/
public final synchronized void close() throws IOException {
if (current != null) {
// make sure we can call this more than once
// closeable javadoc says:
// if this is already closed then invoking this method has no effect.
swapReference(null);
afterClose();
}
}
/**
* Called after close(), so subclass can free any resources.
*/
protected void afterClose() throws IOException {
}
/**
* You must call this, periodically, if you want that {@link #acquire()} will
* return refreshed instances.
* <p/>
* <p/>
* <b>Threads</b>: it's fine for more than one thread to call this at once.
* Only the first thread will attempt the refresh; subsequent threads will see
* that another thread is already handling refresh and will return
* immediately. Note that this means if another thread is already refreshing
* then subsequent threads will return right away without waiting for the
* refresh to complete.
* <p/>
* <p/>
* If this method returns true it means the calling thread either refreshed
* or that there were no changes to refresh. If it returns false it means another
* thread is currently refreshing.
*/
public final boolean maybeRefresh() throws IOException {
ensureOpen();
// Ensure only 1 thread does reopen at once; other threads just return immediately:
final boolean doTryRefresh = reopenLock.tryAcquire();
if (doTryRefresh) {
try {
final G reference = acquire();
try {
G newReference = refreshIfNeeded(reference);
if (newReference != null) {
assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
boolean success = false;
try {
swapReference(newReference);
success = true;
} finally {
if (!success) {
release(newReference);
}
}
}
} finally {
release(reference);
}
afterRefresh();
} finally {
reopenLock.release();
}
}
return doTryRefresh;
}
/**
* Called after swapReference has installed a new
* instance.
*/
protected void afterRefresh() throws IOException {
}
/**
* Release the refernce previously obtained via {@link #acquire()}.
* <p/>
* <b>NOTE:</b> it's safe to call this after {@link #close()}.
*/
public final void release(G reference) throws IOException {
assert reference != null;
decRef(reference);
}
}

View File

@ -0,0 +1,56 @@
package org.elasticsearch.common.lucene.manager;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import java.io.IOException;
/**
* Factory class used by {@link org.apache.lucene.search.SearcherManager} and {@link org.apache.lucene.search.NRTManager} to
* create new IndexSearchers. The default implementation just creates
* an IndexSearcher with no custom behavior:
* <p/>
* <pre class="prettyprint">
* public IndexSearcher newSearcher(IndexReader r) throws IOException {
* return new IndexSearcher(r);
* }
* </pre>
* <p/>
* You can pass your own factory instead if you want custom behavior, such as:
* <ul>
* <li>Setting a custom scoring model: {@link org.apache.lucene.search.IndexSearcher#setSimilarity(org.apache.lucene.search.Similarity)}
* <li>Parallel per-segment search: {@link org.apache.lucene.search.IndexSearcher#IndexSearcher(org.apache.lucene.index.IndexReader, java.util.concurrent.ExecutorService)}
* <li>Return custom subclasses of IndexSearcher (for example that implement distributed scoring)
* <li>Run queries to warm your IndexSearcher before it is used. Note: when using near-realtime search
* you may want to also {@link org.apache.lucene.index.IndexWriterConfig#setMergedSegmentWarmer(org.apache.lucene.index.IndexWriter.IndexReaderWarmer)} to warm
* newly merged segments in the background, outside of the reopen path.
* </ul>
*
* @lucene.experimental
*/
// LUCENE MONITOR: 3.6 Remove this once 3.6 is out and use it
public class SearcherFactory {
/**
* Returns a new IndexSearcher over the given reader.
*/
public IndexSearcher newSearcher(IndexReader reader) throws IOException {
return new IndexSearcher(reader);
}
}

View File

@ -0,0 +1,163 @@
package org.elasticsearch.common.lucene.manager;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import java.io.IOException;
/**
* Utility class to safely share {@link org.apache.lucene.search.IndexSearcher} instances across multiple
* threads, while periodically reopening. This class ensures each searcher is
* closed only once all threads have finished using it.
* <p/>
* <p/>
* Use {@link #acquire} to obtain the current searcher, and {@link #release} to
* release it, like this:
* <p/>
* <pre class="prettyprint">
* IndexSearcher s = manager.acquire();
* try {
* // Do searching, doc retrieval, etc. with s
* } finally {
* manager.release(s);
* }
* // Do not use s after this!
* s = null;
* </pre>
* <p/>
* <p/>
* In addition you should periodically call {@link #maybeRefresh}. While it's
* possible to call this just before running each query, this is discouraged
* since it penalizes the unlucky queries that do the reopen. It's better to use
* a separate background thread, that periodically calls maybeReopen. Finally,
* be sure to call {@link #close} once you are done.
*
* @lucene.experimental
* @see SearcherFactory
*/
// LUCENE MONITOR: 3.6 Remove this once 3.6 is out and use it
public final class SearcherManager extends ReferenceManager<IndexSearcher> {
private final SearcherFactory searcherFactory;
/**
* Creates and returns a new SearcherManager from the given {@link org.apache.lucene.index.IndexWriter}.
*
* @param writer the IndexWriter to open the IndexReader from.
* @param applyAllDeletes If <code>true</code>, all buffered deletes will
* be applied (made visible) in the {@link org.apache.lucene.search.IndexSearcher} / {@link org.apache.lucene.index.IndexReader}.
* If <code>false</code>, the deletes may or may not be applied, but remain buffered
* (in IndexWriter) so that they will be applied in the future.
* Applying deletes can be costly, so if your app can tolerate deleted documents
* being returned you might gain some performance by passing <code>false</code>.
* See {@link org.apache.lucene.index.IndexReader#openIfChanged(org.apache.lucene.index.IndexReader, org.apache.lucene.index.IndexWriter, boolean)}.
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
* @throws java.io.IOException
*/
public SearcherManager(IndexWriter writer, boolean applyAllDeletes, SearcherFactory searcherFactory) throws IOException {
if (searcherFactory == null) {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
current = getSearcher(searcherFactory, IndexReader.open(writer, applyAllDeletes));
}
/**
* Creates and returns a new SearcherManager from the given {@link org.apache.lucene.store.Directory}.
*
* @param dir the directory to open the DirectoryReader on.
* @param searcherFactory An optional {@link SearcherFactory}. Pass
* <code>null</code> if you don't require the searcher to be warmed
* before going live or other custom behavior.
* @throws java.io.IOException
*/
public SearcherManager(Directory dir, SearcherFactory searcherFactory) throws IOException {
if (searcherFactory == null) {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
current = getSearcher(searcherFactory, IndexReader.open(dir));
}
@Override
protected void decRef(IndexSearcher reference) throws IOException {
reference.getIndexReader().decRef();
}
@Override
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
final IndexReader newReader = IndexReader.openIfChanged(referenceToRefresh.getIndexReader());
if (newReader == null) {
return null;
} else {
return getSearcher(searcherFactory, newReader);
}
}
@Override
protected boolean tryIncRef(IndexSearcher reference) {
return reference.getIndexReader().tryIncRef();
}
/**
* @deprecated see {@link #maybeRefresh()}.
*/
@Deprecated
public boolean maybeReopen() throws IOException {
return maybeRefresh();
}
/**
* Returns <code>true</code> if no changes have occured since this searcher
* ie. reader was opened, otherwise <code>false</code>.
*
* @see org.apache.lucene.index.IndexReader#isCurrent()
*/
public boolean isSearcherCurrent() throws IOException {
final IndexSearcher searcher = acquire();
try {
return searcher.getIndexReader().isCurrent();
} finally {
release(searcher);
}
}
// NOTE: decRefs incoming reader on throwing an exception
static IndexSearcher getSearcher(SearcherFactory searcherFactory, IndexReader reader) throws IOException {
boolean success = false;
final IndexSearcher searcher;
try {
searcher = searcherFactory.newSearcher(reader);
if (searcher.getIndexReader() != reader) {
throw new IllegalStateException("SearcherFactory must wrap exactly the provided reader (got " + searcher.getIndexReader() + " but expected " + reader + ")");
}
success = true;
} finally {
if (!success) {
reader.decRef();
}
}
return searcher;
}
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent.resource;
import org.elasticsearch.common.lease.Releasable;
/**
* A wrapper around a resource that can be released. Note, release should not be
* called directly on the resource itself.
* <p/>
* <p>Yea, I now, the fact that the resouce itself is releasable basically means that
* users of this class should take care... .
*
*
*/
public interface AcquirableResource<T extends Releasable> {
T resource();
/**
* Acquires the resource, returning <tt>true</tt> if it was acquired.
*/
boolean acquire();
/**
* Releases the resource, will close it if there are no more acquirers and it is marked for close.
*/
void release();
/**
* Marks the resource to be closed. Will close it if there are no current
* acquires.
*/
void markForClose();
/**
* Forces the resource to be closed, regardless of the number of acquirers.
*/
void forceClose();
}

View File

@ -1,36 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent.resource;
import org.elasticsearch.common.lease.Releasable;
/**
*
*/
public final class AcquirableResourceFactory {
public static <T extends Releasable> AcquirableResource<T> newAcquirableResource(T resource) {
return new BlockingAcquirableResource<T>(resource);
}
private AcquirableResourceFactory() {
}
}

View File

@ -1,95 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent.resource;
import org.elasticsearch.common.lease.Releasable;
/**
* A wrapper around a resource that can be released. Note, release should not be
* called directly on the resource itself.
* <p/>
* <p>Yea, I now, the fact that the resource itself is releasable basically means that
* users of this class should take care... .
*
*
*/
public class BlockingAcquirableResource<T extends Releasable> implements AcquirableResource<T> {
private final T resource;
private int count = 0;
private boolean markForClose = false;
private boolean closed;
public BlockingAcquirableResource(T resource) {
this.resource = resource;
}
@Override
public T resource() {
return resource;
}
/**
* Acquires the resource, returning <tt>true</tt> if it was acquired.
*/
@Override
public synchronized boolean acquire() {
if (markForClose) {
return false;
}
count++;
return true;
}
/**
* Releases the resource, will close it if there are no more acquirers.
*/
@Override
public synchronized void release() {
count--;
checkIfCanClose();
}
/**
* Marks the resource to be closed. Will close it if there are no current
* acquires.
*/
@Override
public synchronized void markForClose() {
markForClose = true;
checkIfCanClose();
}
@Override
public void forceClose() {
count = 0;
markForClose();
}
private void checkIfCanClose() {
if (markForClose && count <= 0 && !closed) {
closed = true;
resource.release();
}
}
}

View File

@ -1,108 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent.resource;
import org.elasticsearch.common.lease.Releasable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* A wrapper around a resource that can be released. Note, release should not be
* called directly on the resource itself.
* <p/>
* <p>Yea, I now, the fact that the resource itself is releasable basically means that
* users of this class should take care... .
*
*
*/
public class NonBlockingAcquirableResource<T extends Releasable> implements AcquirableResource<T> {
private final T resource;
private AtomicStampedReference<Boolean> counter = new AtomicStampedReference<Boolean>(false, 0);
private final AtomicBoolean closed = new AtomicBoolean();
public NonBlockingAcquirableResource(T resource) {
this.resource = resource;
}
@Override
public T resource() {
return resource;
}
@Override
public boolean acquire() {
while (true) {
int stamp = counter.getStamp();
boolean result = counter.compareAndSet(false, false, stamp, stamp + 1);
if (result) {
return true;
}
if (counter.getReference()) {
return false;
}
}
}
@Override
public void release() {
while (true) {
boolean currentReference = counter.getReference();
int stamp = counter.getStamp();
boolean result = counter.compareAndSet(currentReference, currentReference, stamp, stamp - 1);
if (result) {
if (currentReference && (stamp <= 1)) {
close();
}
return;
}
}
}
@Override
public void markForClose() {
while (true) {
int stamp = counter.getStamp();
boolean result = counter.compareAndSet(false, true, stamp, stamp);
if (result) {
if (stamp <= 0) {
close();
}
return;
} else if (counter.getReference()) {
return;
}
}
}
@Override
public void forceClose() {
close();
}
private void close() {
if (closed.compareAndSet(false, true)) {
resource.release();
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.engine.robin;
import org.apache.lucene.index.*; import org.apache.lucene.index.*;
import org.apache.lucene.search.FilteredQuery; import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.UnicodeUtil; import org.apache.lucene.util.UnicodeUtil;
@ -32,13 +33,13 @@ import org.elasticsearch.common.bloom.BloomFilter;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.ReaderSearcherHolder; import org.elasticsearch.common.lucene.manager.SearcherFactory;
import org.elasticsearch.common.lucene.manager.SearcherManager;
import org.elasticsearch.common.lucene.uid.UidField; import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.resource.AcquirableResource;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bloom.BloomCache; import org.elasticsearch.index.cache.bloom.BloomCache;
@ -71,7 +72,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.elasticsearch.common.lucene.Lucene.safeClose; import static org.elasticsearch.common.lucene.Lucene.safeClose;
import static org.elasticsearch.common.util.concurrent.resource.AcquirableResourceFactory.newAcquirableResource;
/** /**
* *
@ -118,10 +118,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private volatile IndexWriter indexWriter; private volatile IndexWriter indexWriter;
// TODO LUCENE MONITOR 3.6: Replace this with SearchManager (3.6) once its out, it will not allow for forceClose, but maybe its a good thing... // LUCENE MONITOR: 3.6 Remove using the custom SearchManager and use the Lucene 3.6 one
// in any case, if we want to retain forceClose, we can call release multiple times... private final SearcherFactory searcherFactory = new RobinSearchFactory();
// we won't need AcquirableResource any more as well, and close will not need to replace it with a closeable one private volatile SearcherManager searcherManager;
private volatile AcquirableResource<ReaderSearcherHolder> nrtResource;
private volatile boolean closed = false; private volatile boolean closed = false;
@ -263,7 +262,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get())).map()); indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get())).map());
} }
translog.newTranslog(translogIdGenerator.get()); translog.newTranslog(translogIdGenerator.get());
this.nrtResource = buildNrtResource(indexWriter); this.searcherManager = buildSearchManager(indexWriter);
SegmentInfos infos = new SegmentInfos(); SegmentInfos infos = new SegmentInfos();
infos.read(store.directory()); infos.read(store.directory());
lastCommittedSegmentInfos = infos; lastCommittedSegmentInfos = infos;
@ -712,15 +711,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
@Override @Override
public Searcher searcher() throws EngineException { public Searcher searcher() throws EngineException {
AcquirableResource<ReaderSearcherHolder> holder; SearcherManager manager = this.searcherManager;
for (; ; ) { IndexSearcher searcher = manager.acquire();
holder = this.nrtResource; return new RobinSearchResult(searcher, manager);
if (holder.acquire()) {
break;
}
Thread.yield();
}
return new RobinSearchResult(holder);
} }
@Override @Override
@ -748,20 +741,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new EngineClosedException(shardId, failedEngine); throw new EngineClosedException(shardId, failedEngine);
} }
try { try {
// we need to obtain a mutex here, to make sure we don't leave dangling readers // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
// we could have used an AtomicBoolean#compareAndSet, but, then we might miss refresh requests // but, we want to make sure not to loose ant refresh calls, if one is taking time
// compared to on going ones
synchronized (refreshMutex) { synchronized (refreshMutex) {
if (dirty || refresh.force()) { if (dirty || refresh.force()) {
dirty = false; dirty = false;
AcquirableResource<ReaderSearcherHolder> current = nrtResource; searcherManager.maybeRefresh();
IndexReader newReader = IndexReader.openIfChanged(current.resource().reader(), true);
if (newReader != null) {
ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(newReader);
indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity());
nrtResource = newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
current.markForClose();
}
} }
} }
} catch (AlreadyClosedException e) { } catch (AlreadyClosedException e) {
@ -831,9 +816,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
translog.newTranslog(translogId); translog.newTranslog(translogId);
} }
AcquirableResource<ReaderSearcherHolder> current = nrtResource; SearcherManager current = this.searcherManager;
nrtResource = buildNrtResource(indexWriter); this.searcherManager = buildSearchManager(indexWriter);
current.markForClose(); current.close();
refreshVersioningTable(threadPool.estimatedTimeInMillis()); refreshVersioningTable(threadPool.estimatedTimeInMillis());
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
@ -1229,10 +1214,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
this.versionMap.clear(); this.versionMap.clear();
this.failedEngineListeners.clear(); this.failedEngineListeners.clear();
try { try {
if (nrtResource != null) { if (searcherManager != null) {
this.nrtResource.forceClose(); searcherManager.close();
// replace the NRT resource with a closed one, meaning that
this.nrtResource = new ClosedNrtResource();
} }
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
if (indexWriter != null) { if (indexWriter != null) {
@ -1366,35 +1349,38 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
} }
private AcquirableResource<ReaderSearcherHolder> buildNrtResource(IndexWriter indexWriter) throws IOException { private SearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException {
IndexReader indexReader = IndexReader.open(indexWriter, true); return new SearcherManager(indexWriter, true, searcherFactory);
ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(indexReader);
indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity());
return newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
} }
static class RobinSearchResult implements Searcher { static class RobinSearchResult implements Searcher {
private final AcquirableResource<ReaderSearcherHolder> nrtHolder; private final IndexSearcher searcher;
private final SearcherManager manager;
private RobinSearchResult(AcquirableResource<ReaderSearcherHolder> nrtHolder) { private RobinSearchResult(IndexSearcher searcher, SearcherManager manager) {
this.nrtHolder = nrtHolder; this.searcher = searcher;
this.manager = manager;
} }
@Override @Override
public IndexReader reader() { public IndexReader reader() {
return nrtHolder.resource().reader(); return searcher.getIndexReader();
} }
@Override @Override
public ExtendedIndexSearcher searcher() { public ExtendedIndexSearcher searcher() {
return nrtHolder.resource().searcher(); return (ExtendedIndexSearcher) searcher;
} }
@Override @Override
public boolean release() throws ElasticSearchException { public boolean release() throws ElasticSearchException {
nrtHolder.release(); try {
return true; manager.release(searcher);
return true;
} catch (IOException e) {
return false;
}
} }
} }
@ -1428,27 +1414,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
} }
class ClosedNrtResource implements AcquirableResource<ReaderSearcherHolder> { class RobinSearchFactory extends SearcherFactory {
@Override
public ReaderSearcherHolder resource() {
return null;
}
@Override @Override
public boolean acquire() { public IndexSearcher newSearcher(IndexReader reader) throws IOException {
throw new EngineClosedException(shardId); ExtendedIndexSearcher searcher = new ExtendedIndexSearcher(reader);
} searcher.setSimilarity(similarityService.defaultSearchSimilarity());
return searcher;
@Override
public void release() {
}
@Override
public void markForClose() {
}
@Override
public void forceClose() {
} }
} }
} }

View File

@ -1,113 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.common.util.concurrent.resource;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.resource.AcquirableResource;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public abstract class AbstractAcquirableResourceTests {
protected abstract <T extends Releasable> AcquirableResource<T> createInstance(T resource);
@Test
public void testSimple() throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final AcquirableResource<Resource> acquirableResource = createInstance(new Resource());
List<Future> results = new ArrayList<Future>();
final int cycles = 50;
final int operationsWithinCycle = 100000;
final CyclicBarrier barrier1 = new CyclicBarrier(cycles * 2 + 1);
final CyclicBarrier barrier2 = new CyclicBarrier(cycles * 2 + 1);
for (int i = 0; i < cycles; i++) {
results.add(executorService.submit(new Callable() {
@Override
public Object call() throws Exception {
barrier1.await();
barrier2.await();
for (int j = 0; j < operationsWithinCycle; j++) {
assertThat(acquirableResource.acquire(), equalTo(true));
}
return null;
}
}));
results.add(executorService.submit(new Callable() {
@Override
public Object call() throws Exception {
barrier1.await();
barrier2.await();
for (int j = 0; j < operationsWithinCycle; j++) {
acquirableResource.release();
}
return null;
}
}));
}
barrier1.await();
StopWatch stopWatch = new StopWatch("Acquirable");
stopWatch.start();
barrier2.await();
for (Future f : results) {
f.get();
}
assertThat(acquirableResource.resource().isReleased(), equalTo(false));
acquirableResource.markForClose();
assertThat(acquirableResource.resource().isReleased(), equalTo(true));
stopWatch.stop();
System.out.println("Took: " + stopWatch.shortSummary());
}
private static class Resource implements Releasable {
private volatile boolean released = false;
@Override
public boolean release() throws ElasticSearchException {
released = true;
return true;
}
public boolean isReleased() {
return released;
}
}
}

View File

@ -1,35 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.common.util.concurrent.resource;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.resource.AcquirableResource;
import org.elasticsearch.common.util.concurrent.resource.BlockingAcquirableResource;
/**
*
*/
public class BlockingAcquirableResourceTests extends AbstractAcquirableResourceTests {
@Override
protected <T extends Releasable> AcquirableResource<T> createInstance(T resource) {
return new BlockingAcquirableResource<T>(resource);
}
}

View File

@ -1,35 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.common.util.concurrent.resource;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.resource.AcquirableResource;
import org.elasticsearch.common.util.concurrent.resource.NonBlockingAcquirableResource;
/**
*
*/
public class NonBlockingAcquirableResourceTests extends AbstractAcquirableResourceTests {
@Override
protected <T extends Releasable> AcquirableResource<T> createInstance(T resource) {
return new NonBlockingAcquirableResource<T>(resource);
}
}