Change the ingest simulate api to not include dropped documents (#44161)

If documents are dropped by the `drop` processor then
these documents are returned as a `null` value in the response.

=== Example

Create pipeline:

```
PUT _ingest/pipeline/droppipeline
{
    "processors": [
        {
            "set": {
                "field": "bla",
                "value": "val"
            }
        },
        {
            "drop": {}
        }
    ]
}
```

Simulate request:

POST _ingest/pipeline/droppipeline/_simulate
{
    "docs": [
        {
            "_source": {
                "message": "text"
            }
        }
    ]
}

Response:

```
{
    "docs": [
        null
    ]
}
```

Response if verbose is enabled:

```
{
    "docs": [
        {
            "processor_results": [
                {
                    "doc": {
                        "_index": "_index",
                        "_type": "_doc",
                        "_id": "_id",
                        "_source": {
                            "message": "text",
                            "bla": "val"
                        },
                        "_ingest": {
                            "timestamp": "2019-07-10T11:07:10.758315Z"
                        }
                    }
                },
                null
            ]
        }
    ]
}
```

Closes #36150

* Abort pipeline simulation in verbose mode when document has been dropped
by drop processor
This commit is contained in:
Martijn van Groningen 2019-08-08 12:32:46 +02:00
parent 99ddb8b3d8
commit e066133016
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
5 changed files with 106 additions and 35 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -66,12 +67,16 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
}
public SimulateDocumentBaseResult(IngestDocument ingestDocument) {
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
failure = null;
if (ingestDocument != null) {
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
} else {
this.ingestDocument = null;
}
this.failure = null;
}
public SimulateDocumentBaseResult(Exception failure) {
ingestDocument = null;
this.ingestDocument = null;
this.failure = failure;
}
@ -79,23 +84,33 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
* Read from a stream.
*/
public SimulateDocumentBaseResult(StreamInput in) throws IOException {
if (in.readBoolean()) {
ingestDocument = null;
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
failure = in.readException();
ingestDocument = in.readOptionalWriteable(WriteableIngestDocument::new);
} else {
ingestDocument = new WriteableIngestDocument(in);
failure = null;
if (in.readBoolean()) {
ingestDocument = null;
failure = in.readException();
} else {
ingestDocument = new WriteableIngestDocument(in);
failure = null;
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (failure == null) {
out.writeBoolean(false);
ingestDocument.writeTo(out);
} else {
out.writeBoolean(true);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeException(failure);
out.writeOptionalWriteable(ingestDocument);
} else {
if (failure == null) {
out.writeBoolean(false);
ingestDocument.writeTo(out);
} else {
out.writeBoolean(true);
out.writeException(failure);
}
}
}
@ -112,6 +127,11 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (failure == null && ingestDocument == null) {
builder.nullValue();
return builder;
}
builder.startObject();
if (failure == null) {
ingestDocument.toXContent(builder, params);

View File

@ -55,8 +55,8 @@ class SimulateExecutionService {
}
} else {
try {
pipeline.execute(ingestDocument);
return new SimulateDocumentBaseResult(ingestDocument);
IngestDocument result = pipeline.execute(ingestDocument);
return new SimulateDocumentBaseResult(result);
} catch (Exception e) {
return new SimulateDocumentBaseResult(e);
}

View File

@ -105,28 +105,23 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
this(processorTag, null, failure);
}
public SimulateProcessorResult(String processorTag) {
this(processorTag, null, null);
}
/**
* Read from a stream.
*/
SimulateProcessorResult(StreamInput in) throws IOException {
this.processorTag = in.readString();
if (in.readBoolean()) {
this.ingestDocument = new WriteableIngestDocument(in);
} else {
this.ingestDocument = null;
}
this.ingestDocument = in.readOptionalWriteable(WriteableIngestDocument::new);
this.failure = in.readException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(processorTag);
if (ingestDocument == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
ingestDocument.writeTo(out);
}
out.writeOptionalWriteable(ingestDocument);
out.writeException(failure);
}
@ -147,6 +142,11 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (processorTag == null && failure == null && ingestDocument == null) {
builder.nullValue();
return builder;
}
builder.startObject();
if (processorTag != null) {

View File

@ -74,8 +74,13 @@ public final class TrackingResultProcessor implements Processor {
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline);
} else {
processor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
IngestDocument result = processor.execute(ingestDocument);
if (result != null) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
} else {
processorResultList.add(new SimulateProcessorResult(processor.getTag()));
return null;
}
}
} catch (Exception e) {
if (ignoreFailure) {

View File

@ -20,14 +20,15 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.DropProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.TestThreadPool;
import org.junit.After;
import org.junit.Before;
@ -38,6 +39,7 @@ import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocumen
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ -45,17 +47,13 @@ public class SimulateExecutionServiceTests extends ESTestCase {
private final Integer version = randomBoolean() ? randomInt() : null;
private ThreadPool threadPool;
private TestThreadPool threadPool;
private SimulateExecutionService executionService;
private IngestDocument ingestDocument;
@Before
public void setup() {
threadPool = new ThreadPool(
Settings.builder()
.put("node.name", getClass().getName())
.build()
);
threadPool = new TestThreadPool(SimulateExecutionServiceTests.class.getSimpleName());
executionService = new SimulateExecutionService(threadPool);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
}
@ -213,4 +211,52 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(exception, instanceOf(ElasticsearchException.class));
assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed"));
}
public void testDropDocument() {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, Collections.emptyMap());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
}
public void testDropDocumentVerbose() {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, Collections.emptyMap());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(verboseResult.getProcessorResults().size(), equalTo(2));
assertThat(verboseResult.getProcessorResults().get(0).getIngestDocument(), notNullValue());
assertThat(verboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(verboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}
public void testDropDocumentVerboseExtraProcessor() {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, Collections.emptyMap());
TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value"));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor3.getInvokedCounter(), equalTo(0));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(verboseResult.getProcessorResults().size(), equalTo(2));
assertThat(verboseResult.getProcessorResults().get(0).getIngestDocument(), notNullValue());
assertThat(verboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(verboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}
}