Set default `operation_threading` to `thread_per_shard` and exposed it as an option in the rest api.
This commit is contained in:
parent
59be83f9fc
commit
4d40a1e77c
|
@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchGenerationException;
|
|||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -60,15 +61,13 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
long startTime;
|
||||
|
||||
public PercolateRequest() {
|
||||
}
|
||||
|
||||
public PercolateRequest(String[] indices, String documentType) {
|
||||
super(indices);
|
||||
this.documentType = documentType;
|
||||
// we want to do the percolate in parallel on all the local shards
|
||||
operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
|
||||
}
|
||||
|
||||
public PercolateRequest(PercolateRequest request, BytesReference docSource) {
|
||||
super(request.indices());
|
||||
operationThreading(request.operationThreading());
|
||||
this.documentType = request.documentType();
|
||||
this.routing = request.routing();
|
||||
this.preference = request.preference();
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.get.GetRequest;
|
|||
import org.elasticsearch.action.percolate.PercolateRequest;
|
||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||
import org.elasticsearch.action.support.IgnoreIndices;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -112,6 +113,16 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
void executePercolate(final PercolateRequest percolateRequest, final RestRequest restRequest, final RestChannel restChannel) {
|
||||
// we just send a response, no need to fork
|
||||
percolateRequest.listenerThreaded(false);
|
||||
|
||||
if (restRequest.hasParam("operation_threading")) {
|
||||
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(restRequest.param("operation_threading"), null);
|
||||
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
|
||||
// don't do work on the network thread
|
||||
operationThreading = BroadcastOperationThreading.SINGLE_THREAD;
|
||||
}
|
||||
percolateRequest.operationThreading(operationThreading);
|
||||
}
|
||||
|
||||
client.percolate(percolateRequest, new ActionListener<PercolateResponse>() {
|
||||
@Override
|
||||
public void onResponse(PercolateResponse response) {
|
||||
|
|
|
@ -53,7 +53,7 @@ public class PercolatorStressBenchmark {
|
|||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build();
|
||||
|
||||
Node[] nodes = new Node[2];
|
||||
Node[] nodes = new Node[1];
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
nodes[i] = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "node" + i)).node();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue