From c22c1e0f54d8894934c46ff414635ba80fdb4d8a Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 5 Nov 2015 21:46:48 -0800 Subject: [PATCH] remove simulate executor service call and move to simple execution --- .../ingest/PipelineExecutionService.java | 4 -- .../SimulatePipelineRequestPayload.java | 24 +++++--- .../simulate/SimulatePipelineResponse.java | 51 ++++++++++++----- .../SimulatePipelineTransportAction.java | 57 ++++--------------- .../elasticsearch/ingest/IngestClientIT.java | 10 +--- .../rest-api-spec/api/ingest.simulate.json | 2 +- 6 files changed, 70 insertions(+), 78 deletions(-) diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java index c6314ae6a6c..3b2f71aa142 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java @@ -53,10 +53,6 @@ public class PipelineExecutionService { return pipeline; } - public Map getProcessorFactoryRegistry() { - return store.getProcessorFactoryRegistry(); - } - public void execute(Data data, String pipelineId, Listener listener) { try { execute(data, getPipeline(pipelineId), listener); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestPayload.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestPayload.java index b376c67d006..03ab8253583 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestPayload.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestPayload.java @@ -22,6 +22,7 @@ import org.elasticsearch.ingest.Data; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.plugin.ingest.PipelineExecutionService; +import org.elasticsearch.plugin.ingest.PipelineStore; import java.io.IOException; import java.util.ArrayList; @@ -48,24 +49,33 @@ public class SimulatePipelineRequestPayload { } - public Data getDocument(int i) { - return documents.get(i); + public List documents() { + return documents; } - public int size() { - return documents.size(); + public SimulatePipelineResponse execute() { + List responses = new ArrayList<>(); + for (Data data : documents) { + try { + pipeline.execute(data); + responses.add(new SimulatedItemResponse(data)); + } catch (Exception e) { + responses.add(new SimulatedItemResponse(e)); + } + } + return new SimulatePipelineResponse(pipeline.getId(), responses); } public static class Factory { - public SimulatePipelineRequestPayload create(String pipelineId, Map config, PipelineExecutionService executionService) throws IOException { + public SimulatePipelineRequestPayload create(String pipelineId, Map config, PipelineStore pipelineStore) throws IOException { Pipeline pipeline; // if pipeline `id` passed to request, fetch pipeline from store. if (pipelineId != null) { - pipeline = executionService.getPipeline(pipelineId); + pipeline = pipelineStore.get(pipelineId); } else { Map pipelineConfig = (Map) config.get("pipeline"); - pipeline = (new Pipeline.Factory()).create("_pipeline_id", pipelineConfig, executionService.getProcessorFactoryRegistry()); + pipeline = (new Pipeline.Factory()).create("_pipeline_id", pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); } // distribute docs by shard key to SimulateShardPipelineResponse diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponse.java index d152424abe5..aeed148d74a 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponse.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponse.java @@ -27,35 +27,46 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.RestStatus; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; public class SimulatePipelineResponse extends ActionResponse implements StatusToXContent { private String pipelineId; - private SimulatedItemResponse[] responses; + private List responses; + + public SimulatePipelineResponse() { + + } + + public SimulatePipelineResponse(String pipelineId, List responses) { + this.pipelineId = pipelineId; + this.responses = Collections.unmodifiableList(responses); + } public String pipelineId() { return pipelineId; } - public SimulatePipelineResponse pipelineId(String pipelineId) { + public void pipelineId(String pipelineId) { this.pipelineId = pipelineId; - return this; } - public SimulatePipelineResponse responses(SimulatedItemResponse[] responses) { - this.responses = responses; - return this; - } - - public SimulatedItemResponse[] responses() { + public List responses() { return responses; } + public void responses(List responses) { + this.responses = responses; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(pipelineId); - out.writeVInt(responses.length); + out.writeVInt(responses.size()); for (SimulatedItemResponse response : responses) { response.writeTo(out); } @@ -66,11 +77,11 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo super.readFrom(in); this.pipelineId = in.readString(); int responsesLength = in.readVInt(); - responses = new SimulatedItemResponse[responsesLength]; + responses = new ArrayList<>(); for (int i = 0; i < responsesLength; i++) { SimulatedItemResponse response = new SimulatedItemResponse(); response.readFrom(in); - responses[i] = response; + responses.add(response); } } @@ -90,9 +101,23 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo public RestStatus status() { for (SimulatedItemResponse response : responses) { if (response.failed()) { - return response.status(); + return RestStatus.BAD_REQUEST; } } return RestStatus.OK; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SimulatePipelineResponse that = (SimulatePipelineResponse) o; + return Objects.equals(pipelineId, that.pipelineId) && + Objects.equals(responses, that.responses); + } + + @Override + public int hashCode() { + return Objects.hash(pipelineId, responses); + } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java index 04c26d3c98f..c4f3484ca82 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java @@ -25,25 +25,22 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.ingest.Data; -import org.elasticsearch.plugin.ingest.PipelineExecutionService; +import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; public class SimulatePipelineTransportAction extends HandledTransportAction { - private final PipelineExecutionService executionService; + private final PipelineStore pipelineStore; @Inject - public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineExecutionService executionService) { + public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) { super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new); - this.executionService = executionService; + this.pipelineStore = pipelineStore; } @Override @@ -53,49 +50,17 @@ public class SimulatePipelineTransportAction extends HandledTransportAction responses = new AtomicArray<>(payload.size()); - final AtomicInteger counter = new AtomicInteger(payload.size()); - - for (int i = 0; i < payload.size(); i++) { - final int index = i; - - executionService.execute(payload.getDocument(index), payload.pipeline(), new PipelineExecutionService.Listener() { - @Override - public void executed(Data data) { - responses.set(index, new SimulatedItemResponse(data)); - - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void failed(Exception e) { - logger.error("failed to execute pipeline [{}]", e, payload.pipelineId()); - responses.set(index, new SimulatedItemResponse(e)); - - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - public void finishHim() { - SimulatedItemResponse[] responseArray = new SimulatedItemResponse[responses.length()]; - responses.toArray(responseArray); - - SimulatePipelineResponse response = new SimulatePipelineResponse() - .pipelineId(payload.pipelineId()) - .responses(responseArray); - - listener.onResponse(response); - } - }); - } + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { + @Override + public void run() { + listener.onResponse(payload.execute()); + } + }); } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index f055d92f2d2..d0d11ae60a0 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -105,14 +105,10 @@ public class IngestClientIT extends ESIntegTestCase { expectedDoc.put("foo", "bar"); Data expectedData = new Data("index", "type", "id", expectedDoc); SimulatedItemResponse expectedResponse = new SimulatedItemResponse(expectedData); - SimulatedItemResponse[] expectedResponses = new SimulatedItemResponse[] { expectedResponse }; + List expectedResponses = Arrays.asList(expectedResponse); + SimulatePipelineResponse expected = new SimulatePipelineResponse("_id", expectedResponses); - assertThat(response.responses().length, equalTo(1)); - assertThat(response.responses()[0].getData().getIndex(), equalTo(expectedResponse.getData().getIndex())); - assertThat(response.responses()[0].getData(), equalTo(expectedResponse.getData())); - assertThat(response.responses()[0], equalTo(expectedResponse)); - assertThat(response.responses(), equalTo(expectedResponses)); - assertThat(response.pipelineId(), equalTo("_id")); + assertThat(response, equalTo(expected)); } public void test() throws Exception { diff --git a/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.simulate.json b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.simulate.json index 33007e3be87..bf08435eb8e 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.simulate.json +++ b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.simulate.json @@ -1,7 +1,7 @@ { "ingest.simulate": { "documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html", - "methods": [ "GET", "POST" ], + "methods": [ "POST" ], "url": { "path": "/_ingest/pipeline/_simulate", "paths": [ "/_ingest/pipeline/_simulate", "/_ingest/pipeline/{id}/_simulate" ],