remove simulate executor service call and move to simple execution

This commit is contained in:
Tal Levy 2015-11-05 21:46:48 -08:00
parent 1f29fa4fe9
commit c22c1e0f54
6 changed files with 70 additions and 78 deletions

View File

@ -53,10 +53,6 @@ public class PipelineExecutionService {
return pipeline;
}
public Map<String, Processor.Factory> getProcessorFactoryRegistry() {
return store.getProcessorFactoryRegistry();
}
public void execute(Data data, String pipelineId, Listener listener) {
try {
execute(data, getPipeline(pipelineId), listener);

View File

@ -22,6 +22,7 @@ import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore;
import java.io.IOException;
import java.util.ArrayList;
@ -48,24 +49,33 @@ public class SimulatePipelineRequestPayload {
}
public Data getDocument(int i) {
return documents.get(i);
public List<Data> documents() {
return documents;
}
public int size() {
return documents.size();
public SimulatePipelineResponse execute() {
List<SimulatedItemResponse> responses = new ArrayList<>();
for (Data data : documents) {
try {
pipeline.execute(data);
responses.add(new SimulatedItemResponse(data));
} catch (Exception e) {
responses.add(new SimulatedItemResponse(e));
}
}
return new SimulatePipelineResponse(pipeline.getId(), responses);
}
public static class Factory {
public SimulatePipelineRequestPayload create(String pipelineId, Map<String, Object> config, PipelineExecutionService executionService) throws IOException {
public SimulatePipelineRequestPayload create(String pipelineId, Map<String, Object> config, PipelineStore pipelineStore) throws IOException {
Pipeline pipeline;
// if pipeline `id` passed to request, fetch pipeline from store.
if (pipelineId != null) {
pipeline = executionService.getPipeline(pipelineId);
pipeline = pipelineStore.get(pipelineId);
} else {
Map<String, Object> pipelineConfig = (Map<String, Object>) config.get("pipeline");
pipeline = (new Pipeline.Factory()).create("_pipeline_id", pipelineConfig, executionService.getProcessorFactoryRegistry());
pipeline = (new Pipeline.Factory()).create("_pipeline_id", pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
}
// distribute docs by shard key to SimulateShardPipelineResponse

View File

@ -27,35 +27,46 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class SimulatePipelineResponse extends ActionResponse implements StatusToXContent {
private String pipelineId;
private SimulatedItemResponse[] responses;
private List<SimulatedItemResponse> responses;
public SimulatePipelineResponse() {
}
public SimulatePipelineResponse(String pipelineId, List<SimulatedItemResponse> responses) {
this.pipelineId = pipelineId;
this.responses = Collections.unmodifiableList(responses);
}
public String pipelineId() {
return pipelineId;
}
public SimulatePipelineResponse pipelineId(String pipelineId) {
public void pipelineId(String pipelineId) {
this.pipelineId = pipelineId;
return this;
}
public SimulatePipelineResponse responses(SimulatedItemResponse[] responses) {
this.responses = responses;
return this;
}
public SimulatedItemResponse[] responses() {
public List<SimulatedItemResponse> responses() {
return responses;
}
public void responses(List<SimulatedItemResponse> responses) {
this.responses = responses;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(pipelineId);
out.writeVInt(responses.length);
out.writeVInt(responses.size());
for (SimulatedItemResponse response : responses) {
response.writeTo(out);
}
@ -66,11 +77,11 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo
super.readFrom(in);
this.pipelineId = in.readString();
int responsesLength = in.readVInt();
responses = new SimulatedItemResponse[responsesLength];
responses = new ArrayList<>();
for (int i = 0; i < responsesLength; i++) {
SimulatedItemResponse response = new SimulatedItemResponse();
response.readFrom(in);
responses[i] = response;
responses.add(response);
}
}
@ -90,9 +101,23 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo
public RestStatus status() {
for (SimulatedItemResponse response : responses) {
if (response.failed()) {
return response.status();
return RestStatus.BAD_REQUEST;
}
}
return RestStatus.OK;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimulatePipelineResponse that = (SimulatePipelineResponse) o;
return Objects.equals(pipelineId, that.pipelineId) &&
Objects.equals(responses, that.responses);
}
@Override
public int hashCode() {
return Objects.hash(pipelineId, responses);
}
}

View File

@ -25,25 +25,22 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {
private final PipelineExecutionService executionService;
private final PipelineStore pipelineStore;
@Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineExecutionService executionService) {
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
this.executionService = executionService;
this.pipelineStore = pipelineStore;
}
@Override
@ -53,49 +50,17 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
SimulatePipelineRequestPayload payload;
SimulatePipelineRequestPayload.Factory factory = new SimulatePipelineRequestPayload.Factory();
try {
payload = factory.create(request.id(), source, executionService);
payload = factory.create(request.id(), source, pipelineStore);
} catch (IOException e) {
listener.onFailure(e);
return;
}
final AtomicArray<SimulatedItemResponse> responses = new AtomicArray<>(payload.size());
final AtomicInteger counter = new AtomicInteger(payload.size());
for (int i = 0; i < payload.size(); i++) {
final int index = i;
executionService.execute(payload.getDocument(index), payload.pipeline(), new PipelineExecutionService.Listener() {
@Override
public void executed(Data data) {
responses.set(index, new SimulatedItemResponse(data));
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void failed(Exception e) {
logger.error("failed to execute pipeline [{}]", e, payload.pipelineId());
responses.set(index, new SimulatedItemResponse(e));
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
public void finishHim() {
SimulatedItemResponse[] responseArray = new SimulatedItemResponse[responses.length()];
responses.toArray(responseArray);
SimulatePipelineResponse response = new SimulatePipelineResponse()
.pipelineId(payload.pipelineId())
.responses(responseArray);
listener.onResponse(response);
}
});
}
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override
public void run() {
listener.onResponse(payload.execute());
}
});
}
}

View File

@ -105,14 +105,10 @@ public class IngestClientIT extends ESIntegTestCase {
expectedDoc.put("foo", "bar");
Data expectedData = new Data("index", "type", "id", expectedDoc);
SimulatedItemResponse expectedResponse = new SimulatedItemResponse(expectedData);
SimulatedItemResponse[] expectedResponses = new SimulatedItemResponse[] { expectedResponse };
List<SimulatedItemResponse> expectedResponses = Arrays.asList(expectedResponse);
SimulatePipelineResponse expected = new SimulatePipelineResponse("_id", expectedResponses);
assertThat(response.responses().length, equalTo(1));
assertThat(response.responses()[0].getData().getIndex(), equalTo(expectedResponse.getData().getIndex()));
assertThat(response.responses()[0].getData(), equalTo(expectedResponse.getData()));
assertThat(response.responses()[0], equalTo(expectedResponse));
assertThat(response.responses(), equalTo(expectedResponses));
assertThat(response.pipelineId(), equalTo("_id"));
assertThat(response, equalTo(expected));
}
public void test() throws Exception {

View File

@ -1,7 +1,7 @@
{
"ingest.simulate": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html",
"methods": [ "GET", "POST" ],
"methods": [ "POST" ],
"url": {
"path": "/_ingest/pipeline/_simulate",
"paths": [ "/_ingest/pipeline/_simulate", "/_ingest/pipeline/{id}/_simulate" ],