moar updates
This commit is contained in:
parent
b40af1bcfd
commit
674084973d
|
@ -22,7 +22,6 @@ package org.elasticsearch.ingest;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
|
@ -131,6 +130,16 @@ public final class Data {
|
|||
return modified;
|
||||
}
|
||||
|
||||
public Map<String, Object> asMap() {
|
||||
Map<String, Object> dataMap = new HashMap<>();
|
||||
dataMap.put("_index", index);
|
||||
dataMap.put("_type", type);
|
||||
dataMap.put("_id", id);
|
||||
dataMap.put("_source", document);
|
||||
|
||||
return dataMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) { return true; }
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
|
|||
import org.elasticsearch.ingest.processor.grok.GrokProcessor;
|
||||
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
|
||||
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
|
||||
import org.elasticsearch.plugin.ingest.simulate.SimulateExecutionService;
|
||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
|
|
@ -36,10 +36,10 @@ public class RestSimulatePipelineAction extends BaseRestHandler {
|
|||
@Inject
|
||||
public RestSimulatePipelineAction(Settings settings, RestController controller, Client client) {
|
||||
super(settings, controller, client);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/{id}/_simulate", this);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate/{id}", this);
|
||||
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate/{id}", this);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate", this);
|
||||
// controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}/_simulate", this);
|
||||
// controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this);
|
||||
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.plugin.ingest.simulate;
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
|
@ -16,37 +16,38 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.plugin.ingest.simulate;
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public class ProcessedData implements Streamable, StatusToXContent {
|
||||
public class ProcessorResult implements Streamable, ToXContent {
|
||||
|
||||
private String processorId;
|
||||
private Data data;
|
||||
private Throwable failure;
|
||||
|
||||
public ProcessedData() {
|
||||
public ProcessorResult() {
|
||||
|
||||
}
|
||||
|
||||
public ProcessedData(String processorId, Data data) {
|
||||
public ProcessorResult(String processorId, Data data) {
|
||||
this.processorId = processorId;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public ProcessedData(Throwable failure) {
|
||||
public ProcessorResult(String processorId, Throwable failure) {
|
||||
this.processorId = processorId;
|
||||
this.failure = failure;
|
||||
}
|
||||
|
||||
|
@ -67,7 +68,6 @@ public class ProcessedData implements Streamable, StatusToXContent {
|
|||
boolean isFailure = in.readBoolean();
|
||||
if (isFailure) {
|
||||
this.failure = in.readThrowable();
|
||||
// TODO(talevy): check out mget for throwable limitations
|
||||
} else {
|
||||
this.processorId = in.readString();
|
||||
String index = in.readString();
|
||||
|
@ -98,31 +98,22 @@ public class ProcessedData implements Streamable, StatusToXContent {
|
|||
builder.field(Fields.PROCESSOR_ID, processorId);
|
||||
builder.field(Fields.ERROR, isFailed());
|
||||
if (isFailed()) {
|
||||
builder.field(Fields.FAILURE, failure.toString());
|
||||
builder.field(Fields.ERROR_MESSAGE, ExceptionsHelper.detailedMessage(failure));
|
||||
} else {
|
||||
builder.field(Fields.MODIFIED, data.isModified());
|
||||
builder.field(Fields.DOCUMENT, data.getDocument());
|
||||
builder.field(Fields.DOCUMENT, data.asMap());
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
if (isFailed()) {
|
||||
return RestStatus.BAD_REQUEST;
|
||||
} else {
|
||||
return RestStatus.OK;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) { return true; }
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
ProcessedData other = (ProcessedData) obj;
|
||||
ProcessorResult other = (ProcessorResult) obj;
|
||||
return Objects.equals(processorId, other.processorId) && Objects.equals(data, other.data) && Objects.equals(failure, other.failure);
|
||||
}
|
||||
|
||||
|
@ -135,7 +126,7 @@ public class ProcessedData implements Streamable, StatusToXContent {
|
|||
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
|
||||
static final XContentBuilderString PROCESSOR_ID = new XContentBuilderString("processor_id");
|
||||
static final XContentBuilderString ERROR = new XContentBuilderString("error");
|
||||
static final XContentBuilderString FAILURE = new XContentBuilderString("failure");
|
||||
static final XContentBuilderString ERROR_MESSAGE = new XContentBuilderString("error_message");
|
||||
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");
|
||||
}
|
||||
}
|
|
@ -17,13 +17,12 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.simulate;
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -41,14 +40,10 @@ public class SimulateExecutionService {
|
|||
}
|
||||
|
||||
|
||||
SimulatedItemResponse executeItem(Pipeline pipeline, Data data, boolean verbose) {
|
||||
SimulatedItemResponse executeItem(Pipeline pipeline, Data data) {
|
||||
try {
|
||||
if (verbose) {
|
||||
return executeVerboseItem(pipeline, data);
|
||||
} else {
|
||||
pipeline.execute(data);
|
||||
return new SimulatedItemResponse(data);
|
||||
}
|
||||
pipeline.execute(data);
|
||||
return new SimulatedItemResponse(data);
|
||||
} catch (Exception e) {
|
||||
return new SimulatedItemResponse(e);
|
||||
}
|
||||
|
@ -56,24 +51,32 @@ public class SimulateExecutionService {
|
|||
}
|
||||
|
||||
SimulatedItemResponse executeVerboseItem(Pipeline pipeline, Data data) {
|
||||
List<ProcessedData> processedDataList = new ArrayList<>();
|
||||
List<ProcessorResult> processorResultList = new ArrayList<>();
|
||||
Data currentData = new Data(data);
|
||||
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
|
||||
Processor processor = pipeline.getProcessors().get(i);
|
||||
String processorId = "processor[" + processor.getType() + "]-" + i;
|
||||
|
||||
processor.execute(currentData);
|
||||
processedDataList.add(new ProcessedData(processorId, currentData));
|
||||
try {
|
||||
processor.execute(currentData);
|
||||
processorResultList.add(new ProcessorResult(processorId, currentData));
|
||||
} catch (Exception e) {
|
||||
processorResultList.add(new ProcessorResult(processorId, e));
|
||||
}
|
||||
|
||||
currentData = new Data(currentData);
|
||||
}
|
||||
return new SimulatedItemResponse(processedDataList);
|
||||
return new SimulatedItemResponse(processorResultList);
|
||||
}
|
||||
|
||||
SimulatePipelineResponse execute(ParsedSimulateRequest request) {
|
||||
List<SimulatedItemResponse> responses = new ArrayList<>();
|
||||
for (Data data : request.getDocuments()) {
|
||||
responses.add(executeItem(request.getPipeline(), data, request.isVerbose()));
|
||||
if (request.isVerbose()) {
|
||||
responses.add(executeVerboseItem(request.getPipeline(), data));
|
||||
} else {
|
||||
responses.add(executeItem(request.getPipeline(), data));
|
||||
}
|
||||
}
|
||||
return new SimulatePipelineResponse(request.getPipeline().getId(), responses);
|
||||
}
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.plugin.ingest.simulate.SimulatedItemResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
|
@ -26,10 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.plugin.ingest.simulate.ParsedSimulateRequest;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
import org.elasticsearch.plugin.ingest.simulate.SimulateExecutionService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -51,16 +48,16 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
|
|||
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
|
||||
Map<String, Object> source = XContentHelper.convertToMap(request.source(), false).v2();
|
||||
|
||||
ParsedSimulateRequest payload;
|
||||
ParsedSimulateRequest simulateRequest;
|
||||
ParsedSimulateRequest.Parser parser = new ParsedSimulateRequest.Parser();
|
||||
try {
|
||||
payload = parser.parse(request.id(), source, request.verbose(), pipelineStore);
|
||||
simulateRequest = parser.parse(request.id(), source, request.verbose(), pipelineStore);
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
|
||||
executionService.execute(payload, new SimulateExecutionService.Listener() {
|
||||
executionService.execute(simulateRequest, new SimulateExecutionService.Listener() {
|
||||
@Override
|
||||
public void onResponse(SimulatePipelineResponse response) {
|
||||
listener.onResponse(response);
|
||||
|
|
|
@ -16,16 +16,16 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.plugin.ingest.simulate;
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -33,10 +33,10 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public class SimulatedItemResponse implements Streamable, StatusToXContent {
|
||||
public class SimulatedItemResponse implements Streamable, ToXContent {
|
||||
|
||||
private Data data;
|
||||
private List<ProcessedData> processedDataList;
|
||||
private List<ProcessorResult> processorResultList;
|
||||
private Throwable failure;
|
||||
|
||||
public SimulatedItemResponse() {
|
||||
|
@ -47,8 +47,8 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
|
|||
this.data = data;
|
||||
}
|
||||
|
||||
public SimulatedItemResponse(List<ProcessedData> processedDataList) {
|
||||
this.processedDataList = processedDataList;
|
||||
public SimulatedItemResponse(List<ProcessorResult> processorResultList) {
|
||||
this.processorResultList = processorResultList;
|
||||
}
|
||||
|
||||
public SimulatedItemResponse(Throwable failure) {
|
||||
|
@ -56,19 +56,29 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
|
|||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
return this.failure != null;
|
||||
if (failure != null) {
|
||||
return true;
|
||||
} else if (processorResultList != null) {
|
||||
for (ProcessorResult result : processorResultList) {
|
||||
if (result.isFailed()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isVerbose() {
|
||||
return this.processedDataList != null;
|
||||
return this.processorResultList != null;
|
||||
}
|
||||
|
||||
public Data getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public List<ProcessedData> getProcessedDataList() {
|
||||
return processedDataList;
|
||||
public List<ProcessorResult> getProcessorResultList() {
|
||||
return processorResultList;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,14 +87,13 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
|
|||
boolean isVerbose = in.readBoolean();
|
||||
if (isFailed) {
|
||||
this.failure = in.readThrowable();
|
||||
// TODO(talevy): check out mget for throwable limitations
|
||||
} else if (isVerbose) {
|
||||
int size = in.readVInt();
|
||||
processedDataList = new ArrayList<>();
|
||||
processorResultList = new ArrayList<>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
ProcessedData processedData = new ProcessedData();
|
||||
processedData.readFrom(in);
|
||||
processedDataList.add(processedData);
|
||||
ProcessorResult processorResult = new ProcessorResult();
|
||||
processorResult.readFrom(in);
|
||||
processorResultList.add(processorResult);
|
||||
}
|
||||
} else {
|
||||
String index = in.readString();
|
||||
|
@ -100,11 +109,11 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
|
|||
out.writeBoolean(isFailed());
|
||||
out.writeBoolean(isVerbose());
|
||||
|
||||
if (isFailed()) {
|
||||
if (failure != null) {
|
||||
out.writeThrowable(failure);
|
||||
} else if (isVerbose()) {
|
||||
out.writeVInt(processedDataList.size());
|
||||
for (ProcessedData p : processedDataList) {
|
||||
out.writeVInt(processorResultList.size());
|
||||
for (ProcessorResult p : processorResultList) {
|
||||
p.writeTo(out);
|
||||
}
|
||||
} else {
|
||||
|
@ -119,32 +128,22 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
|
|||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(Fields.ERROR, isFailed());
|
||||
builder.field(Fields.VERBOSE, isVerbose());
|
||||
if (isFailed()) {
|
||||
builder.field(Fields.FAILURE, failure.toString());
|
||||
if (failure != null) {
|
||||
builder.field(Fields.ERROR_MESSAGE, ExceptionsHelper.detailedMessage(failure));
|
||||
} else if (isVerbose()) {
|
||||
builder.startArray(Fields.PROCESSOR_STEPS);
|
||||
for (ProcessedData processedData : processedDataList) {
|
||||
builder.value(processedData);
|
||||
builder.startArray(Fields.PROCESSOR_RESULTS);
|
||||
for (ProcessorResult processorResult : processorResultList) {
|
||||
builder.value(processorResult);
|
||||
}
|
||||
builder.endArray();
|
||||
} else {
|
||||
builder.field(Fields.MODIFIED, data.isModified());
|
||||
builder.field(Fields.DOCUMENT, data.getDocument());
|
||||
builder.field(Fields.DOCUMENT, data.asMap());
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
if (isFailed()) {
|
||||
return RestStatus.BAD_REQUEST;
|
||||
} else {
|
||||
return RestStatus.OK;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) { return true; }
|
||||
|
@ -152,20 +151,19 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
|
|||
return false;
|
||||
}
|
||||
SimulatedItemResponse other = (SimulatedItemResponse) obj;
|
||||
return Objects.equals(data, other.data) && Objects.equals(processedDataList, other.processedDataList) && Objects.equals(failure, other.failure);
|
||||
return Objects.equals(data, other.data) && Objects.equals(processorResultList, other.processorResultList) && Objects.equals(failure, other.failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(data, processedDataList, failure);
|
||||
return Objects.hash(data, processorResultList, failure);
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
|
||||
static final XContentBuilderString ERROR = new XContentBuilderString("error");
|
||||
static final XContentBuilderString VERBOSE = new XContentBuilderString("verbose");
|
||||
static final XContentBuilderString FAILURE = new XContentBuilderString("failure");
|
||||
static final XContentBuilderString ERROR_MESSAGE = new XContentBuilderString("error_message");
|
||||
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");
|
||||
static final XContentBuilderString PROCESSOR_STEPS = new XContentBuilderString("processor_steps");
|
||||
static final XContentBuilderString PROCESSOR_RESULTS = new XContentBuilderString("processor_results");
|
||||
}
|
||||
}
|
|
@ -33,7 +33,7 @@ import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder;
|
|||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
|
||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder;
|
||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
|
||||
import org.elasticsearch.plugin.ingest.simulate.SimulatedItemResponse;
|
||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatedItemResponse;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.simulate;
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
|
@ -17,14 +17,13 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.simulate;
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
|
@ -67,23 +66,26 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
|
||||
public void testExecuteVerboseItem() throws Exception {
|
||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(
|
||||
Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data)));
|
||||
Arrays.asList(new ProcessorResult("processor[mock]-0", data), new ProcessorResult("processor[mock]-1", data)));
|
||||
SimulatedItemResponse actualItemResponse = executionService.executeVerboseItem(pipeline, data);
|
||||
verify(processor, times(2)).execute(data);
|
||||
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
||||
}
|
||||
|
||||
public void testExecuteItem_verboseSuccessful() throws Exception {
|
||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(
|
||||
Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data)));
|
||||
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, true);
|
||||
public void testExecuteItem() throws Exception {
|
||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(data);
|
||||
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data);
|
||||
verify(processor, times(2)).execute(data);
|
||||
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
||||
}
|
||||
|
||||
public void testExecuteItem_Simple() throws Exception {
|
||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(data);
|
||||
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, false);
|
||||
public void testExecuteVerboseItem_Failure() throws Exception {
|
||||
Exception e = new RuntimeException("processor failed");
|
||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(
|
||||
Arrays.asList(new ProcessorResult("processor[mock]-0", e), new ProcessorResult("processor[mock]-1", data))
|
||||
);
|
||||
doThrow(e).doNothing().when(processor).execute(data);
|
||||
SimulatedItemResponse actualItemResponse = executionService.executeVerboseItem(pipeline, data);
|
||||
verify(processor, times(2)).execute(data);
|
||||
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
||||
}
|
||||
|
@ -92,7 +94,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
Exception e = new RuntimeException("processor failed");
|
||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(e);
|
||||
doThrow(e).when(processor).execute(data);
|
||||
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, false);
|
||||
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data);
|
||||
verify(processor, times(1)).execute(data);
|
||||
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
||||
}
|
||||
|
@ -116,7 +118,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class);
|
||||
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), true);
|
||||
SimulatedItemResponse itemResponse = new SimulatedItemResponse(
|
||||
Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data)));
|
||||
Arrays.asList(new ProcessorResult("processor[mock]-0", data), new ProcessorResult("processor[mock]-1", data)));
|
||||
executionService.execute(request, listener);
|
||||
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));
|
||||
assertBusy(new Runnable() {
|
|
@ -1,10 +1,10 @@
|
|||
{
|
||||
"ingest.simulate": {
|
||||
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html",
|
||||
"methods": [ "POST" ],
|
||||
"methods": [ "GET", "POST" ],
|
||||
"url": {
|
||||
"path": "/_ingest/pipeline/_simulate",
|
||||
"paths": [ "/_ingest/pipeline/_simulate", "/_ingest/pipeline/{id}/_simulate" ],
|
||||
"paths": [ "/_ingest/pipeline/_simulate", "/_ingest/pipeline/_simulate/{id}" ],
|
||||
"parts": {
|
||||
"id": {
|
||||
"type" : "string",
|
||||
|
|
|
@ -49,8 +49,8 @@
|
|||
- length: { docs: 1 }
|
||||
- is_false: docs.0.error
|
||||
- is_true: docs.0.modified
|
||||
- match: { docs.0.foo: "bar" }
|
||||
- match: { docs.0.field2: "_value" }
|
||||
- match: { docs.0.doc._source.foo: "bar" }
|
||||
- match: { docs.0.doc._source.field2: "_value" }
|
||||
|
||||
|
||||
---
|
||||
|
@ -130,17 +130,17 @@
|
|||
]
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- length: { docs.0.processor_steps: 2 }
|
||||
- match: { docs.0.processor_steps.0.processor_id: "processor[mutate]-0" }
|
||||
- is_false: docs.0.processor_steps.0.error
|
||||
- is_true: docs.0.processor_steps.0.modified
|
||||
- length: { docs.0.processor_steps.0.doc: 2 }
|
||||
- match: { docs.0.processor_steps.0.doc.foo: "bar" }
|
||||
- match: { docs.0.processor_steps.0.doc.field2: "_value" }
|
||||
- length: { docs.0.processor_steps.1.doc: 3 }
|
||||
- match: { docs.0.processor_steps.1.doc.foo: "bar" }
|
||||
- match: { docs.0.processor_steps.1.doc.field2: "_value" }
|
||||
- match: { docs.0.processor_steps.1.doc.field3: "third_val" }
|
||||
- length: { docs.0.processor_results: 2 }
|
||||
- match: { docs.0.processor_results.0.processor_id: "processor[mutate]-0" }
|
||||
- is_false: docs.0.processor_results.0.error
|
||||
- is_true: docs.0.processor_results.0.modified
|
||||
- length: { docs.0.processor_results.0.doc._source: 2 }
|
||||
- match: { docs.0.processor_results.0.doc._source.foo: "bar" }
|
||||
- match: { docs.0.processor_results.0.doc._source.field2: "_value" }
|
||||
- length: { docs.0.processor_results.1.doc._source: 3 }
|
||||
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
|
||||
- match: { docs.0.processor_results.1.doc._source.field2: "_value" }
|
||||
- match: { docs.0.processor_results.1.doc._source..field3: "third_val" }
|
||||
|
||||
---
|
||||
"Test simulate with exception thrown":
|
||||
|
@ -184,7 +184,71 @@
|
|||
}
|
||||
- length: { docs: 2 }
|
||||
- is_true: docs.0.error
|
||||
- match: { docs.0.failure: "java.lang.NullPointerException" }
|
||||
- match: { docs.0.error_message: "NullPointerException[null]" }
|
||||
- is_false: docs.1.error
|
||||
- is_true: docs.1.modified
|
||||
- match: { docs.1.doc.foo: "BAR" }
|
||||
- match: { docs.1.doc._source.foo: "BAR" }
|
||||
|
||||
---
|
||||
"Test verbose simulate with exception thrown":
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: green
|
||||
|
||||
- do:
|
||||
catch: request
|
||||
ingest.simulate:
|
||||
verbose: true
|
||||
body: >
|
||||
{
|
||||
"pipeline": {
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"mutate" : {
|
||||
"convert" : {
|
||||
"foo": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"mutate" : {
|
||||
"uppercase" : ["bar"]
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"docs": [
|
||||
{
|
||||
"_index": "index",
|
||||
"_type": "type",
|
||||
"_id": "id",
|
||||
"_source": {
|
||||
"foo": "bar",
|
||||
"bar": "hello"
|
||||
}
|
||||
},
|
||||
{
|
||||
"_index": "index",
|
||||
"_type": "type",
|
||||
"_id": "id2",
|
||||
"_source": {
|
||||
"foo": "5",
|
||||
"bar": "hello"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- length: { docs: 2 }
|
||||
- is_true: docs.0.error
|
||||
- is_false: docs.1.error
|
||||
- length: { docs.0.processor_results: 2 }
|
||||
- is_false: docs.1.processor_results.0.error
|
||||
- match: { docs.0.processor_results.0.error_message: "NumberFormatException[For input string: \"bar\"]" }
|
||||
- is_false: docs.1.processor_results.1.error
|
||||
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
|
||||
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
|
||||
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
|
||||
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
|
||||
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
|
||||
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
|
||||
|
|
Loading…
Reference in New Issue