improve warmer to allow be able toe execute only on new segments

This commit is contained in:
Shay Banon 2012-05-21 17:05:25 +02:00
parent 13c76baa72
commit 4b5c89478c
4 changed files with 99 additions and 12 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.engine.robin;
import com.google.common.collect.Lists;
import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
@ -1438,7 +1439,68 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
ExtendedIndexSearcher searcher = new ExtendedIndexSearcher(reader);
searcher.setSimilarity(similarityService.defaultSearchSimilarity());
if (warmer != null) {
warmer.warm(shardId, new SimpleSearcher(searcher));
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
// we will release explicitly
Searcher currentSearcher = null;
ExtendedIndexSearcher newSearcher = null;
boolean closeNewSearcher = false;
try {
if (searcherManager == null) {
// fresh index writer, just do on all of it
newSearcher = searcher;
} else {
currentSearcher = searcher();
// figure out the newSearcher, with only the new readers that are relevant for us
List<IndexReader> readers = Lists.newArrayList();
for (IndexReader subReader : searcher.subReaders()) {
boolean found = false;
for (IndexReader currentReader : currentSearcher.searcher().subReaders()) {
if (currentReader.getCoreCacheKey().equals(subReader.getCoreCacheKey())) {
found = true;
break;
}
}
if (!found) {
readers.add(subReader);
}
}
if (!readers.isEmpty()) {
// we don't want to close the inner readers, just increase ref on them
newSearcher = new ExtendedIndexSearcher(new MultiReader(readers.toArray(new IndexReader[readers.size()]), false));
closeNewSearcher = true;
}
}
if (newSearcher != null) {
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId,
new SimpleSearcher(searcher),
new SimpleSearcher(newSearcher));
warmer.warm(context);
}
} catch (Exception e) {
if (!closed) {
logger.warn("failed to prepare/warm", e);
}
} finally {
// no need to release the fullSearcher, nothing really is done...
if (currentSearcher != null) {
currentSearcher.release();
}
if (newSearcher != null && closeNewSearcher) {
try {
newSearcher.close();
} catch (Exception e) {
// ignore
}
try {
// close the reader as well, since closing the searcher does nothing
// and we want to decRef the inner readers
newSearcher.getIndexReader().close();
} catch (IOException e) {
// ignore
}
}
}
}
return searcher;
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices.warmer;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
/**
@ -31,7 +32,33 @@ public interface IndicesWarmer {
String executor();
void warm(IndexShard indexShard, IndexMetaData indexMetaData, Engine.Searcher search);
void warm(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context);
}
public static class WarmerContext {
private final ShardId shardId;
private final Engine.Searcher fullSearcher;
private final Engine.Searcher newSearcher;
public WarmerContext(ShardId shardId, Engine.Searcher fullSearcher, Engine.Searcher newSearcher) {
this.shardId = shardId;
this.fullSearcher = fullSearcher;
this.newSearcher = newSearcher;
}
public ShardId shardId() {
return shardId;
}
public Engine.Searcher fullSearcher() {
return fullSearcher;
}
public Engine.Searcher newSearcher() {
return newSearcher;
}
}
void addListener(Listener listener);

View File

@ -25,9 +25,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -66,24 +64,24 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
listeners.remove(listener);
}
public void warm(final ShardId shardId, final Engine.Searcher searcher) {
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.index().name());
public void warm(final WarmerContext context) {
final IndexMetaData indexMetaData = clusterService.state().metaData().index(context.shardId().index().name());
if (indexMetaData == null) {
return;
}
if (!indexMetaData.settings().getAsBoolean("index.warmer.enabled", settings.getAsBoolean("index.warmer.enabled", true))) {
return;
}
IndexService indexService = indicesService.indexService(shardId.index().name());
IndexService indexService = indicesService.indexService(context.shardId().index().name());
if (indexService == null) {
return;
}
final IndexShard indexShard = indexService.shard(shardId.id());
final IndexShard indexShard = indexService.shard(context.shardId().id());
if (indexShard == null) {
return;
}
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] warming [{}]", shardId.index().name(), shardId.id(), searcher.reader());
logger.trace("[{}][{}] warming [{}], new [{}]", context.shardId().index().name(), context.shardId().id(), context.fullSearcher().reader(), context.newSearcher().reader());
}
indexShard.warmerService().onPreWarm();
long time = System.nanoTime();
@ -93,7 +91,7 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
@Override
public void run() {
try {
listener.warm(indexShard, indexMetaData, searcher);
listener.warm(indexShard, indexMetaData, context);
} catch (Throwable e) {
indexShard.warmerService().logger().warn("failed to warm [{}]", e, listener);
} finally {

View File

@ -633,7 +633,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
@Override
public void warm(IndexShard indexShard, IndexMetaData indexMetaData, Engine.Searcher search) {
public void warm(IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext warmerContext) {
IndexWarmersMetaData custom = indexMetaData.custom(IndexWarmersMetaData.TYPE);
if (custom == null) {
return;
@ -645,7 +645,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
InternalSearchRequest request = new InternalSearchRequest(indexShard.shardId().index().name(), indexShard.shardId().id(), indexMetaData.numberOfShards(), SearchType.COUNT)
.source(entry.source().bytes(), entry.source().offset(), entry.source().length())
.types(entry.types());
context = createContext(request, search);
context = createContext(request, warmerContext.newSearcher());
queryPhase.execute(context);
long took = System.nanoTime() - now;
if (indexShard.warmerService().logger().isTraceEnabled()) {