INGEST: Implement Drop Processor (#32278)

* INGEST: Implement Drop Processor
* Adjust Processor API
* Implement Drop Processor
* Closes #23726
This commit is contained in:
Armin Braun 2018-09-05 14:25:29 +02:00 committed by GitHub
parent a296829205
commit 46774098d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 235 additions and 115 deletions

View File

@ -57,16 +57,17 @@ abstract class AbstractStringProcessor<T> extends AbstractProcessor {
}
@Override
public final void execute(IngestDocument document) {
public final IngestDocument execute(IngestDocument document) {
String val = document.getFieldValue(field, String.class, ignoreMissing);
if (val == null && ignoreMissing) {
return;
return document;
} else if (val == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
}
document.setFieldValue(targetField, process(val));
return document;
}
protected abstract T process(String value);

View File

@ -56,8 +56,9 @@ public final class AppendProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.appendFieldValue(field, value);
return ingestDocument;
}
@Override

View File

@ -173,12 +173,12 @@ public final class ConvertProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
Object oldValue = document.getFieldValue(field, Object.class, ignoreMissing);
Object newValue;
if (oldValue == null && ignoreMissing) {
return;
return document;
} else if (oldValue == null) {
throw new IllegalArgumentException("Field [" + field + "] is null, cannot be converted to type [" + convertType + "]");
}
@ -194,6 +194,7 @@ public final class ConvertProcessor extends AbstractProcessor {
newValue = convertType.convert(oldValue);
}
document.setFieldValue(targetField, newValue);
return document;
}
@Override

View File

@ -63,7 +63,7 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
// Date can be specified as a string or long:
Object obj = ingestDocument.getFieldValue(field, Object.class);
String date = null;
@ -101,6 +101,7 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
.append('>');
String dynamicIndexName = builder.toString();
ingestDocument.setFieldValue(IngestDocument.MetaData.INDEX.getFieldName(), dynamicIndexName);
return ingestDocument;
}
@Override

View File

@ -74,7 +74,7 @@ public final class DateProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
Object obj = ingestDocument.getFieldValue(field, Object.class);
String value = null;
if (obj != null) {
@ -98,6 +98,7 @@ public final class DateProcessor extends AbstractProcessor {
}
ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
return ingestDocument;
}
@Override

View File

@ -47,14 +47,15 @@ public final class DissectProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
String input = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
if (input == null && ignoreMissing) {
return;
return ingestDocument;
} else if (input == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
}
dissectParser.parse(input).forEach(ingestDocument::setFieldValue);
return ingestDocument;
}
@Override

View File

@ -41,7 +41,7 @@ public final class DotExpanderProcessor extends AbstractProcessor {
@Override
@SuppressWarnings("unchecked")
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String path;
Map<String, Object> map;
if (this.path != null) {
@ -75,6 +75,7 @@ public final class DotExpanderProcessor extends AbstractProcessor {
Object value = map.remove(field);
ingestDocument.setFieldValue(path, value);
}
return ingestDocument;
}
@Override

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.common;
import java.util.Map;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
/**
* Drop processor only returns {@code null} for the execution result to indicate that any document
* executed by it should not be indexed.
*/
public final class DropProcessor extends AbstractProcessor {
public static final String TYPE = "drop";
private DropProcessor(final String tag) {
super(tag);
}
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
return null;
}
@Override
public String getType() {
return TYPE;
}
public static final class Factory implements Processor.Factory {
@Override
public Processor create(final Map<String, Processor.Factory> processorFactories, final String tag,
final Map<String, Object> config) {
return new DropProcessor(tag);
}
}
}

View File

@ -48,7 +48,7 @@ public final class FailProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
throw new FailProcessorException(document.renderTemplate(message));
}

View File

@ -63,24 +63,29 @@ public final class ForEachProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
if (values == null) {
if (ignoreMissing) {
return;
return ingestDocument;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
}
List<Object> newValues = new ArrayList<>(values.size());
IngestDocument document = ingestDocument;
for (Object value : values) {
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
try {
processor.execute(ingestDocument);
document = processor.execute(document);
if (document == null) {
return null;
}
} finally {
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
}
}
ingestDocument.setFieldValue(field, newValues);
document.setFieldValue(field, newValues);
return document;
}
@Override

View File

@ -54,11 +54,11 @@ public final class GrokProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String fieldValue = ingestDocument.getFieldValue(matchField, String.class, ignoreMissing);
if (fieldValue == null && ignoreMissing) {
return;
return ingestDocument;
} else if (fieldValue == null) {
throw new IllegalArgumentException("field [" + matchField + "] is null, cannot process it.");
}
@ -81,6 +81,7 @@ public final class GrokProcessor extends AbstractProcessor {
ingestDocument.setFieldValue(PATTERN_MATCH_KEY, "0");
}
}
return ingestDocument;
}
@Override

View File

@ -84,6 +84,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory());
processors.put(DropProcessor.TYPE, new DropProcessor.Factory());
return Collections.unmodifiableMap(processors);
}

View File

@ -60,7 +60,7 @@ public final class JoinProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
List<?> list = document.getFieldValue(field, List.class);
if (list == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot join.");
@ -69,6 +69,7 @@ public final class JoinProcessor extends AbstractProcessor {
.map(Object::toString)
.collect(Collectors.joining(separator));
document.setFieldValue(targetField, joined);
return document;
}
@Override

View File

@ -107,12 +107,13 @@ public final class JsonProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) throws Exception {
public IngestDocument execute(IngestDocument document) throws Exception {
if (addToRoot) {
apply(document.getSourceAndMetadata(), field);
} else {
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
}
return document;
}
@Override

View File

@ -188,8 +188,9 @@ public final class KeyValueProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
execution.accept(document);
return document;
}
@Override

View File

@ -42,12 +42,12 @@ public class PipelineProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline == null) {
throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']');
}
ingestDocument.executePipeline(pipeline);
return ingestDocument.executePipeline(pipeline);
}
@Override

View File

@ -46,4 +46,5 @@ public final class Processors {
public static String urlDecode(String value) {
return URLDecodeProcessor.apply(value);
}
}

View File

@ -52,7 +52,7 @@ public final class RemoveProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
if (ignoreMissing) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
@ -63,6 +63,7 @@ public final class RemoveProcessor extends AbstractProcessor {
} else {
fields.forEach(document::removeField);
}
return document;
}
@Override

View File

@ -59,11 +59,11 @@ public final class RenameProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
String path = document.renderTemplate(field);
if (document.hasField(path, true) == false) {
if (ignoreMissing) {
return;
return document;
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
@ -86,6 +86,7 @@ public final class RenameProcessor extends AbstractProcessor {
document.setFieldValue(path, value);
throw e;
}
return document;
}
@Override

View File

@ -69,9 +69,10 @@ public final class ScriptProcessor extends AbstractProcessor {
* @param document The Ingest document passed into the script context under the "ctx" object.
*/
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
IngestScript.Factory factory = scriptService.compile(script, IngestScript.CONTEXT);
factory.newInstance(script.getParams()).execute(document.getSourceAndMetadata());
return document;
}
@Override

View File

@ -65,10 +65,11 @@ public final class SetProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) {
document.setFieldValue(field, value);
}
return document;
}
@Override

View File

@ -94,7 +94,7 @@ public final class SortProcessor extends AbstractProcessor {
@Override
@SuppressWarnings("unchecked")
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
List<? extends Comparable<Object>> list = document.getFieldValue(field, List.class);
if (list == null) {
@ -110,6 +110,7 @@ public final class SortProcessor extends AbstractProcessor {
}
document.setFieldValue(targetField, copy);
return document;
}
@Override

View File

@ -68,11 +68,11 @@ public final class SplitProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
String oldVal = document.getFieldValue(field, String.class, ignoreMissing);
if (oldVal == null && ignoreMissing) {
return;
return document;
} else if (oldVal == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot split.");
}
@ -81,6 +81,7 @@ public final class SplitProcessor extends AbstractProcessor {
List<String> splitList = new ArrayList<>(strings.length);
Collections.addAll(splitList, strings);
document.setFieldValue(targetField, splitList);
return document;
}
@Override

View File

@ -154,9 +154,10 @@ public class ForEachProcessorTests extends ESTestCase {
public void testRandom() throws Exception {
Processor innerProcessor = new Processor() {
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String existingValue = ingestDocument.getFieldValue("_ingest._value", String.class);
ingestDocument.setFieldValue("_ingest._value", existingValue + ".");
return ingestDocument;
}
@Override

View File

@ -45,8 +45,9 @@ public class PipelineProcessorTests extends ESTestCase {
pipelineId, null, null,
new CompoundProcessor(new Processor() {
@Override
public void execute(final IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
invoked.complete(ingestDocument);
return ingestDocument;
}
@Override

View File

@ -73,13 +73,13 @@ public final class AttachmentProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
Map<String, Object> additionalFields = new HashMap<>();
byte[] input = ingestDocument.getFieldValueAsBytes(field, ignoreMissing);
if (input == null && ignoreMissing) {
return;
return ingestDocument;
} else if (input == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot parse.");
}
@ -164,6 +164,7 @@ public final class AttachmentProcessor extends AbstractProcessor {
}
ingestDocument.setFieldValue(targetField, additionalFields);
return ingestDocument;
}
@Override

View File

@ -81,11 +81,11 @@ public final class GeoIpProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
if (ip == null && ignoreMissing) {
return;
return ingestDocument;
} else if (ip == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
}
@ -120,6 +120,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, geoData);
}
return ingestDocument;
}
@Override

View File

@ -63,11 +63,11 @@ public class UserAgentProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String userAgent = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
if (userAgent == null && ignoreMissing) {
return;
return ingestDocument;
} else if (userAgent == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot parse user-agent.");
}
@ -144,6 +144,7 @@ public class UserAgentProcessor extends AbstractProcessor {
}
ingestDocument.setFieldValue(targetField, uaDetails);
return ingestDocument;
}
/** To maintain compatibility with logstash-filter-useragent */

View File

@ -27,6 +27,7 @@ import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -37,6 +38,7 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
@ -521,28 +523,30 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
bulkRequestModifier.markCurrentItemAsFailed(exception);
}, (exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
ingestService.executeBulkRequest(() -> bulkRequestModifier,
(indexRequest, exception) -> {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
bulkRequestModifier.markCurrentItemAsFailed(exception);
}, (exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
doExecute(task, bulkRequest, actionListener);
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
doExecute(task, bulkRequest, actionListener);
}
}
}
});
},
indexRequest -> bulkRequestModifier.markCurrentItemAsDropped());
}
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
@ -604,6 +608,19 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
void markCurrentItemAsDropped() {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
failedSlots.set(currentSlot);
itemResponses.add(
new BulkItemResponse(currentSlot, indexRequest.opType(),
new UpdateResponse(
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
indexRequest.type(), indexRequest.id(), indexRequest.version(), DocWriteResponse.Result.NOOP
)
)
);
}
void markCurrentItemAsFailed(Exception e) {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
// We hit a error during preprocessing a request, so we:

View File

@ -67,7 +67,10 @@ class SimulateExecutionService {
protected void doRun() throws Exception {
List<SimulateDocumentResult> responses = new ArrayList<>();
for (IngestDocument ingestDocument : request.getDocuments()) {
responses.add(executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()));
SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose());
if (response != null) {
responses.add(response);
}
}
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
}

View File

@ -42,7 +42,7 @@ public final class TrackingResultProcessor implements Processor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
try {
actualProcessor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
@ -54,6 +54,7 @@ public final class TrackingResultProcessor implements Processor {
}
throw e;
}
return ingestDocument;
}
@Override

View File

@ -94,17 +94,19 @@ public class CompoundProcessor implements Processor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
for (Processor processor : processors) {
try {
processor.execute(ingestDocument);
if (processor.execute(ingestDocument) == null) {
return null;
}
} catch (Exception e) {
if (ignoreFailure) {
continue;
}
ElasticsearchException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag());
newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) {
throw compoundProcessorException;
} else {
@ -113,6 +115,7 @@ public class CompoundProcessor implements Processor {
}
}
}
return ingestDocument;
}
void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
@ -149,7 +152,7 @@ public class CompoundProcessor implements Processor {
}
private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
if (e instanceof ElasticsearchException && ((ElasticsearchException)e).getHeader("processor_type") != null) {
if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) {
return (ElasticsearchException) e;
}

View File

@ -51,12 +51,13 @@ public class ConditionalProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
IngestConditionalScript script =
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
processor.execute(ingestDocument);
return processor.execute(ingestDocument);
}
return ingestDocument;
}
@Override

View File

@ -644,11 +644,11 @@ public final class IngestDocument {
* @param pipeline Pipeline to execute
* @throws Exception On exception in pipeline execution
*/
public void executePipeline(Pipeline pipeline) throws Exception {
public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
if (this.executedPipelines.add(pipeline) == false) {
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
}
pipeline.execute(this);
return pipeline.execute(this);
}
@Override

View File

@ -270,7 +270,7 @@ public class IngestService implements ClusterStateApplier {
String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
Processor failureProcessor = new AbstractProcessor(tag) {
@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
throw new IllegalStateException(errorMessage);
}
@ -323,7 +323,8 @@ public class IngestService implements ClusterStateApplier {
}
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler) {
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
Consumer<IndexRequest> itemDroppedHandler) {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override
@ -351,7 +352,7 @@ public class IngestService implements ClusterStateApplier {
if (pipeline == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
innerExecute(indexRequest, pipeline);
innerExecute(indexRequest, pipeline, itemDroppedHandler);
//this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
@ -399,7 +400,7 @@ public class IngestService implements ClusterStateApplier {
}
}
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception {
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
if (pipeline.getProcessors().isEmpty()) {
return;
}
@ -419,20 +420,22 @@ public class IngestService implements ClusterStateApplier {
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
pipeline.execute(ingestDocument);
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
//it's fine to set all metadata fields all the time, as ingest document holds their starting values
//before ingestion, which might also get modified during ingestion.
indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE));
indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID));
indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING));
indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
if (pipeline.execute(ingestDocument) == null) {
itemDroppedHandler.accept(indexRequest);
} else {
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
//it's fine to set all metadata fields all the time, as ingest document holds their starting values
//before ingestion, which might also get modified during ingestion.
indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE));
indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID));
indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING));
indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
}
indexRequest.source(ingestDocument.getSourceAndMetadata());
}
indexRequest.source(ingestDocument.getSourceAndMetadata());
} catch (Exception e) {
totalStats.ingestFailed();
pipelineStats.ifPresent(StatsHolder::ingestFailed);

View File

@ -77,8 +77,8 @@ public final class Pipeline {
/**
* Modifies the data of a document to be indexed based on the processor this pipeline holds
*/
public void execute(IngestDocument ingestDocument) throws Exception {
compoundProcessor.execute(ingestDocument);
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return compoundProcessor.execute(ingestDocument);
}
/**

View File

@ -40,7 +40,7 @@ public interface Processor {
/**
* Introspect and potentially modify the incoming data.
*/
void execute(IngestDocument ingestDocument) throws Exception;
IngestDocument execute(IngestDocument ingestDocument) throws Exception;
/**
* Gets the type of a processor

View File

@ -259,7 +259,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
@ -293,7 +293,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
@ -325,7 +325,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
action.execute(null, bulkRequest, listener);
// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(any(), any(), any());
verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@ -369,7 +369,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
singleItemBulkWriteAction.execute(null, indexRequest, listener);
// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(any(), any(), any());
verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@ -417,7 +417,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
@ -449,7 +449,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());

View File

@ -65,8 +65,9 @@ public class ConditionalProcessorTests extends ESTestCase {
scriptName, Collections.emptyMap()), scriptService,
new Processor() {
@Override
public void execute(final IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.setFieldValue("foo", "bar");
return ingestDocument;
}
@Override

View File

@ -126,7 +126,7 @@ public class IngestServiceTests extends ESTestCase {
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
assertTrue(failure.get());
verify(completionHandler, times(1)).accept(null);
@ -424,7 +424,7 @@ public class IngestServiceTests extends ESTestCase {
IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> new AbstractProcessor("mock") {
@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
throw new IllegalStateException("error");
}
@ -453,7 +453,7 @@ public class IngestServiceTests extends ESTestCase {
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
assertTrue(failure.get());
verify(completionHandler, times(1)).accept(null);
@ -481,7 +481,7 @@ public class IngestServiceTests extends ESTestCase {
BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler);
ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, times(1)).accept(
argThat(new CustomTypeSafeMatcher<IndexRequest>("failure handler was not called with the expected arguments") {
@Override
@ -514,7 +514,7 @@ public class IngestServiceTests extends ESTestCase {
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
}
@ -532,7 +532,7 @@ public class IngestServiceTests extends ESTestCase {
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
}
@ -560,14 +560,14 @@ public class IngestServiceTests extends ESTestCase {
ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName());
}
}
return null;
return ingestDocument;
}).when(processor).execute(any());
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(any());
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
@ -597,7 +597,7 @@ public class IngestServiceTests extends ESTestCase {
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
verify(completionHandler, times(1)).accept(null);
@ -624,7 +624,7 @@ public class IngestServiceTests extends ESTestCase {
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class));
verify(completionHandler, times(1)).accept(null);
}
@ -661,7 +661,7 @@ public class IngestServiceTests extends ESTestCase {
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
verify(completionHandler, times(1)).accept(null);
@ -707,7 +707,7 @@ public class IngestServiceTests extends ESTestCase {
BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher<Exception>() {
@Override
@ -741,7 +741,7 @@ public class IngestServiceTests extends ESTestCase {
BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
verify(requestItemErrorHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
@ -779,7 +779,7 @@ public class IngestServiceTests extends ESTestCase {
final IndexRequest indexRequest = new IndexRequest("_index");
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats();
assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2));
assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
@ -787,7 +787,7 @@ public class IngestServiceTests extends ESTestCase {
assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L));
indexRequest.setPipeline("_id2");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterSecondRequestStats = ingestService.stats();
assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2));
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
@ -827,8 +827,9 @@ public class IngestServiceTests extends ESTestCase {
String value = (String) config.remove("value");
return new Processor() {
@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
ingestDocument.setFieldValue(field, value);
return ingestDocument;
}
@Override
@ -846,8 +847,9 @@ public class IngestServiceTests extends ESTestCase {
String field = (String) config.remove("field");
return new Processor() {
@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
ingestDocument.removeField(field);
return ingestDocument;
}
@Override

View File

@ -45,9 +45,10 @@ public class TestProcessor implements Processor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
invokedCounter.incrementAndGet();
ingestDocumentConsumer.accept(ingestDocument);
return ingestDocument;
}
@Override

View File

@ -74,8 +74,9 @@ public class MockIngestPlugin extends Plugin implements IngestPlugin {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
// mock processor does nothing
return ingestDocument;
}
@Override

View File

@ -43,7 +43,7 @@ public final class SetSecurityUserProcessor extends AbstractProcessor {
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Authentication authentication = Authentication.getAuthentication(threadContext);
if (authentication == null) {
throw new IllegalStateException("No user authenticated, only use this processor via authenticated user");
@ -86,6 +86,7 @@ public final class SetSecurityUserProcessor extends AbstractProcessor {
}
}
ingestDocument.setFieldValue(field, userObject);
return ingestDocument;
}
@Override