Allow ingest processors to execute in a non blocking manner. (#47122)

Backport of #46241

This PR changes the ingest executing to be non blocking
by adding an additional method to the Processor interface
that accepts a BiConsumer as handler and changing
IngestService#executeBulkRequest(...) to ingest document
in a non blocking fashion iff a processor executes
in a non blocking fashion.

This is the second PR that merges changes made to server module from
the enrich branch (see #32789) into the master branch.

The plan is to merge changes made to the server module separately from
the pr that will merge enrich into master, so that these changes can
be reviewed in isolation.

This change originates from the enrich branch and was introduced there
in #43361.
This commit is contained in:
Martijn van Groningen 2019-09-26 08:55:28 +02:00 committed by GitHub
parent 45c7783018
commit 429f23ea2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 869 additions and 409 deletions

View File

@ -28,6 +28,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.script.ScriptService;
@ -65,29 +67,46 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
if (values == null) {
if (ignoreMissing) {
return ingestDocument;
handler.accept(ingestDocument, null);
} else {
handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
} else {
List<Object> newValues = new CopyOnWriteArrayList<>();
innerExecute(0, values, newValues, ingestDocument, handler);
}
List<Object> newValues = new ArrayList<>(values.size());
IngestDocument document = ingestDocument;
for (Object value : values) {
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
try {
document = processor.execute(document);
if (document == null) {
return null;
}
} finally {
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
}
void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
BiConsumer<IngestDocument, Exception> handler) {
if (index == values.size()) {
document.setFieldValue(field, new ArrayList<>(newValues));
handler.accept(document, null);
return;
}
Object value = values.get(index);
Object previousValue = document.getIngestMetadata().put("_value", value);
processor.execute(document, (result, e) -> {
if (e != null) {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
handler.accept(null, e);
} else if (result == null) {
handler.accept(null, null);
} else {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
innerExecute(index + 1, values, newValues, document, handler);
}
}
document.setFieldValue(field, newValues);
return document;
});
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}
@Override

View File

@ -53,7 +53,7 @@ public class ForEachProcessorTests extends ESTestCase {
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
false
);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
@SuppressWarnings("unchecked")
List<String> result = ingestDocument.getFieldValue("values", List.class);
@ -73,12 +73,9 @@ public class ForEachProcessorTests extends ESTestCase {
}
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
try {
processor.execute(ingestDocument);
fail("exception expected");
} catch (RuntimeException e) {
assertThat(e.getMessage(), equalTo("failure"));
}
Exception[] exceptions = new Exception[1];
processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;});
assertThat(exceptions[0].getMessage(), equalTo("failure"));
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c")));
@ -95,7 +92,7 @@ public class ForEachProcessorTests extends ESTestCase {
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
false
);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c")));
}
@ -114,7 +111,7 @@ public class ForEachProcessorTests extends ESTestCase {
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index"));
@ -142,7 +139,7 @@ public class ForEachProcessorTests extends ESTestCase {
"_tag", "values", new SetProcessor("_tag",
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
(model) -> model.get("other")), false);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value"));
@ -180,7 +177,7 @@ public class ForEachProcessorTests extends ESTestCase {
);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
@SuppressWarnings("unchecked")
List<String> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.size(), equalTo(numValues));
@ -205,7 +202,7 @@ public class ForEachProcessorTests extends ESTestCase {
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
), false);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
List<?> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.get(0), equalTo("STRING"));
@ -231,7 +228,7 @@ public class ForEachProcessorTests extends ESTestCase {
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
doc.getFieldValue("_source._value", String.class)));
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
forEachProcessor.execute(ingestDocument);
forEachProcessor.execute(ingestDocument, (result, e) -> {});
List<?> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.get(0), equalTo("new_value"));
@ -264,7 +261,7 @@ public class ForEachProcessorTests extends ESTestCase {
);
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
List<?> result = ingestDocument.getFieldValue("values1.0.values2", List.class);
assertThat(result.get(0), equalTo("ABC"));
@ -282,7 +279,7 @@ public class ForEachProcessorTests extends ESTestCase {
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
TestProcessor testProcessor = new TestProcessor(doc -> {});
ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
assertIngestDocument(originalIngestDocument, ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(0));
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action.bulk;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.Assertions;
@ -57,6 +59,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
@ -82,6 +85,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
@ -648,14 +652,13 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(() -> bulkRequestModifier,
(indexRequest, exception) -> {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
bulkRequestModifier.markCurrentItemAsFailed(exception);
}, (exception) -> {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(
original.numberOfActions(),
() -> bulkRequestModifier,
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
@ -670,26 +673,56 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
doExecute(task, bulkRequest, actionListener);
// If a processor went async and returned a response on a different thread then
// before we continue the bulk request we should fork back on a write thread:
if (originalThread == Thread.currentThread()) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
doExecute(task, bulkRequest, actionListener);
} else {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
doExecute(task, bulkRequest, actionListener);
}
@Override
public boolean isForceExecution() {
// If we fork back to a write thread we **not** should fail, because tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
});
}
}
}
},
indexRequest -> bulkRequestModifier.markCurrentItemAsDropped());
bulkRequestModifier::markItemAsDropped
);
}
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class);
final BulkRequest bulkRequest;
final SparseFixedBitSet failedSlots;
final List<BulkItemResponse> itemResponses;
final AtomicIntegerArray originalSlots;
int currentSlot = -1;
int[] originalSlots;
volatile int currentSlot = -1;
BulkRequestModifier(BulkRequest bulkRequest) {
this.bulkRequest = bulkRequest;
this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size());
this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok
}
@Override
@ -713,12 +746,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
int slot = 0;
List<DocWriteRequest<?>> requests = bulkRequest.requests();
originalSlots = new int[requests.size()]; // oversize, but that's ok
for (int i = 0; i < requests.size(); i++) {
DocWriteRequest<?> request = requests.get(i);
if (failedSlots.get(i) == false) {
modifiedBulkRequest.add(request);
originalSlots[slot++] = i;
originalSlots.set(slot++, i);
}
}
return modifiedBulkRequest;
@ -733,7 +765,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
itemResponses.add(originalSlots.get(i), response.getItems()[i]);
}
delegatedListener.onResponse(
new BulkResponse(
@ -742,12 +774,12 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
void markCurrentItemAsDropped() {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
failedSlots.set(currentSlot);
synchronized void markItemAsDropped(int slot) {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
failedSlots.set(slot);
final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id();
itemResponses.add(
new BulkItemResponse(currentSlot, indexRequest.opType(),
new BulkItemResponse(slot, indexRequest.opType(),
new UpdateResponse(
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
indexRequest.type(), id, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
@ -757,16 +789,19 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
);
}
void markCurrentItemAsFailed(Exception e) {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
synchronized void markItemAsFailed(int slot, Exception e) {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e);
// We hit a error during preprocessing a request, so we:
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
// 2) Add a bulk item failure for this request
// 3) Continue with the next request in the bulk.
failedSlots.set(currentSlot);
failedSlots.set(slot);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(),
indexRequest.id(), e);
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure));
itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
}
}

View File

@ -26,8 +26,10 @@ import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
@ -41,38 +43,42 @@ class SimulateExecutionService {
this.threadPool = threadPool;
}
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose,
BiConsumer<SimulateDocumentResult, Exception> handler) {
if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
try {
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline);
return new SimulateDocumentVerboseResult(processorResultList);
} catch (Exception e) {
return new SimulateDocumentVerboseResult(processorResultList);
}
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
});
} else {
try {
IngestDocument result = pipeline.execute(ingestDocument);
return new SimulateDocumentBaseResult(result);
} catch (Exception e) {
return new SimulateDocumentBaseResult(e);
}
pipeline.execute(ingestDocument, (result, e) -> {
if (e == null) {
handler.accept(new SimulateDocumentBaseResult(result), null);
} else {
handler.accept(new SimulateDocumentBaseResult(e), null);
}
});
}
}
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
List<SimulateDocumentResult> responses = new ArrayList<>();
for (IngestDocument ingestDocument : request.getDocuments()) {
SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose());
final AtomicInteger counter = new AtomicInteger();
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>();
for (IngestDocument ingestDocument : request.getDocuments()) {
executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> {
if (response != null) {
responses.add(response);
}
}
l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
if (counter.incrementAndGet() == request.getDocuments().size()) {
l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(),
request.isVerbose(), responses));
}
});
}
}));
}
}

View File

@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
@ -114,58 +115,78 @@ public class CompoundProcessor implements Processor {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
if (processor.execute(ingestDocument) == null) {
return null;
}
} catch (Exception e) {
metric.ingestFailed();
if (ignoreFailure) {
continue;
}
ElasticsearchException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) {
throw compoundProcessorException;
} else {
if (executeOnFailure(ingestDocument, compoundProcessorException) == false) {
return null;
}
break;
}
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
}
return ingestDocument;
throw new UnsupportedOperationException("this method should not get executed");
}
/**
* @return true if execution should continue, false if document is dropped.
*/
boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
putFailureMetadata(ingestDocument, exception);
for (Processor processor : onFailureProcessors) {
try {
if (processor.execute(ingestDocument) == null) {
return false;
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
innerExecute(0, ingestDocument, handler);
}
void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (currentProcessor == processorsWithMetrics.size()) {
handler.accept(ingestDocument, null);
return;
}
Tuple<Processor, IngestMetric> processorWithMetric = processorsWithMetrics.get(currentProcessor);
final Processor processor = processorWithMetric.v1();
final IngestMetric metric = processorWithMetric.v2();
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
if (e != null) {
metric.ingestFailed();
if (ignoreFailure) {
innerExecute(currentProcessor + 1, ingestDocument, handler);
} else {
ElasticsearchException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) {
handler.accept(null, compoundProcessorException);
} else {
executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler);
}
} catch (Exception e) {
throw newCompoundProcessorException(e, processor.getType(), processor.getTag());
}
} else {
if (result != null) {
innerExecute(currentProcessor + 1, result, handler);
} else {
handler.accept(null, null);
}
}
} finally {
removeFailureMetadata(ingestDocument);
});
}
void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestDocument, ElasticsearchException exception,
BiConsumer<IngestDocument, Exception> handler) {
if (currentOnFailureProcessor == 0) {
putFailureMetadata(ingestDocument, exception);
}
return true;
if (currentOnFailureProcessor == onFailureProcessors.size()) {
removeFailureMetadata(ingestDocument);
handler.accept(ingestDocument, null);
return;
}
final Processor onFailureProcessor = onFailureProcessors.get(currentOnFailureProcessor);
onFailureProcessor.execute(ingestDocument, (result, e) -> {
if (e != null) {
removeFailureMetadata(ingestDocument);
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
return;
}
if (result == null) {
removeFailureMetadata(ingestDocument);
handler.accept(null, null);
return;
}
executeOnFailureAsync(currentOnFailureProcessor + 1, ingestDocument, exception, handler);
});
}
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {

View File

@ -30,6 +30,7 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
@ -74,21 +75,28 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (evaluate(ingestDocument)) {
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
return processor.execute(ingestDocument);
} catch (Exception e) {
metric.ingestFailed();
throw e;
} finally {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
if (e != null) {
metric.ingestFailed();
handler.accept(null, e);
} else {
handler.accept(result, null);
}
});
} else {
handler.accept(ingestDocument, null);
}
return ingestDocument;
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}
boolean evaluate(IngestDocument ingestDocument) {

View File

@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
/**
* Represents a single document being captured before indexing and holds the source and metadata (like id, type and index).
@ -641,17 +642,18 @@ public final class IngestDocument {
/**
* Executes the given pipeline with for this document unless the pipeline has already been executed
* for this document.
* @param pipeline Pipeline to execute
* @throws Exception On exception in pipeline execution
*
* @param pipeline the pipeline to execute
* @param handler handles the result or failure
*/
public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
try {
if (this.executedPipelines.add(pipeline) == false) {
throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId());
}
return pipeline.execute(this);
} finally {
executedPipelines.remove(pipeline);
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline)) {
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline);
handler.accept(result, e);
});
} else {
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
}
}

View File

@ -63,8 +63,10 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
/**
* Holder class for several ingest related services.
@ -329,42 +331,72 @@ public class IngestService implements ClusterStateApplier {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
Consumer<IndexRequest> itemDroppedHandler) {
public void executeBulkRequest(int numberOfActionRequests,
Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<Integer, Exception> itemFailureHandler,
BiConsumer<Thread, Exception> completionHandler,
IntConsumer itemDroppedHandler) {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
completionHandler.accept(e);
completionHandler.accept(null, e);
}
@Override
protected void doRun() {
final Thread originalThread = Thread.currentThread();
final AtomicInteger counter = new AtomicInteger(numberOfActionRequests);
int i = 0;
for (DocWriteRequest<?> actionRequest : actionRequests) {
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
if (indexRequest == null) {
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
continue;
}
String pipelineId = indexRequest.getPipeline();
if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
try {
PipelineHolder holder = pipelines.get(pipelineId);
if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
Pipeline pipeline = holder.pipeline;
innerExecute(indexRequest, pipeline, itemDroppedHandler);
//this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
} catch (Exception e) {
itemFailureHandler.accept(indexRequest, e);
if (NOOP_PIPELINE_NAME.equals(pipelineId)) {
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
continue;
}
final int slot = i;
try {
PipelineHolder holder = pipelines.get(pipelineId);
if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
Pipeline pipeline = holder.pipeline;
innerExecute(slot, indexRequest, pipeline, itemDroppedHandler, e -> {
if (e == null) {
// this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
} else {
itemFailureHandler.accept(slot, e);
}
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
});
} catch (Exception e) {
itemFailureHandler.accept(slot, e);
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
}
i++;
}
completionHandler.accept(null);
}
});
}
@ -420,26 +452,34 @@ public class IngestService implements ClusterStateApplier {
return sb.toString();
}
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline, IntConsumer itemDroppedHandler,
Consumer<Exception> handler) {
if (pipeline.getProcessors().isEmpty()) {
handler.accept(null);
return;
}
long startTimeInNanos = System.nanoTime();
// the pipeline specific stat holder may not exist and that is fine:
// (e.g. the pipeline may have been removed while we're ingesting a document
try {
totalMetrics.preIngest();
String index = indexRequest.index();
String type = indexRequest.type();
String id = indexRequest.id();
String routing = indexRequest.routing();
Long version = indexRequest.version();
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
if (pipeline.execute(ingestDocument) == null) {
itemDroppedHandler.accept(indexRequest);
totalMetrics.preIngest();
String index = indexRequest.index();
String type = indexRequest.type();
String id = indexRequest.id();
String routing = indexRequest.routing();
Long version = indexRequest.version();
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
pipeline.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalMetrics.postIngest(ingestTimeInMillis);
if (e != null) {
totalMetrics.ingestFailed();
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
//it's fine to set all metadata fields all the time, as ingest document holds their starting values
@ -453,14 +493,9 @@ public class IngestService implements ClusterStateApplier {
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
}
indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
handler.accept(null);
}
} catch (Exception e) {
totalMetrics.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalMetrics.postIngest(ingestTimeInMillis);
}
});
}
@Override

View File

@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import org.elasticsearch.script.ScriptService;
@ -93,18 +94,17 @@ public final class Pipeline {
* If <code>null</code> is returned then this document will be dropped and not indexed, otherwise
* this document will be kept and indexed.
*/
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metrics.preIngest();
return compoundProcessor.execute(ingestDocument);
} catch (Exception e) {
metrics.ingestFailed();
throw e;
} finally {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metrics.preIngest();
compoundProcessor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metrics.postIngest(ingestTimeInMillis);
}
if (e != null) {
metrics.ingestFailed();
}
handler.accept(result, e);
});
}
/**

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest;
import java.util.Map;
import java.util.function.BiConsumer;
public class PipelineProcessor extends AbstractProcessor {
@ -36,12 +37,19 @@ public class PipelineProcessor extends AbstractProcessor {
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline == null) {
throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']');
if (pipeline != null) {
ingestDocument.executePipeline(pipeline, handler);
} else {
handler.accept(null,
new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'));
}
return ingestDocument.executePipeline(pipeline);
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}
Pipeline getPipeline(){

View File

@ -27,6 +27,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
@ -38,6 +39,21 @@ import java.util.function.LongSupplier;
*/
public interface Processor {
/**
* Introspect and potentially modify the incoming data.
*
* Expert method: only override this method if a processor implementation needs to make an asynchronous call,
* otherwise just overwrite {@link #execute(IngestDocument)}.
*/
default void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
try {
IngestDocument result = execute(ingestDocument);
handler.accept(result, null);
} catch (Exception e) {
handler.accept(null, e);
}
}
/**
* Introspect and potentially modify the incoming data.
*

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ingest.SimulateProcessorResult;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
/**
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
@ -41,56 +42,76 @@ public final class TrackingResultProcessor implements Processor {
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Processor processor = actualProcessor;
try {
if (processor instanceof ConditionalProcessor) {
ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor;
if (conditionalProcessor.evaluate(ingestDocument) == false) {
return ingestDocument;
}
if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getInnerProcessor();
}
}
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
Pipeline pipeline = pipelineProcessor.getPipeline();
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
try {
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline());
} catch (ElasticsearchException elasticsearchException) {
if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) {
throw elasticsearchException;
}
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (actualProcessor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
Pipeline pipeline = pipelineProcessor.getPipeline();
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
} catch (Exception e) {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(),
new IngestDocument(ingestDocument), e));
} else {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), e));
}
handler.accept(null, elasticsearchException);
}
} else {
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, handler);
}
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline);
});
return;
}
final Processor processor;
if (actualProcessor instanceof ConditionalProcessor) {
ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor;
if (conditionalProcessor.evaluate(ingestDocument) == false) {
handler.accept(ingestDocument, null);
return;
}
if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getInnerProcessor();
} else {
processor = actualProcessor;
}
} else {
processor = actualProcessor;
}
processor.execute(ingestDocument, (result, e) -> {
if (e != null) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e));
} else {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
}
handler.accept(null, e);
} else {
IngestDocument result = processor.execute(ingestDocument);
if (result != null) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
handler.accept(result, null);
} else {
processorResultList.add(new SimulateProcessorResult(processor.getTag()));
return null;
handler.accept(null, null);
}
}
} catch (Exception e) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e));
} else {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
}
throw e;
}
return ingestDocument;
});
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();
}
@Override

View File

@ -55,7 +55,7 @@ public class BulkRequestModifierTests extends ESTestCase {
while (bulkRequestModifier.hasNext()) {
bulkRequestModifier.next();
if (randomBoolean()) {
bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException());
bulkRequestModifier.markItemAsFailed(i, new RuntimeException());
failedSlots.add(i);
}
i++;
@ -93,7 +93,7 @@ public class BulkRequestModifierTests extends ESTestCase {
for (int i = 0; modifier.hasNext(); i++) {
modifier.next();
if (i % 2 == 0) {
modifier.markCurrentItemAsFailed(new RuntimeException());
modifier.markItemAsFailed(i, new RuntimeException());
}
}

View File

@ -67,11 +67,11 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
@ -93,6 +93,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
private static final Settings SETTINGS =
Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build();
private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE);
/** Services needed by bulk action */
TransportService transportService;
ClusterService clusterService;
@ -101,9 +103,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
@Captor
ArgumentCaptor<BiConsumer<IndexRequest, Exception>> failureHandler;
ArgumentCaptor<BiConsumer<Integer, Exception>> failureHandler;
@Captor
ArgumentCaptor<Consumer<Exception>> completionHandler;
ArgumentCaptor<BiConsumer<Thread, Exception>> completionHandler;
@Captor
ArgumentCaptor<TransportResponseHandler<BulkResponse>> remoteResponseHandler;
@Captor
@ -265,15 +267,16 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(exception);
verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(),
failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
// now check success
Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request
failureHandler.getValue().accept(0, exception); // have an exception for our one index request
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
@ -299,13 +302,14 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(exception);
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
completionHandler.capture(), any());
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
// now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
@ -331,7 +335,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
action.execute(null, bulkRequest, listener);
// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any());
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@ -375,7 +379,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
singleItemBulkWriteAction.execute(null, indexRequest, listener);
// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any());
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@ -459,18 +463,19 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(),
failureHandler.capture(), completionHandler.capture(), any());
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
assertEquals(indexRequest3.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(exception);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
// now check success of the transport bulk action
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
@ -497,14 +502,15 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.indexCreated); // no index yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(exception);
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
completionHandler.capture(), any());
completionHandler.getValue().accept(null, exception);
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
assertTrue(failureCalled.get());
// now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted);
assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path.
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@ -561,7 +567,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
}));
assertEquals("pipeline2", indexRequest.getPipeline());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
completionHandler.capture(), any());
}
private void validateDefaultPipeline(IndexRequest indexRequest) {
@ -583,14 +590,15 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
completionHandler.capture(), any());
assertEquals(indexRequest.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(exception);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
// now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);

View File

@ -0,0 +1,160 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import static org.hamcrest.Matchers.equalTo;
/**
* The purpose of this test is to verify that when a processor executes an operation asynchronously that
* the expected result is the same as if the same operation happens synchronously.
*
* In this test two test processor are defined that basically do the same operation, but a single processor
* executes asynchronously. The result of the operation should be the same and also the order in which the
* bulk responses are returned should be the same as how the corresponding index requests were defined.
*/
public class AsyncIngestProcessorIT extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(TestPlugin.class);
}
public void testAsyncProcessorImplementation() {
// A pipeline with 2 processors: the test async processor and sync test processor.
BytesReference pipelineBody = new BytesArray("{\"processors\": [{\"test-async\": {}, \"test\": {}}]}");
client().admin().cluster().putPipeline(new PutPipelineRequest("_id", pipelineBody, XContentType.JSON)).actionGet();
BulkRequest bulkRequest = new BulkRequest();
int numDocs = randomIntBetween(8, 256);
for (int i = 0; i < numDocs; i++) {
bulkRequest.add(new IndexRequest("foobar")
.id(Integer.toString(i))
.source("{}", XContentType.JSON)
.setPipeline("_id")
);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.getItems().length, equalTo(numDocs));
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
assertThat(bulkResponse.getItems()[i].getId(), equalTo(id));
GetResponse getResponse = client().get(new GetRequest("foobar", id)).actionGet();
// The expected result of async test processor:
assertThat(getResponse.getSource().get("foo"), equalTo("bar-" + id));
// The expected result of sync test processor:
assertThat(getResponse.getSource().get("bar"), equalTo("baz-" + id));
}
}
public static class TestPlugin extends Plugin implements IngestPlugin {
private ThreadPool threadPool;
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
this.threadPool = threadPool;
return Collections.emptyList();
}
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Map<String, Processor.Factory> processors = new HashMap<>();
processors.put("test-async", (factories, tag, config) -> {
return new AbstractProcessor(tag) {
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
threadPool.generic().execute(() -> {
String id = (String) ingestDocument.getSourceAndMetadata().get("_id");
if (usually()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ignore
}
}
ingestDocument.setFieldValue("foo", "bar-" + id);
handler.accept(ingestDocument, null);
});
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public String getType() {
return "test-async";
}
};
});
processors.put("test", (processorFactories, tag, config) -> {
return new AbstractProcessor(tag) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String id = (String) ingestDocument.getSourceAndMetadata().get("_id");
ingestDocument.setFieldValue("bar", "baz-" + id);
return ingestDocument;
}
@Override
public String getType() {
return "test";
}
};
});
return processors;
}
}
}

View File

@ -34,6 +34,8 @@ import org.junit.Before;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;
@ -66,7 +68,15 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -91,7 +101,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
@ -104,7 +121,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor processor2 = new TestProcessor("processor_1", "mock", new RuntimeException("processor failed"));
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(processor3.getInvokedCounter(), equalTo(0));
@ -131,7 +155,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
Pipeline pipeline = new Pipeline("_id", "_description", version,
new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2)), processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
@ -166,7 +197,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", exception);
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -183,7 +221,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -199,7 +244,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItemWithFailure() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
@ -210,12 +262,19 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed"));
}
public void testDropDocument() {
public void testDropDocument() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, Collections.emptyMap());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
@ -223,12 +282,19 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
}
public void testDropDocumentVerbose() {
public void testDropDocumentVerbose() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, Collections.emptyMap());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -239,13 +305,20 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}
public void testDropDocumentVerboseExtraProcessor() {
public void testDropDocumentVerboseExtraProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, Collections.emptyMap());
TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value"));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor3.getInvokedCounter(), equalTo(0));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));

View File

@ -34,6 +34,7 @@ import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -52,7 +53,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor processor = new CompoundProcessor();
assertThat(processor.getProcessors().isEmpty(), is(true));
assertThat(processor.getOnFailureProcessors().isEmpty(), is(true));
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
}
public void testSingleProcessor() throws Exception {
@ -67,7 +68,7 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument);
compoundProcessor.execute(ingestDocument, (result, e) -> {});
verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 0, 1);
@ -82,12 +83,9 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
try {
compoundProcessor.execute(ingestDocument);
fail("should throw exception");
} catch (ElasticsearchException e) {
assertThat(e.getRootCause().getMessage(), equalTo("error"));
}
Exception[] holder = new Exception[1];
compoundProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo("error"));
assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
@ -100,7 +98,7 @@ public class CompoundProcessorTests extends ESTestCase {
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor =
new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 1, 0);
assertThat(processor2.getInvokedCounter(), equalTo(1));
@ -122,7 +120,7 @@ public class CompoundProcessorTests extends ESTestCase {
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
compoundProcessor.execute(ingestDocument, (result, e) -> {});
verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor1.getInvokedCounter(), equalTo(1));
@ -154,7 +152,9 @@ public class CompoundProcessorTests extends ESTestCase {
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2), relativeTimeProvider);
assertNull(compoundProcessor.execute(ingestDocument));
IngestDocument[] result = new IngestDocument[1];
compoundProcessor.execute(ingestDocument, (r, e) -> result[0] = r);
assertThat(result[0], nullValue());
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
}
@ -182,7 +182,7 @@ public class CompoundProcessorTests extends ESTestCase {
Collections.singletonList(lastProcessor), relativeTimeProvider);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(processorToFail.getInvokedCounter(), equalTo(1));
assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
@ -205,7 +205,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
@ -231,7 +231,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
@ -257,7 +257,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
@ -272,7 +272,7 @@ public class CompoundProcessorTests extends ESTestCase {
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor),
Collections.singletonList(onFailureProcessor), relativeTimeProvider);
pipeline.execute(ingestDocument);
pipeline.execute(ingestDocument, (result, e) -> {});
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(0));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));

View File

@ -98,7 +98,7 @@ public class ConditionalProcessorTests extends ESTestCase {
String falseValue = "falsy";
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
assertStats(processor, 0, 0, 0);
@ -106,13 +106,13 @@ public class ConditionalProcessorTests extends ESTestCase {
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue);
ingestDocument.setFieldValue("error", true);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
assertStats(processor, 0, 0, 0);
//true, always call processor and increments metrics
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue);
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue));
assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar"));
assertStats(processor, 1, 0, 1);
@ -121,7 +121,9 @@ public class ConditionalProcessorTests extends ESTestCase {
ingestDocument.setFieldValue(conditionalField, trueValue);
ingestDocument.setFieldValue("error", true);
IngestDocument finalIngestDocument = ingestDocument;
expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument));
Exception holder[] = new Exception[1];
processor.execute(finalIngestDocument, (result, e) -> {holder[0] = e;});
assertThat(holder[0], instanceOf(RuntimeException.class));
assertStats(processor, 2, 1, 2);
}
@ -177,7 +179,7 @@ public class ConditionalProcessorTests extends ESTestCase {
}, relativeTimeProvider);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated.");
}
@ -213,7 +215,7 @@ public class ConditionalProcessorTests extends ESTestCase {
);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue("listField", new ArrayList<>());
processor.execute(ingestDocument);
processor.execute(ingestDocument, (result, e) -> {});
Exception e = expectedException.get();
assertThat(e, instanceOf(UnsupportedOperationException.class));
assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage());

View File

@ -73,6 +73,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
@ -86,6 +87,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
@ -135,20 +137,20 @@ public class IngestServiceTests extends ESTestCase {
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final SetOnce<Boolean> failure = new SetOnce<>();
final BiConsumer<IndexRequest, Exception> failureHandler = (request, e) -> {
final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
failure.set(true);
assertThat(request, sameInstance(indexRequest));
assertThat(slot, equalTo(0));
assertThat(e, instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist"));
};
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
assertTrue(failure.get());
verify(completionHandler, times(1)).accept(null);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testUpdatePipelines() {
@ -641,7 +643,7 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final SetOnce<Boolean> failure = new SetOnce<>();
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id);
final BiConsumer<IndexRequest, Exception> failureHandler = (request, e) -> {
final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause().getCause().getMessage(), equalTo("error"));
@ -649,17 +651,17 @@ public class IngestServiceTests extends ESTestCase {
};
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
assertTrue(failure.get());
verify(completionHandler, times(1)).accept(null);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testExecuteBulkPipelineDoesNotExist() {
IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> mock(CompoundProcessor.class)));
"mock", (factories, tag, config) -> mockCompoundProcessor()));
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
@ -676,15 +678,16 @@ public class IngestServiceTests extends ESTestCase {
new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist");
bulkRequest.add(indexRequest2);
@SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {});
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
completionHandler, indexReq -> {});
verify(failureHandler, times(1)).accept(
argThat(new CustomTypeSafeMatcher<IndexRequest>("failure handler was not called with the expected arguments") {
argThat(new CustomTypeSafeMatcher<Integer>("failure handler was not called with the expected arguments") {
@Override
protected boolean matchesSafely(IndexRequest item) {
return item == indexRequest2;
protected boolean matchesSafely(Integer item) {
return item == 1;
}
}),
@ -695,12 +698,12 @@ public class IngestServiceTests extends ESTestCase {
}
})
);
verify(completionHandler, times(1)).accept(null);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testExecuteSuccess() {
IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> mock(CompoundProcessor.class)));
"mock", (factories, tag, config) -> mockCompoundProcessor()));
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
@ -709,12 +712,12 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testExecuteEmptyPipeline() throws Exception {
@ -727,16 +730,16 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testExecutePropagateAllMetaDataUpdates() throws Exception {
final CompoundProcessor processor = mock(CompoundProcessor.class);
final CompoundProcessor processor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> processor));
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
@ -758,17 +761,21 @@ public class IngestServiceTests extends ESTestCase {
ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName());
}
}
return ingestDocument;
}).when(processor).execute(any());
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer<IngestDocument, Exception>) invocationOnMock.getArguments()[1];
handler.accept(ingestDocument, null);
return null;
}).when(processor).execute(any(), any());
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(any());
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(any(), any());
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
assertThat(indexRequest.index(), equalTo("update_index"));
assertThat(indexRequest.type(), equalTo("update_type"));
assertThat(indexRequest.id(), equalTo("update_id"));
@ -778,7 +785,7 @@ public class IngestServiceTests extends ESTestCase {
}
public void testExecuteFailure() throws Exception {
final CompoundProcessor processor = mock(CompoundProcessor.class);
final CompoundProcessor processor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> processor));
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
@ -790,22 +797,37 @@ public class IngestServiceTests extends ESTestCase {
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
doThrow(new RuntimeException())
.when(processor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
verify(completionHandler, times(1)).accept(null);
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testExecuteSuccessWithOnFailure() throws Exception {
final Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("mock_processor_type");
when(processor.getTag()).thenReturn("mock_processor_tag");
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(null, new RuntimeException());
return null;
}).when(processor).execute(eqIndexTypeId(emptyMap()), any());
final Processor onFailureProcessor = mock(Processor.class);
doAnswer(args -> {
IngestDocument ingestDocument = (IngestDocument) args.getArguments()[0];
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(ingestDocument, null);
return null;
}).when(onFailureProcessor).execute(eqIndexTypeId(emptyMap()), any());
final CompoundProcessor compoundProcessor = new CompoundProcessor(
false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
IngestService ingestService = createWithProcessors(Collections.singletonMap(
@ -817,14 +839,13 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap()));
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class));
verify(completionHandler, times(1)).accept(null);
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class));
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testExecuteFailureWithNestedOnFailure() throws Exception {
@ -848,21 +869,21 @@ public class IngestServiceTests extends ESTestCase {
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
doThrow(new RuntimeException())
.when(onFailureOnFailureProcessor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
doThrow(new RuntimeException())
.when(onFailureProcessor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
doThrow(new RuntimeException())
.when(processor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
verify(completionHandler, times(1)).accept(null);
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testBulkRequestExecutionWithFailures() throws Exception {
@ -891,7 +912,12 @@ public class IngestServiceTests extends ESTestCase {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
Exception error = new RuntimeException();
doThrow(error).when(processor).execute(any());
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(null, error);
return null;
}).when(processor).execute(any(), any());
IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> processor));
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
@ -902,18 +928,18 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class);
BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher<Exception>() {
verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(new ArgumentMatcher<Exception>() {
@Override
public boolean matches(final Object o) {
return ((Exception)o).getCause().getCause().equals(error);
}
}));
verify(completionHandler, times(1)).accept(null);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
public void testBulkRequestExecution() throws Exception {
@ -936,7 +962,12 @@ public class IngestServiceTests extends ESTestCase {
final Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("mock");
when(processor.getTag()).thenReturn("mockTag");
when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
return null;
}).when(processor).execute(any(), any());
Map<String, Processor.Factory> map = new HashMap<>(2);
map.put("mock", (factories, tag, config) -> processor);
@ -949,13 +980,13 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class);
BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
verify(requestItemErrorHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests()) {
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(docWriteRequest);
assertThat(indexRequest, notNullValue());
@ -970,8 +1001,18 @@ public class IngestServiceTests extends ESTestCase {
when(processor.getTag()).thenReturn("mockTag");
when(processorFailure.getType()).thenReturn("failure-mock");
//avoid returning null and dropping the document
when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error"));
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
return null;
}).when(processor).execute(any(IngestDocument.class), any());
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(null, new RuntimeException("error"));
return null;
}).when(processorFailure).execute(any(IngestDocument.class), any());
Map<String, Processor.Factory> map = new HashMap<>(2);
map.put("mock", (factories, tag, config) -> processor);
map.put("failure-mock", (factories, tag, config) -> processorFailure);
@ -993,13 +1034,13 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class);
@SuppressWarnings("unchecked") final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
final IndexRequest indexRequest = new IndexRequest("_index");
indexRequest.setPipeline("_id1");
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats();
assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2));
@ -1017,7 +1058,7 @@ public class IngestServiceTests extends ESTestCase {
indexRequest.setPipeline("_id2");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterSecondRequestStats = ingestService.stats();
assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2));
//total
@ -1036,7 +1077,7 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterThirdRequestStats = ingestService.stats();
assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2));
//total
@ -1060,7 +1101,7 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterForthRequestStats = ingestService.stats();
assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2));
//total
@ -1126,15 +1167,15 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class);
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final Consumer<IndexRequest> dropHandler = mock(Consumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
final IntConsumer dropHandler = mock(IntConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null);
verify(dropHandler, times(1)).accept(indexRequest);
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
verify(dropHandler, times(1)).accept(0);
}
public void testIngestClusterStateListeners_orderOfExecution() {
@ -1176,7 +1217,7 @@ public class IngestServiceTests extends ESTestCase {
}
private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source));
return argThat(new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source));
}
private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map<String, Object> source) {
@ -1212,6 +1253,17 @@ public class IngestServiceTests extends ESTestCase {
}), client);
}
private CompoundProcessor mockCompoundProcessor() {
CompoundProcessor processor = mock(CompoundProcessor.class);
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept((IngestDocument) args.getArguments()[0], null);
return null;
}).when(processor).execute(any(), any());
return processor;
}
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {
private final IngestDocument ingestDocument;

View File

@ -64,7 +64,7 @@ public class PipelineProcessorTests extends ESTestCase {
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
config.put("name", pipelineId);
factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument);
factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument, (result, e) -> {});
assertEquals(testIngestDocument, invoked.get());
}
@ -74,12 +74,11 @@ public class PipelineProcessorTests extends ESTestCase {
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
config.put("name", "missingPipelineId");
IllegalStateException e = expectThrows(
IllegalStateException.class,
() -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument)
);
IllegalStateException[] e = new IllegalStateException[1];
factory.create(Collections.emptyMap(), null, config)
.execute(testIngestDocument, (result, e1) -> e[0] = (IllegalStateException) e1);
assertEquals(
"Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage()
"Pipeline processor configured for non-existent pipeline [missingPipelineId]", e[0].getMessage()
);
}
@ -104,12 +103,11 @@ public class PipelineProcessorTests extends ESTestCase {
when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer);
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
outerConfig.put("name", innerPipelineId);
ElasticsearchException e = expectThrows(
ElasticsearchException.class,
() -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument)
);
ElasticsearchException[] e = new ElasticsearchException[1];
factory.create(Collections.emptyMap(), null, outerConfig)
.execute(testIngestDocument, (result, e1) -> e[0] = (ElasticsearchException) e1);
assertEquals(
"Cycle detected for pipeline: inner", e.getRootCause().getMessage()
"Cycle detected for pipeline: inner", e[0].getRootCause().getMessage()
);
}
@ -125,8 +123,8 @@ public class PipelineProcessorTests extends ESTestCase {
);
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig);
outerProc.execute(testIngestDocument);
outerProc.execute(testIngestDocument);
outerProc.execute(testIngestDocument, (result, e) -> {});
outerProc.execute(testIngestDocument, (result, e) -> {});
}
public void testPipelineProcessorWithPipelineChain() throws Exception {
@ -177,7 +175,7 @@ public class PipelineProcessorTests extends ESTestCase {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
//start the chain
ingestDocument.executePipeline(pipeline1);
ingestDocument.executePipeline(pipeline1, (result, e) -> {});
assertNotNull(ingestDocument.getSourceAndMetadata().get(key1));
//check the stats

View File

@ -66,7 +66,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualProcessor() throws Exception {
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -84,12 +84,9 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
try {
trackingProcessor.execute(ingestDocument);
fail("processor should throw exception");
} catch (ElasticsearchException e) {
assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage()));
}
Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
@ -109,7 +106,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
Arrays.asList(onFailureProcessor, failProcessor))),
Arrays.asList(onFailureProcessor));
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);
@ -148,7 +145,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
Collections.emptyList());
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
@ -178,7 +175,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }));
CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument);
//the step for key 2 is never executed due to conditional and thus not part of the result set
@ -221,7 +218,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -287,7 +284,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -355,7 +352,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -407,7 +404,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -457,7 +454,9 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument));
Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
ElasticsearchException exception = (ElasticsearchException) holder[0];
assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class));
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
@ -482,7 +481,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);