Reuse MemoryIndex instances across Percolator requests.

* added configurable MemoryIndexPool that pools MemoryIndex instance across Threads
* Pool can be configured based on the number of pooled instances as well as the maximum number of bytes that is reused across the pooled instances

Closes #2581
This commit is contained in:
Simon Willnauer 2013-01-23 16:35:13 +01:00
parent e8c1180ede
commit 88f68264c7
3 changed files with 336 additions and 58 deletions

View File

@ -0,0 +1,38 @@
package org.apache.lucene.index.memory;
import org.apache.lucene.search.IndexSearcher;
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* This class overwrites {@link MemoryIndex} to make the reuse constructor
* visible.
*/
public final class ReusableMemoryIndex extends MemoryIndex {
private final long maxReuseBytes;
public ReusableMemoryIndex(boolean storeOffsets, long maxReusedBytes) {
super(storeOffsets, maxReusedBytes);
this.maxReuseBytes = maxReusedBytes;
}
public long getMaxReuseBytes() {
return maxReuseBytes;
}
}

View File

@ -23,12 +23,14 @@ import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.index.memory.ReusableMemoryIndex;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Strings;
@ -38,6 +40,9 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -58,6 +63,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
@ -65,6 +71,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.index.mapper.SourceToParse.source;
@ -158,19 +169,147 @@ public class PercolatorExecutor extends AbstractIndexComponent {
private final IndexFieldDataService fieldDataService;
private final Map<String, Query> queries = ConcurrentCollections.newConcurrentMap();
/**
* Realtime index setting to control the number of MemoryIndex instances used to handle
* Percolate requests. The default is <tt>10</tt>
*/
public static final String PERCOLATE_POOL_SIZE = "index.percolate.pool.size";
/**
* Realtime index setting to control the upper memory reuse limit across all {@link MemoryIndex} instances
* pooled to handle Percolate requests. This is NOT a peak upper bound, percolate requests can use more memory than this upper
* bound. Yet, if all pooled {@link MemoryIndex} instances are returned to the pool this marks the upper memory bound use
* buy this idle instances. If more memory was allocated by a {@link MemoryIndex} the additinal memory is freed before it
* returns to the pool. The default is <tt>1 MB</tt>
*/
public static final String PERCOLATE_POOL_MAX_MEMORY = "index.percolate.pool.reuse_memory_size";
/**
* Realtime index setting to control the timeout or the maximum waiting time
* for an pooled memory index until an extra memory index is created. The default is <tt>100 ms</tt>
*/
public static final String PERCOLATE_TIMEOUT = "index.percolate.pool.timeout";
/**
* Simple {@link MemoryIndex} Pool that reuses MemoryIndex instance across threads and allows each of the
* MemoryIndex instance to reuse its internal memory based on a user configured realtime value.
*/
static final class MemoryIndexPool {
private volatile BlockingQueue<ReusableMemoryIndex> memoryIndexQueue;
// used to track the in-flight memoryIdx instances so we don't overallocate
private int poolMaxSize;
private int poolCurrentSize;
private volatile long bytesPerMemoryIndex;
private ByteSizeValue maxMemorySize; // only accessed in sync block
private volatile TimeValue timeout;
public MemoryIndexPool(Settings settings) {
poolMaxSize = settings.getAsInt(PERCOLATE_POOL_SIZE, 10);
if (poolMaxSize <= 0) {
throw new ElasticSearchIllegalArgumentException(PERCOLATE_POOL_SIZE + " size must be > 0 but was [" + poolMaxSize + "]");
}
memoryIndexQueue = new ArrayBlockingQueue<ReusableMemoryIndex>(poolMaxSize);
maxMemorySize = settings.getAsBytesSize(PERCOLATE_POOL_MAX_MEMORY, new ByteSizeValue(1, ByteSizeUnit.MB));
if (maxMemorySize.bytes() < 0) {
throw new ElasticSearchIllegalArgumentException(PERCOLATE_POOL_MAX_MEMORY + " must be positive but was [" + maxMemorySize.bytes() + "]");
}
timeout = settings.getAsTime(PERCOLATE_TIMEOUT, new TimeValue(100));
if (timeout.millis() < 0) {
throw new ElasticSearchIllegalArgumentException(PERCOLATE_TIMEOUT + " must be positive but was [" + timeout + "]");
}
bytesPerMemoryIndex = maxMemorySize.bytes() / poolMaxSize;
}
public synchronized void updateSettings(Settings settings) {
final int newPoolSize = settings.getAsInt(PERCOLATE_POOL_SIZE, poolMaxSize);
if (newPoolSize <= 0) {
throw new ElasticSearchIllegalArgumentException(PERCOLATE_POOL_SIZE + " size must be > 0 but was [" + newPoolSize + "]");
}
final ByteSizeValue byteSize = settings.getAsBytesSize(PERCOLATE_POOL_MAX_MEMORY, maxMemorySize);
if (byteSize.bytes() < 0) {
throw new ElasticSearchIllegalArgumentException(PERCOLATE_POOL_MAX_MEMORY + " must be positive but was [" + byteSize.bytes() + "]");
}
timeout = settings.getAsTime(PERCOLATE_TIMEOUT, timeout); // always set this!
if (timeout.millis() < 0) {
throw new ElasticSearchIllegalArgumentException(PERCOLATE_TIMEOUT + " must be positive but was [" + timeout + "]");
}
if (maxMemorySize.equals(byteSize) && newPoolSize == poolMaxSize) {
// nothing changed - return
return;
}
maxMemorySize = byteSize;
poolMaxSize = newPoolSize;
poolCurrentSize = Integer.MAX_VALUE; // prevent new creations until we have the new index in place
/*
* if this has changed we simply change the blocking queue instance with a new pool
* size and reset the
*/
bytesPerMemoryIndex = byteSize.bytes() / newPoolSize;
memoryIndexQueue = new ArrayBlockingQueue<ReusableMemoryIndex>(newPoolSize);
poolCurrentSize = 0; // lets refill the queue
}
public ReusableMemoryIndex acquire() {
final BlockingQueue<ReusableMemoryIndex> queue = memoryIndexQueue;
final ReusableMemoryIndex poll = queue.poll();
return poll == null ? waitOrCreate(queue) : poll;
}
private ReusableMemoryIndex waitOrCreate(BlockingQueue<ReusableMemoryIndex> queue) {
synchronized (this) {
if (poolCurrentSize < poolMaxSize) {
poolCurrentSize++;
return new ReusableMemoryIndex(false, bytesPerMemoryIndex);
}
}
ReusableMemoryIndex poll = null;
try {
final TimeValue timeout = this.timeout; // only read the volatile var once
poll = queue.poll(timeout.getMillis(), TimeUnit.MILLISECONDS); // delay this by 100ms by default
} catch (InterruptedException ie) {
// don't swallow the interrupt
Thread.currentThread().interrupt();
}
return poll == null ? new ReusableMemoryIndex(false, bytesPerMemoryIndex) : poll;
}
public void release(ReusableMemoryIndex index) {
assert index != null : "can't release null reference";
if (bytesPerMemoryIndex == index.getMaxReuseBytes()) {
index.reset();
// only put is back into the queue if the size fits - prune old settings on the fly
memoryIndexQueue.offer(index);
}
}
}
private IndicesService indicesService;
private final MemoryIndexPool memIndexPool;
@Inject
public PercolatorExecutor(Index index, @IndexSettings Settings indexSettings,
MapperService mapperService, IndexQueryParserService queryParserService,
IndexCache indexCache, IndexFieldDataService fieldDataService) {
IndexCache indexCache, IndexFieldDataService fieldDataService, IndexSettingsService indexSettingsService) {
super(index, indexSettings);
this.mapperService = mapperService;
this.queryParserService = queryParserService;
this.indexCache = indexCache;
this.fieldDataService = fieldDataService;
memIndexPool = new MemoryIndexPool(indexSettings);
ApplySettings applySettings = new ApplySettings();
indexSettingsService.addListener(applySettings);
}
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
memIndexPool.updateSettings(settings);
}
}
public void setIndicesService(IndicesService indicesService) {
@ -293,71 +432,72 @@ public class PercolatorExecutor extends AbstractIndexComponent {
private Response percolate(DocAndQueryRequest request) throws ElasticSearchException {
// first, parse the source doc into a MemoryIndex
final MemoryIndex memoryIndex = new MemoryIndex();
// TODO MemoryIndex now supports a reset call that reuses the internal memory
// maybe we can utilize this here.
// TODO: This means percolation does not support nested docs...
for (IndexableField field : request.doc().rootDoc().getFields()) {
if (!field.fieldType().indexed()) {
continue;
}
// no need to index the UID field
if (field.name().equals(UidFieldMapper.NAME)) {
continue;
}
TokenStream tokenStream;
try {
tokenStream = field.tokenStream(request.doc().analyzer());
if (tokenStream != null) {
tokenStream.reset();
memoryIndex.addField(field.name(), tokenStream, field.boost());
}
} catch (IOException e) {
throw new ElasticSearchException("Failed to create token stream", e);
}
}
final IndexSearcher searcher = memoryIndex.createSearcher();
List<String> matches = new ArrayList<String>();
final ReusableMemoryIndex memoryIndex = memIndexPool.acquire();
try {
if (request.query() == null) {
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
for (Map.Entry<String, Query> entry : queries.entrySet()) {
collector.reset();
try {
searcher.search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
}
if (collector.exists()) {
matches.add(entry.getKey());
}
// TODO: This means percolation does not support nested docs...
for (IndexableField field : request.doc().rootDoc().getFields()) {
if (!field.fieldType().indexed()) {
continue;
}
} else {
IndexService percolatorIndex = percolatorIndexServiceSafe();
if (percolatorIndex.numberOfShards() == 0) {
throw new PercolateIndexUnavailable(new Index(PercolatorService.INDEX_NAME));
// no need to index the UID field
if (field.name().equals(UidFieldMapper.NAME)) {
continue;
}
IndexShard percolatorShard = percolatorIndex.shard(0);
Engine.Searcher percolatorSearcher = percolatorShard.searcher();
TokenStream tokenStream;
try {
percolatorSearcher.searcher().search(request.query(), new QueryCollector(logger, queries, searcher, percolatorIndex, matches));
tokenStream = field.tokenStream(request.doc().analyzer());
if (tokenStream != null) {
tokenStream.reset();
memoryIndex.addField(field.name(), tokenStream, field.boost());
}
} catch (IOException e) {
logger.warn("failed to execute", e);
} finally {
percolatorSearcher.release();
throw new ElasticSearchException("Failed to create token stream", e);
}
}
final IndexSearcher searcher = memoryIndex.createSearcher();
List<String> matches = new ArrayList<String>();
try {
if (request.query() == null) {
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
for (Map.Entry<String, Query> entry : queries.entrySet()) {
collector.reset();
try {
searcher.search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
}
if (collector.exists()) {
matches.add(entry.getKey());
}
}
} else {
IndexService percolatorIndex = percolatorIndexServiceSafe();
if (percolatorIndex.numberOfShards() == 0) {
throw new PercolateIndexUnavailable(new Index(PercolatorService.INDEX_NAME));
}
IndexShard percolatorShard = percolatorIndex.shard(0);
Engine.Searcher percolatorSearcher = percolatorShard.searcher();
try {
percolatorSearcher.searcher().search(request.query(), new QueryCollector(logger, queries, searcher, percolatorIndex, matches));
} catch (IOException e) {
logger.warn("failed to execute", e);
} finally {
percolatorSearcher.release();
}
}
} finally {
// explicitly clear the reader, since we can only register on callback on SegmentReader
indexCache.clear(searcher.getIndexReader());
fieldDataService.clear(searcher.getIndexReader());
}
return new Response(matches, request.doc().mappingsModified());
} finally {
// explicitly clear the reader, since we can only register on callback on SegmentReader
indexCache.clear(searcher.getIndexReader());
fieldDataService.clear(searcher.getIndexReader());
memIndexPool.release(memoryIndex);
}
return new Response(matches, request.doc().mappingsModified());
}
private IndexService percolatorIndexServiceSafe() {
@ -429,4 +569,8 @@ public class PercolatorExecutor extends AbstractIndexComponent {
return true;
}
}
public void clearQueries() {
this.queries.clear();
}
}

View File

@ -19,6 +19,12 @@
package org.elasticsearch.test.unit.index.percolator;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.AbstractModule;
@ -28,6 +34,7 @@ import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
@ -41,6 +48,7 @@ import org.elasticsearch.index.percolator.PercolatorExecutor;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.IndexQueryParserModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
@ -48,6 +56,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
@ -65,20 +74,20 @@ public class PercolatorExecutorTests {
private PercolatorExecutor percolatorExecutor;
@BeforeClass
@BeforeTest
public void buildPercolatorService() {
Settings settings = ImmutableSettings.settingsBuilder()
//.put("index.cache.filter.type", "none")
.build();
Index index = new Index("test");
injector = new ModulesBuilder().add(
new IndexSettingsModule(index, settings),
new CodecModule(settings),
new SettingsModule(settings),
new ThreadPoolModule(settings),
new ScriptModule(settings),
new IndicesQueriesModule(),
new MapperServiceModule(),
new IndexSettingsModule(index, settings),
new IndexCacheModule(settings),
new AnalysisModule(settings),
new IndexEngineModule(settings),
@ -117,6 +126,7 @@ public class PercolatorExecutorTests {
.endObject().endObject().endObject();
BytesReference sourceWithType = docWithType.bytes();
percolatorExecutor.clearQueries(); // remove all previously added queries
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest("type1", source));
assertThat(percolate.matches(), hasSize(0));
@ -151,4 +161,90 @@ public class PercolatorExecutorTests {
assertThat(percolate.matches(), hasSize(1));
assertThat(percolate.matches(), hasItem("test1"));
}
@Test
public void testConcurrentPerculator() throws InterruptedException, IOException {
// introduce the doc
XContentBuilder bothQueriesB = XContentFactory.jsonBuilder().startObject().startObject("doc")
.field("field1", 1)
.field("field2", "value")
.endObject().endObject();
final BytesReference bothQueries = bothQueriesB.bytes();
XContentBuilder onlyTest1B = XContentFactory.jsonBuilder().startObject().startObject("doc")
.field("field2", "value")
.endObject().endObject();
XContentBuilder onlyTest2B = XContentFactory.jsonBuilder().startObject().startObject("doc")
.field("field1", 1)
.endObject().endObject();
final BytesReference onlyTest1 = onlyTest1B.bytes();
final BytesReference onlyTest2 = onlyTest2B.bytes();
final PercolatorExecutor executor = this.percolatorExecutor;
percolatorExecutor.clearQueries(); // remove all previously added queries
// this adds the mapping and ensures that we do a NRQ for field 1
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest("type1", bothQueries));
assertThat(percolate.matches(), hasSize(0));
executor.addQuery("test1", termQuery("field2", "value"));
executor.addQuery("test2", termQuery("field1", 1));
final IndexSettingsService settingsService = injector.getInstance(IndexSettingsService.class);
final CountDownLatch start = new CountDownLatch(1);
final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicInteger counts = new AtomicInteger(0);
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
public void run() {
try {
start.await();
PercolatorExecutor.Response percolate;
while(!stop.get()) {
int count = counts.incrementAndGet();
if ((count % 100) == 0) {
ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
builder.put(PercolatorExecutor.PERCOLATE_POOL_MAX_MEMORY, 1 + (counts.get() % 10), ByteSizeUnit.MB);
builder.put(PercolatorExecutor.PERCOLATE_POOL_SIZE, 1 + (counts.get() % 10));
builder.put(PercolatorExecutor.PERCOLATE_TIMEOUT, 1 + (counts.get() % 1000), TimeUnit.MILLISECONDS);
settingsService.refreshSettings(builder.build());
}
if ((count > 10000)) {
stop.set(true);
}
if (count % 3 == 0) {
percolate = executor.percolate(new PercolatorExecutor.SourceRequest("type1", bothQueries));
assertThat(percolate.matches(), hasSize(2));
assertThat(percolate.matches(), hasItems("test1", "test2"));
} else if (count % 3 == 1) {
percolate = executor.percolate(new PercolatorExecutor.SourceRequest("type1", onlyTest1));
assertThat(percolate.matches(), hasSize(1));
assertThat(percolate.matches(), hasItems("test1"));
} else {
percolate = executor.percolate(new PercolatorExecutor.SourceRequest("type1", onlyTest2));
assertThat(percolate.matches(), hasSize(1));
assertThat(percolate.matches(), hasItems("test2"));
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
};
threads[i].start();
}
start.countDown();
for (Thread thread : threads) {
thread.join();
}
}
}