Make maximum number of parallel search requests configurable. (#22192)

Problem: So far all rank eval requests are being executed in parallel. If there
are more than the search thread pool can handle, or if there are other search
requests executed in parallel rank eval can fail.

Solution: Make number of max_concurrent_searches configurable.

Name of configuration parameter is analogous to msearch. Default
max_concurrent_searches set to 10: Rank_eval isn't particularly time critical so
trying to avoid being more clever than probably needed here. Can set this value
through the API to a higher value anytime.

Fixes #21403
This commit is contained in:
Isabel Drost-Fromm 2016-12-19 13:05:49 +01:00 committed by GitHub
parent bdc32be8b7
commit 46c30e6bc3
4 changed files with 62 additions and 11 deletions

View File

@ -57,7 +57,8 @@ GET /twitter/tweet/_rank_eval
}],
"metric": { <9>
"reciprocal_rank": {}
}
},
"max_concurrent_searches": 10 <10>
}
------------------------------
// CONSOLE
@ -72,6 +73,8 @@ GET /twitter/tweet/_rank_eval
<7> The index where the rated document lives.
<8> For a verbose response, specify which properties of a search hit should be returned in addition to index/type/id.
<9> A metric to use for evaluation. See below for a list.
<10> Maximum number of search requests to execute in parallel. Set to 10 by
default.
== Template based ranking evaluation

View File

@ -50,7 +50,11 @@ public class RankEvalSpec extends ToXContentToBytes implements Writeable {
private Collection<RatedRequest> ratedRequests = new ArrayList<>();
/** Definition of the quality metric, e.g. precision at N */
private RankedListQualityMetric metric;
/** optional: Template to base test requests on */
/** Maximum number of requests to execute in parallel. */
private int maxConcurrentSearches = MAX_CONCURRENT_SEARCHES;
/** Default max number of requests. */
private static final int MAX_CONCURRENT_SEARCHES = 10;
/** optional: Templates to base test requests on */
private Map<String, Script> templates = new HashMap<>();
public RankEvalSpec(Collection<RatedRequest> ratedRequests, RankedListQualityMetric metric, Collection<ScriptWithId> templates) {
@ -97,6 +101,7 @@ public class RankEvalSpec extends ToXContentToBytes implements Writeable {
Script value = new Script(in);
this.templates.put(key, value);
}
maxConcurrentSearches = in.readVInt();
}
@Override
@ -111,6 +116,7 @@ public class RankEvalSpec extends ToXContentToBytes implements Writeable {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
}
out.writeVInt(maxConcurrentSearches);
}
/** Returns the metric to use for quality evaluation.*/
@ -127,10 +133,21 @@ public class RankEvalSpec extends ToXContentToBytes implements Writeable {
public Map<String, Script> getTemplates() {
return this.templates;
}
/** Returns the max concurrent searches allowed. */
public int getMaxConcurrentSearches() {
return this.maxConcurrentSearches;
}
/** Set the max concurrent searches allowed. */
public void setMaxConcurrentSearches(int maxConcurrentSearches) {
this.maxConcurrentSearches = maxConcurrentSearches;
}
private static final ParseField TEMPLATES_FIELD = new ParseField("templates");
private static final ParseField METRIC_FIELD = new ParseField("metric");
private static final ParseField REQUESTS_FIELD = new ParseField("requests");
private static final ParseField MAX_CONCURRENT_SEARCHES_FIELD = new ParseField("max_concurrent_searches");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<RankEvalSpec, RankEvalContext> PARSER =
new ConstructingObjectParser<>("rank_eval",
@ -158,6 +175,7 @@ public class RankEvalSpec extends ToXContentToBytes implements Writeable {
throw new ParsingException(p.getTokenLocation(), "error parsing rank request", ex);
}
}, TEMPLATES_FIELD);
PARSER.declareInt(RankEvalSpec::setMaxConcurrentSearches, MAX_CONCURRENT_SEARCHES_FIELD);
}
public static RankEvalSpec parse(XContentParser parser, RankEvalContext context) throws IOException {
@ -213,6 +231,7 @@ public class RankEvalSpec extends ToXContentToBytes implements Writeable {
}
builder.endArray();
builder.field(METRIC_FIELD.getPreferredName(), this.metric);
builder.field(MAX_CONCURRENT_SEARCHES_FIELD.getPreferredName(), maxConcurrentSearches);
builder.endObject();
return builder;
}
@ -229,11 +248,12 @@ public class RankEvalSpec extends ToXContentToBytes implements Writeable {
return Objects.equals(ratedRequests, other.ratedRequests) &&
Objects.equals(metric, other.metric) &&
Objects.equals(maxConcurrentSearches, other.maxConcurrentSearches) &&
Objects.equals(templates, other.templates);
}
@Override
public final int hashCode() {
return Objects.hash(ratedRequests, metric, templates);
return Objects.hash(ratedRequests, metric, templates, maxConcurrentSearches);
}
}

View File

@ -47,8 +47,10 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -65,6 +67,7 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
private Client client;
private ScriptService scriptService;
private SearchRequestParsers searchRequestParsers;
Queue<RequestTask> taskQueue = new ConcurrentLinkedQueue<>();
@Inject
public TransportRankEvalAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
@ -81,10 +84,10 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
protected void doExecute(RankEvalRequest request, ActionListener<RankEvalResponse> listener) {
RankEvalSpec qualityTask = request.getRankEvalSpec();
Collection<RatedRequest> specifications = qualityTask.getRatedRequests();
AtomicInteger responseCounter = new AtomicInteger(specifications.size());
Map<String, EvalQueryQuality> partialResults = new ConcurrentHashMap<>(specifications.size());
Map<String, Exception> errors = new ConcurrentHashMap<>(specifications.size());
Collection<RatedRequest> ratedRequests = qualityTask.getRatedRequests();
AtomicInteger responseCounter = new AtomicInteger(ratedRequests.size());
Map<String, EvalQueryQuality> partialResults = new ConcurrentHashMap<>(ratedRequests.size());
Map<String, Exception> errors = new ConcurrentHashMap<>(ratedRequests.size());
Map<String, CompiledScript> scriptsWithoutParams = new HashMap<>();
for (Entry<String, Script> entry : qualityTask.getTemplates().entrySet()) {
@ -92,7 +95,8 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
entry.getKey(),
scriptService.compile(entry.getValue(), ScriptContext.Standard.SEARCH, new HashMap<>()));
}
for (RatedRequest ratedRequest : specifications) {
for (RatedRequest ratedRequest : ratedRequests) {
final RankEvalActionListener searchListener = new RankEvalActionListener(listener, qualityTask.getMetric(), ratedRequest,
partialResults, errors, responseCounter);
SearchSourceBuilder ratedSearchSource = ratedRequest.getTestRequest();
@ -124,11 +128,28 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
types = ratedRequest.getTypes().toArray(types);
templatedRequest.types(types);
client.search(templatedRequest, searchListener);
RequestTask task = new RequestTask(templatedRequest, searchListener);
taskQueue.add(task);
}
// Execute top n tasks, further execution is triggered in RankEvalActionListener
for (int i = 0; (i < Math.min(ratedRequests.size(), qualityTask.getMaxConcurrentSearches())); i++) {
RequestTask task = taskQueue.poll();
client.search(task.request, task.searchListener);
}
}
public static class RankEvalActionListener implements ActionListener<SearchResponse> {
private class RequestTask {
public SearchRequest request;
public RankEvalActionListener searchListener;
public RequestTask(SearchRequest request, RankEvalActionListener listener) {
this.request = request;
this.searchListener = listener;
}
}
public class RankEvalActionListener implements ActionListener<SearchResponse> {
private ActionListener<RankEvalResponse> listener;
private RatedRequest specification;
@ -165,6 +186,11 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
if (responseCounter.decrementAndGet() == 0) {
// TODO add other statistics like micro/macro avg?
listener.onResponse(new RankEvalResponse(metric.combine(requestDetails.values()), requestDetails, errors));
} else {
if (! taskQueue.isEmpty()) {
RequestTask task = taskQueue.poll();
client.search(task.request, task.searchListener);
}
}
}
}

View File

@ -126,7 +126,9 @@ public class RankEvalSpecTests extends ESTestCase {
"id", Arrays.asList(RatedDocumentTests.createRatedDocument()), new SearchSourceBuilder());
ratedRequests = Arrays.asList(ratedRequest);
}
return new RankEvalSpec(ratedRequests, metric, templates);
RankEvalSpec spec = new RankEvalSpec(ratedRequests, metric, templates);
maybeSet(spec::setMaxConcurrentSearches, randomInt(100));
return spec;
}
public void testRoundtripping() throws IOException {