better release of search context when it is not needed in queryThenFetch type actions

This commit is contained in:
kimchy 2010-04-07 20:32:47 +03:00
parent 9464c390ac
commit f631e9aded
18 changed files with 213 additions and 57 deletions

View File

@ -39,6 +39,7 @@
<w>nanos</w>
<w>newcount</w>
<w>ngram</w>
<w>nospawn</w>
<w>param</w>
<w>params</w>
<w>pluggable</w>

View File

@ -171,6 +171,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
if (docIdsToLoad.isEmpty()) {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
finishHim();
}
@ -218,6 +219,8 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
}
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, Node node) {

View File

@ -44,7 +44,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
@ -85,6 +85,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
if (docIdsToLoad.isEmpty()) {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
finishHim();
}
@ -133,6 +134,8 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
}
}
}
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, Node node) {

View File

@ -36,16 +36,19 @@ import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.trove.ExtTIntArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest, SearchResponse> {
@ -271,6 +274,18 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
return TransportSearchHelper.buildShardFailures(shardFailures, searchCache);
}
/**
* Releases shard targets that are not used in the docsIdsToLoad.
*/
protected void releaseIrrelevantSearchContexts(Map<SearchShardTarget, QuerySearchResultProvider> queryResults,
Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad) {
for (Map.Entry<SearchShardTarget, QuerySearchResultProvider> entry : queryResults.entrySet()) {
if (!docIdsToLoad.containsKey(entry.getKey())) {
searchService.sendFreeContext(nodes.get(entry.getKey().nodeId()), entry.getValue().id());
}
}
}
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {

View File

@ -42,10 +42,10 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;

View File

@ -29,10 +29,10 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;

View File

@ -29,10 +29,10 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;

View File

@ -29,10 +29,10 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;

View File

@ -36,10 +36,10 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;

View File

@ -43,10 +43,10 @@ import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.StopWatch;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.CloseableComponent;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;

View File

@ -29,8 +29,8 @@ import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.StringStreamable;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StringStreamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
/**

View File

@ -27,8 +27,10 @@ import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.dfs.CachedDfSource;
@ -125,25 +127,56 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
@Override protected void doClose() throws ElasticSearchException {
}
public void releaseContextsForIndex(Index index) {
for (SearchContext context : activeContexts.values()) {
if (context.shardTarget().index().equals(index.name())) {
freeContext(context);
}
}
}
public void releaseContextsForShard(ShardId shardId) {
for (SearchContext context : activeContexts.values()) {
if (context.shardTarget().index().equals(shardId.index().name()) && context.shardTarget().shardId() == shardId.id()) {
freeContext(context);
}
}
}
public DfsSearchResult executeDfsPhase(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
activeContexts.put(context.id(), context);
dfsPhase.execute(context);
return context.dfsResult();
try {
dfsPhase.execute(context);
return context.dfsResult();
} catch (RuntimeException e) {
freeContext(context);
throw e;
}
}
public QuerySearchResult executeQueryPhase(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
activeContexts.put(context.id(), context);
queryPhase.execute(context);
return context.queryResult();
try {
queryPhase.execute(context);
return context.queryResult();
} catch (RuntimeException e) {
freeContext(context);
throw e;
}
}
public QuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
processScroll(request, context);
queryPhase.execute(context);
return context.queryResult();
try {
processScroll(request, context);
queryPhase.execute(context);
return context.queryResult();
} catch (RuntimeException e) {
freeContext(context);
throw e;
}
}
public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws ElasticSearchException {
@ -151,21 +184,32 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
freeContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
queryPhase.execute(context);
return context.queryResult();
try {
queryPhase.execute(context);
return context.queryResult();
} catch (RuntimeException e) {
freeContext(context);
throw e;
}
}
public QueryFetchSearchResult executeFetchPhase(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
queryPhase.execute(context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (context.scroll() != null) {
activeContexts.put(context.id(), context);
try {
queryPhase.execute(context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (context.scroll() != null) {
activeContexts.put(context.id(), context);
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (RuntimeException e) {
freeContext(context);
throw e;
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) throws ElasticSearchException {
@ -173,37 +217,53 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
freeContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
queryPhase.execute(context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (context.scroll() != null) {
activeContexts.put(context.id(), context);
try {
queryPhase.execute(context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (context.scroll() != null) {
activeContexts.put(context.id(), context);
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (RuntimeException e) {
freeContext(context);
throw e;
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
public QueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
processScroll(request, context);
queryPhase.execute(context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (context.scroll() == null) {
freeContext(request.id());
try {
processScroll(request, context);
queryPhase.execute(context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (context.scroll() == null) {
freeContext(request.id());
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (RuntimeException e) {
freeContext(context);
throw e;
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
context.docIdsToLoad(request.docIds());
fetchPhase.execute(context);
if (context.scroll() == null) {
freeContext(request.id());
try {
context.docIdsToLoad(request.docIds());
fetchPhase.execute(context);
if (context.scroll() == null) {
freeContext(request.id());
}
return context.fetchResult();
} catch (RuntimeException e) {
freeContext(context);
throw e;
}
return context.fetchResult();
}
private SearchContext findContext(long id) throws SearchContextMissingException {
@ -255,7 +315,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return context;
}
private void freeContext(long id) {
public void freeContext(long id) {
SearchContext context = activeContexts.remove(id);
if (context == null) {
return;
@ -264,6 +324,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
private void freeContext(SearchContext context) {
activeContexts.remove(context.id());
context.release();
}

View File

@ -32,6 +32,8 @@ import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.io.stream.LongStreamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
/**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
@ -52,6 +54,7 @@ public class SearchServiceTransportAction {
this.clusterService = clusterService;
this.searchService = searchService;
transportService.registerHandler(SearchFreeContextTransportHandler.ACTION, new SearchFreeContextTransportHandler());
transportService.registerHandler(SearchDfsTransportHandler.ACTION, new SearchDfsTransportHandler());
transportService.registerHandler(SearchQueryTransportHandler.ACTION, new SearchQueryTransportHandler());
transportService.registerHandler(SearchQueryByIdTransportHandler.ACTION, new SearchQueryByIdTransportHandler());
@ -62,6 +65,14 @@ public class SearchServiceTransportAction {
transportService.registerHandler(SearchFetchByIdTransportHandler.ACTION, new SearchFetchByIdTransportHandler());
}
public void sendFreeContext(Node node, final long contextId) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
searchService.freeContext(contextId);
} else {
transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new LongStreamable(contextId), VoidTransportResponseHandler.INSTANCE_NOSPAWN);
}
}
public void sendExecuteDfs(Node node, final InternalSearchRequest request, final SearchServiceListener<DfsSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
@ -302,6 +313,20 @@ public class SearchServiceTransportAction {
}
}
private class SearchFreeContextTransportHandler extends BaseTransportRequestHandler<LongStreamable> {
static final String ACTION = "search/freeContext";
@Override public LongStreamable newInstance() {
return new LongStreamable();
}
@Override public void messageReceived(LongStreamable request, TransportChannel channel) throws Exception {
searchService.freeContext(request.get());
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
private class SearchDfsTransportHandler extends BaseTransportRequestHandler<InternalSearchRequest> {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.internal;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.index.cache.filter.FilterCache;
import org.elasticsearch.index.engine.Engine;
@ -122,6 +123,8 @@ public class SearchContext implements Releasable {
searcher.close();
} catch (IOException e) {
// ignore this exception
} catch (AlreadyClosedException e) {
// ignore this as well
}
engineSearcher.release();
if (!keepAliveTimeout.isCancelled()) {

View File

@ -19,7 +19,7 @@
package org.elasticsearch.transport;
import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
/**
* @author kimchy (Shay Banon)

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.io.stream;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class LongStreamable implements Streamable {
private long value;
public LongStreamable() {
}
public LongStreamable(long value) {
this.value = value;
}
public void set(long newValue) {
value = newValue;
}
public long get() {
return this.value;
}
@Override public void readFrom(StreamInput in) throws IOException {
value = in.readLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value);
}
}

View File

@ -17,16 +17,12 @@
* under the License.
*/
package org.elasticsearch.util.io;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
package org.elasticsearch.util.io.stream;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class StringStreamable implements Streamable {

View File

@ -17,16 +17,12 @@
* under the License.
*/
package org.elasticsearch.util.io;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
package org.elasticsearch.util.io.stream;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class VoidStreamable implements Streamable {