Make ingest executing non blocking (#43361)

Added an additional method to the Processor interface to allow a
processor implementation to make a non blocking call.

Also added semaphore in order to avoid search thread pools from rejecting
search requests originating from the match processor. This is a temporary workaround.
This commit is contained in:
Martijn van Groningen 2019-07-01 07:59:50 +02:00
parent adcba69d96
commit 237f2bd60a
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
22 changed files with 814 additions and 471 deletions

View File

@ -28,6 +28,9 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@ -63,29 +66,46 @@ public final class ForEachProcessor extends AbstractProcessor {
} }
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing); List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
if (values == null) { if (values == null) {
if (ignoreMissing) { if (ignoreMissing) {
return ingestDocument; handler.accept(ingestDocument, null);
} else {
handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
} }
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); } else {
} List<Object> newValues = new CopyOnWriteArrayList<>();
List<Object> newValues = new ArrayList<>(values.size()); innerExecute(0, values, newValues, ingestDocument, handler);
IngestDocument document = ingestDocument;
for (Object value : values) {
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
try {
document = processor.execute(document);
if (document == null) {
return null;
}
} finally {
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
} }
} }
document.setFieldValue(field, newValues);
return document; void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
BiConsumer<IngestDocument, Exception> handler) {
if (index == values.size()) {
document.setFieldValue(field, new ArrayList<>(newValues));
handler.accept(document, null);
return;
}
Object value = values.get(index);
Object previousValue = document.getIngestMetadata().put("_value", value);
processor.execute(document, (result, e) -> {
if (e != null) {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
handler.accept(null, e);
} else if (result == null) {
handler.accept(null, null);
} else {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
innerExecute(index + 1, values, newValues, document, handler);
}
});
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
} }
@Override @Override

View File

@ -53,7 +53,7 @@ public class ForEachProcessorTests extends ESTestCase {
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
false false
); );
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<String> result = ingestDocument.getFieldValue("values", List.class); List<String> result = ingestDocument.getFieldValue("values", List.class);
@ -73,12 +73,9 @@ public class ForEachProcessorTests extends ESTestCase {
} }
}); });
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
try { Exception[] exceptions = new Exception[1];
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;});
fail("exception expected"); assertThat(exceptions[0].getMessage(), equalTo("failure"));
} catch (RuntimeException e) {
assertThat(e.getMessage(), equalTo("failure"));
}
assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(testProcessor.getInvokedCounter(), equalTo(3));
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c"))); assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c")));
@ -95,7 +92,7 @@ public class ForEachProcessorTests extends ESTestCase {
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
false false
); );
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(testProcessor.getInvokedCounter(), equalTo(3));
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c"))); assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c")));
} }
@ -114,7 +111,7 @@ public class ForEachProcessorTests extends ESTestCase {
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
}); });
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index")); assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index"));
@ -142,7 +139,7 @@ public class ForEachProcessorTests extends ESTestCase {
"_tag", "values", new SetProcessor("_tag", "_tag", "values", new SetProcessor("_tag",
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
(model) -> model.get("other")), false); (model) -> model.get("other")), false);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value"));
@ -180,7 +177,7 @@ public class ForEachProcessorTests extends ESTestCase {
); );
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<String> result = ingestDocument.getFieldValue("values", List.class); List<String> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.size(), equalTo(numValues)); assertThat(result.size(), equalTo(numValues));
@ -205,7 +202,7 @@ public class ForEachProcessorTests extends ESTestCase {
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
), false); ), false);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
List<?> result = ingestDocument.getFieldValue("values", List.class); List<?> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.get(0), equalTo("STRING")); assertThat(result.get(0), equalTo("STRING"));
@ -231,7 +228,7 @@ public class ForEachProcessorTests extends ESTestCase {
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
doc.getFieldValue("_source._value", String.class))); doc.getFieldValue("_source._value", String.class)));
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
forEachProcessor.execute(ingestDocument); forEachProcessor.execute(ingestDocument, (result, e) -> {});
List<?> result = ingestDocument.getFieldValue("values", List.class); List<?> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.get(0), equalTo("new_value")); assertThat(result.get(0), equalTo("new_value"));
@ -264,7 +261,7 @@ public class ForEachProcessorTests extends ESTestCase {
); );
ForEachProcessor processor = new ForEachProcessor( ForEachProcessor processor = new ForEachProcessor(
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false); "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
List<?> result = ingestDocument.getFieldValue("values1.0.values2", List.class); List<?> result = ingestDocument.getFieldValue("values1.0.values2", List.class);
assertThat(result.get(0), equalTo("ABC")); assertThat(result.get(0), equalTo("ABC"));
@ -282,7 +279,7 @@ public class ForEachProcessorTests extends ESTestCase {
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
TestProcessor testProcessor = new TestProcessor(doc -> {}); TestProcessor testProcessor = new TestProcessor(doc -> {});
ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
assertIngestDocument(originalIngestDocument, ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(0)); assertThat(testProcessor.getInvokedCounter(), equalTo(0));
} }

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SparseFixedBitSet; import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
@ -80,6 +82,7 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -577,14 +580,13 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
} }
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) { void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime(); final long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(() -> bulkRequestModifier, ingestService.executeBulkRequest(
(indexRequest, exception) -> { original.numberOfActions(),
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", () -> bulkRequestModifier,
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); bulkRequestModifier::markItemAsFailed,
bulkRequestModifier.markCurrentItemAsFailed(exception); (originalThread, exception) -> {
}, (exception) -> {
if (exception != null) { if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception); logger.error("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception); listener.onFailure(exception);
@ -599,26 +601,56 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
// (this will happen if pre-processing all items in the bulk failed) // (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else { } else {
// If a processor went async and returned a response on a different thread then
// before we continue the bulk request we should fork back on a write thread:
if (originalThread == Thread.currentThread()) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
doExecute(task, bulkRequest, actionListener); doExecute(task, bulkRequest, actionListener);
} else {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
doExecute(task, bulkRequest, actionListener);
}
@Override
public boolean isForceExecution() {
// If we fork back to a write thread we **not** should fail, because tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
});
}
} }
} }
}, },
indexRequest -> bulkRequestModifier.markCurrentItemAsDropped()); bulkRequestModifier::markItemAsDropped
);
} }
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> { static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class);
final BulkRequest bulkRequest; final BulkRequest bulkRequest;
final SparseFixedBitSet failedSlots; final SparseFixedBitSet failedSlots;
final List<BulkItemResponse> itemResponses; final List<BulkItemResponse> itemResponses;
final AtomicIntegerArray originalSlots;
int currentSlot = -1; volatile int currentSlot = -1;
int[] originalSlots;
BulkRequestModifier(BulkRequest bulkRequest) { BulkRequestModifier(BulkRequest bulkRequest) {
this.bulkRequest = bulkRequest; this.bulkRequest = bulkRequest;
this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size()); this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size());
this.itemResponses = new ArrayList<>(bulkRequest.requests().size()); this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok
} }
@Override @Override
@ -642,12 +674,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
int slot = 0; int slot = 0;
List<DocWriteRequest<?>> requests = bulkRequest.requests(); List<DocWriteRequest<?>> requests = bulkRequest.requests();
originalSlots = new int[requests.size()]; // oversize, but that's ok
for (int i = 0; i < requests.size(); i++) { for (int i = 0; i < requests.size(); i++) {
DocWriteRequest<?> request = requests.get(i); DocWriteRequest<?> request = requests.get(i);
if (failedSlots.get(i) == false) { if (failedSlots.get(i) == false) {
modifiedBulkRequest.add(request); modifiedBulkRequest.add(request);
originalSlots[slot++] = i; originalSlots.set(slot++, i);
} }
} }
return modifiedBulkRequest; return modifiedBulkRequest;
@ -662,7 +693,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> { return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
BulkItemResponse[] items = response.getItems(); BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) { for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]); itemResponses.add(originalSlots.get(i), response.getItems()[i]);
} }
delegatedListener.onResponse( delegatedListener.onResponse(
new BulkResponse( new BulkResponse(
@ -671,11 +702,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
} }
} }
void markCurrentItemAsDropped() { synchronized void markItemAsDropped(int slot) {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
failedSlots.set(currentSlot); failedSlots.set(slot);
itemResponses.add( itemResponses.add(
new BulkItemResponse(currentSlot, indexRequest.opType(), new BulkItemResponse(slot, indexRequest.opType(),
new UpdateResponse( new UpdateResponse(
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0), new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
indexRequest.type(), indexRequest.id(), indexRequest.version(), DocWriteResponse.Result.NOOP indexRequest.type(), indexRequest.id(), indexRequest.version(), DocWriteResponse.Result.NOOP
@ -684,16 +715,19 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
); );
} }
void markCurrentItemAsFailed(Exception e) { synchronized void markItemAsFailed(int slot, Exception e) {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e);
// We hit a error during preprocessing a request, so we: // We hit a error during preprocessing a request, so we:
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed // 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
// 2) Add a bulk item failure for this request // 2) Add a bulk item failure for this request
// 3) Continue with the next request in the bulk. // 3) Continue with the next request in the bulk.
failedSlots.set(currentSlot); failedSlots.set(slot);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(),
indexRequest.id(), e); indexRequest.id(), e);
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure)); itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
} }
} }

View File

@ -26,8 +26,10 @@ import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
@ -41,40 +43,44 @@ class SimulateExecutionService {
this.threadPool = threadPool; this.threadPool = threadPool;
} }
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose,
BiConsumer<SimulateDocumentResult, Exception> handler) {
if (verbose) { if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>(); List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
try {
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor); verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline); ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
return new SimulateDocumentVerboseResult(processorResultList); handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
} catch (Exception e) { });
return new SimulateDocumentVerboseResult(processorResultList);
}
} else { } else {
try { pipeline.execute(ingestDocument, (result, e) -> {
pipeline.execute(ingestDocument); if (e == null) {
return new SimulateDocumentBaseResult(ingestDocument); handler.accept(new SimulateDocumentBaseResult(ingestDocument), null);
} catch (Exception e) { } else {
return new SimulateDocumentBaseResult(e); handler.accept(new SimulateDocumentBaseResult(e), null);
} }
} });
} }
}
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(new ActionRunnable<SimulatePipelineResponse>(listener) { public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
@Override threadPool.executor(THREAD_POOL_NAME).execute(new ActionRunnable<>(listener) {
protected void doRun() throws Exception { @Override
List<SimulateDocumentResult> responses = new ArrayList<>(); protected void doRun() {
for (IngestDocument ingestDocument : request.getDocuments()) { final AtomicInteger counter = new AtomicInteger();
SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()); final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>();
if (response != null) { for (IngestDocument ingestDocument : request.getDocuments()) {
responses.add(response); executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> {
} if (response != null) {
} responses.add(response);
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); }
if (counter.incrementAndGet() == request.getDocuments().size()) {
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(),
request.isVerbose(), responses));
}
});
}
} }
}); });
} }

View File

@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -114,58 +115,78 @@ public class CompoundProcessor implements Processor {
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) { throw new UnsupportedOperationException("this method should not get executed");
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
if (processor.execute(ingestDocument) == null) {
return null;
}
} catch (Exception e) {
metric.ingestFailed();
if (ignoreFailure) {
continue;
} }
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
innerExecute(0, ingestDocument, handler);
}
void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (currentProcessor == processorsWithMetrics.size()) {
handler.accept(ingestDocument, null);
return;
}
Tuple<Processor, IngestMetric> processorWithMetric = processorsWithMetrics.get(currentProcessor);
final Processor processor = processorWithMetric.v1();
final IngestMetric metric = processorWithMetric.v2();
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
if (e != null) {
metric.ingestFailed();
if (ignoreFailure) {
innerExecute(currentProcessor + 1, ingestDocument, handler);
} else {
ElasticsearchException compoundProcessorException = ElasticsearchException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag()); newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) { if (onFailureProcessors.isEmpty()) {
throw compoundProcessorException; handler.accept(null, compoundProcessorException);
} else { } else {
if (executeOnFailure(ingestDocument, compoundProcessorException) == false) { executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler);
return null;
}
break;
}
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
} }
} }
return ingestDocument; } else {
if (result != null) {
innerExecute(currentProcessor + 1, result, handler);
} else {
handler.accept(null, null);
}
}
});
} }
/** void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestDocument, ElasticsearchException exception,
* @return true if execution should continue, false if document is dropped. BiConsumer<IngestDocument, Exception> handler) {
*/ if (currentOnFailureProcessor == 0) {
boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
putFailureMetadata(ingestDocument, exception); putFailureMetadata(ingestDocument, exception);
for (Processor processor : onFailureProcessors) {
try {
if (processor.execute(ingestDocument) == null) {
return false;
} }
} catch (Exception e) {
throw newCompoundProcessorException(e, processor.getType(), processor.getTag()); if (currentOnFailureProcessor == onFailureProcessors.size()) {
}
}
} finally {
removeFailureMetadata(ingestDocument); removeFailureMetadata(ingestDocument);
handler.accept(ingestDocument, null);
return;
} }
return true;
final Processor onFailureProcessor = onFailureProcessors.get(currentOnFailureProcessor);
onFailureProcessor.execute(ingestDocument, (result, e) -> {
if (e != null) {
removeFailureMetadata(ingestDocument);
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
return;
}
if (result == null) {
removeFailureMetadata(ingestDocument);
handler.accept(null, null);
return;
}
executeOnFailureAsync(currentOnFailureProcessor + 1, ingestDocument, exception, handler);
});
} }
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) { private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {

View File

@ -30,6 +30,7 @@ import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -74,21 +75,28 @@ public class ConditionalProcessor extends AbstractProcessor {
} }
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (evaluate(ingestDocument)) { if (evaluate(ingestDocument)) {
long startTimeInNanos = relativeTimeProvider.getAsLong(); final long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest(); metric.preIngest();
return processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {
} catch (Exception e) {
metric.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis); metric.postIngest(ingestTimeInMillis);
if (e != null) {
metric.ingestFailed();
handler.accept(null, e);
} else {
handler.accept(result, null);
}
});
} else {
handler.accept(ingestDocument, null);
} }
} }
return ingestDocument;
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
} }
boolean evaluate(IngestDocument ingestDocument) { boolean evaluate(IngestDocument ingestDocument) {

View File

@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.function.BiConsumer;
/** /**
* Represents a single document being captured before indexing and holds the source and metadata (like id, type and index). * Represents a single document being captured before indexing and holds the source and metadata (like id, type and index).
@ -641,17 +642,18 @@ public final class IngestDocument {
/** /**
* Executes the given pipeline with for this document unless the pipeline has already been executed * Executes the given pipeline with for this document unless the pipeline has already been executed
* for this document. * for this document.
* @param pipeline Pipeline to execute *
* @throws Exception On exception in pipeline execution * @param pipeline the pipeline to execute
* @param handler handles the result or failure
*/ */
public IngestDocument executePipeline(Pipeline pipeline) throws Exception { public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
try { if (executedPipelines.add(pipeline)) {
if (this.executedPipelines.add(pipeline) == false) { pipeline.execute(this, (result, e) -> {
throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId());
}
return pipeline.execute(this);
} finally {
executedPipelines.remove(pipeline); executedPipelines.remove(pipeline);
handler.accept(result, e);
});
} else {
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
} }
} }

View File

@ -63,8 +63,10 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.IntConsumer;
/** /**
* Holder class for several ingest related services. * Holder class for several ingest related services.
@ -327,42 +329,72 @@ public class IngestService implements ClusterStateApplier {
ExceptionsHelper.rethrowAndSuppress(exceptions); ExceptionsHelper.rethrowAndSuppress(exceptions);
} }
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests, public void executeBulkRequest(int numberOfActionRequests,
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler, Iterable<DocWriteRequest<?>> actionRequests,
Consumer<IndexRequest> itemDroppedHandler) { BiConsumer<Integer, Exception> itemFailureHandler,
BiConsumer<Thread, Exception> completionHandler,
IntConsumer itemDroppedHandler) {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
completionHandler.accept(e); completionHandler.accept(null, e);
} }
@Override @Override
protected void doRun() { protected void doRun() {
final Thread originalThread = Thread.currentThread();
final AtomicInteger counter = new AtomicInteger(numberOfActionRequests);
int i = 0;
for (DocWriteRequest<?> actionRequest : actionRequests) { for (DocWriteRequest<?> actionRequest : actionRequests) {
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
if (indexRequest == null) { if (indexRequest == null) {
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
continue; continue;
} }
String pipelineId = indexRequest.getPipeline(); String pipelineId = indexRequest.getPipeline();
if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) { if (NOOP_PIPELINE_NAME.equals(pipelineId)) {
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
continue;
}
final int slot = i;
try { try {
PipelineHolder holder = pipelines.get(pipelineId); PipelineHolder holder = pipelines.get(pipelineId);
if (holder == null) { if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
} }
Pipeline pipeline = holder.pipeline; Pipeline pipeline = holder.pipeline;
innerExecute(indexRequest, pipeline, itemDroppedHandler); innerExecute(slot, indexRequest, pipeline, itemDroppedHandler, e -> {
if (e == null) {
// this shouldn't be needed here but we do it for consistency with index api // this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution // which requires it to prevent double execution
indexRequest.setPipeline(NOOP_PIPELINE_NAME); indexRequest.setPipeline(NOOP_PIPELINE_NAME);
} else {
itemFailureHandler.accept(slot, e);
}
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
}
assert counter.get() >= 0;
});
} catch (Exception e) { } catch (Exception e) {
itemFailureHandler.accept(indexRequest, e); itemFailureHandler.accept(slot, e);
if (counter.decrementAndGet() == 0){
completionHandler.accept(originalThread, null);
} }
assert counter.get() >= 0;
} }
i++;
} }
completionHandler.accept(null);
} }
}); });
} }
@ -418,15 +450,16 @@ public class IngestService implements ClusterStateApplier {
return sb.toString(); return sb.toString();
} }
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception { private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline, IntConsumer itemDroppedHandler,
Consumer<Exception> handler) {
if (pipeline.getProcessors().isEmpty()) { if (pipeline.getProcessors().isEmpty()) {
handler.accept(null);
return; return;
} }
long startTimeInNanos = System.nanoTime(); long startTimeInNanos = System.nanoTime();
// the pipeline specific stat holder may not exist and that is fine: // the pipeline specific stat holder may not exist and that is fine:
// (e.g. the pipeline may have been removed while we're ingesting a document // (e.g. the pipeline may have been removed while we're ingesting a document
try {
totalMetrics.preIngest(); totalMetrics.preIngest();
String index = indexRequest.index(); String index = indexRequest.index();
String type = indexRequest.type(); String type = indexRequest.type();
@ -436,8 +469,15 @@ public class IngestService implements ClusterStateApplier {
VersionType versionType = indexRequest.versionType(); VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap(); Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
if (pipeline.execute(ingestDocument) == null) { pipeline.execute(ingestDocument, (result, e) -> {
itemDroppedHandler.accept(indexRequest); long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalMetrics.postIngest(ingestTimeInMillis);
if (e != null) {
totalMetrics.ingestFailed();
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else { } else {
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata(); Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
//it's fine to set all metadata fields all the time, as ingest document holds their starting values //it's fine to set all metadata fields all the time, as ingest document holds their starting values
@ -451,14 +491,9 @@ public class IngestService implements ClusterStateApplier {
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
} }
indexRequest.source(ingestDocument.getSourceAndMetadata()); indexRequest.source(ingestDocument.getSourceAndMetadata());
handler.accept(null);
} }
} catch (Exception e) { });
totalMetrics.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalMetrics.postIngest(ingestTimeInMillis);
}
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
@ -93,18 +94,17 @@ public final class Pipeline {
* If <code>null</code> is returned then this document will be dropped and not indexed, otherwise * If <code>null</code> is returned then this document will be dropped and not indexed, otherwise
* this document will be kept and indexed. * this document will be kept and indexed.
*/ */
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
long startTimeInNanos = relativeTimeProvider.getAsLong(); final long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metrics.preIngest(); metrics.preIngest();
return compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> {
} catch (Exception e) {
metrics.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metrics.postIngest(ingestTimeInMillis); metrics.postIngest(ingestTimeInMillis);
if (e != null) {
metrics.ingestFailed();
} }
handler.accept(result, e);
});
} }
/** /**

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import java.util.Map; import java.util.Map;
import java.util.function.BiConsumer;
public class PipelineProcessor extends AbstractProcessor { public class PipelineProcessor extends AbstractProcessor {
@ -36,12 +37,19 @@ public class PipelineProcessor extends AbstractProcessor {
} }
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
Pipeline pipeline = ingestService.getPipeline(pipelineName); Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline == null) { if (pipeline != null) {
throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'); ingestDocument.executePipeline(pipeline, handler);
} else {
handler.accept(null,
new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'));
} }
return ingestDocument.executePipeline(pipeline); }
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
} }
Pipeline getPipeline(){ Pipeline getPipeline(){

View File

@ -27,6 +27,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.Scheduler;
import java.util.Map; import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
@ -38,6 +39,21 @@ import java.util.function.LongSupplier;
*/ */
public interface Processor { public interface Processor {
/**
* Introspect and potentially modify the incoming data.
*
* Expert method: only override this method if a processor implementation needs to make an asynchronous call,
* otherwise just overwrite {@link #execute(IngestDocument)}.
*/
default void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
try {
IngestDocument result = execute(ingestDocument);
handler.accept(result, null);
} catch (Exception e) {
handler.accept(null, e);
}
}
/** /**
* Introspect and potentially modify the incoming data. * Introspect and potentially modify the incoming data.
* *

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ingest.SimulateProcessorResult;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.function.BiConsumer;
/** /**
* Processor to be used within Simulate API to keep track of processors executed in pipeline. * Processor to be used within Simulate API to keep track of processors executed in pipeline.
@ -41,51 +42,71 @@ public final class TrackingResultProcessor implements Processor {
} }
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
Processor processor = actualProcessor; if (actualProcessor instanceof PipelineProcessor) {
try { PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
if (processor instanceof ConditionalProcessor) {
ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor;
if (conditionalProcessor.evaluate(ingestDocument) == false) {
return ingestDocument;
}
if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getProcessor();
}
}
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
Pipeline pipeline = pipelineProcessor.getPipeline(); Pipeline pipeline = pipelineProcessor.getPipeline();
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
try {
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline()); ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> {
} catch (ElasticsearchException elasticsearchException) {
if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) {
throw elasticsearchException;
}
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
} catch (Exception e) {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure // do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(),
new IngestDocument(ingestDocument), e));
} else {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), e));
} }
handler.accept(null, elasticsearchException);
}
} else {
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor); verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline); ingestDocument.executePipeline(verbosePipeline, handler);
} else {
processor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
} }
} catch (Exception e) { });
return;
}
final Processor processor;
if (actualProcessor instanceof ConditionalProcessor) {
ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor;
if (conditionalProcessor.evaluate(ingestDocument) == false) {
handler.accept(ingestDocument, null);
return;
}
if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getProcessor();
} else {
processor = actualProcessor;
}
} else {
processor = actualProcessor;
}
processor.execute(ingestDocument, (result, e) -> {
if (e != null) {
if (ignoreFailure) { if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e));
} else { } else {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
} }
throw e; handler.accept(null, e);
} else {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
handler.accept(result, null);
} }
return ingestDocument; });
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();
} }
@Override @Override

View File

@ -55,7 +55,7 @@ public class BulkRequestModifierTests extends ESTestCase {
while (bulkRequestModifier.hasNext()) { while (bulkRequestModifier.hasNext()) {
bulkRequestModifier.next(); bulkRequestModifier.next();
if (randomBoolean()) { if (randomBoolean()) {
bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException()); bulkRequestModifier.markItemAsFailed(i, new RuntimeException());
failedSlots.add(i); failedSlots.add(i);
} }
i++; i++;
@ -93,7 +93,7 @@ public class BulkRequestModifierTests extends ESTestCase {
for (int i = 0; modifier.hasNext(); i++) { for (int i = 0; modifier.hasNext(); i++) {
modifier.next(); modifier.next();
if (i % 2 == 0) { if (i % 2 == 0) {
modifier.markCurrentItemAsFailed(new RuntimeException()); modifier.markItemAsFailed(i, new RuntimeException());
} }
} }
@ -102,7 +102,7 @@ public class BulkRequestModifierTests extends ESTestCase {
assertThat(bulkRequest.requests().size(), Matchers.equalTo(16)); assertThat(bulkRequest.requests().size(), Matchers.equalTo(16));
List<BulkItemResponse> responses = new ArrayList<>(); List<BulkItemResponse> responses = new ArrayList<>();
ActionListener<BulkResponse> bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener<BulkResponse>() { ActionListener<BulkResponse> bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener<>() {
@Override @Override
public void onResponse(BulkResponse bulkItemResponses) { public void onResponse(BulkResponse bulkItemResponses) {
responses.addAll(Arrays.asList(bulkItemResponses.getItems())); responses.addAll(Arrays.asList(bulkItemResponses.getItems()));

View File

@ -66,11 +66,11 @@ import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -92,6 +92,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
private static final Settings SETTINGS = private static final Settings SETTINGS =
Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build(); Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build();
private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE);
/** Services needed by bulk action */ /** Services needed by bulk action */
TransportService transportService; TransportService transportService;
ClusterService clusterService; ClusterService clusterService;
@ -100,9 +102,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
@Captor @Captor
ArgumentCaptor<BiConsumer<IndexRequest, Exception>> failureHandler; ArgumentCaptor<BiConsumer<Integer, Exception>> failureHandler;
@Captor @Captor
ArgumentCaptor<Consumer<Exception>> completionHandler; ArgumentCaptor<BiConsumer<Thread, Exception>> completionHandler;
@Captor @Captor
ArgumentCaptor<TransportResponseHandler<BulkResponse>> remoteResponseHandler; ArgumentCaptor<TransportResponseHandler<BulkResponse>> remoteResponseHandler;
@Captor @Captor
@ -264,15 +266,16 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get()); assertFalse(responseCalled.get());
assertFalse(failureCalled.get()); assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(),
completionHandler.getValue().accept(exception); failureHandler.capture(), completionHandler.capture(), any());
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get()); assertTrue(failureCalled.get());
// now check success // now check success
Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator(); Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request failureHandler.getValue().accept(0, exception); // have an exception for our one index request
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null); completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted); assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService); verifyZeroInteractions(transportService);
@ -298,13 +301,14 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get()); assertFalse(responseCalled.get());
assertFalse(failureCalled.get()); assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
completionHandler.getValue().accept(exception); completionHandler.capture(), any());
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get()); assertTrue(failureCalled.get());
// now check success // now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null); completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted); assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService); verifyZeroInteractions(transportService);
@ -330,7 +334,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
action.execute(null, bulkRequest, listener); action.execute(null, bulkRequest, listener);
// should not have executed ingest locally // should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service // but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class); ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@ -374,7 +378,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
singleItemBulkWriteAction.execute(null, indexRequest, listener); singleItemBulkWriteAction.execute(null, indexRequest, listener);
// should not have executed ingest locally // should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service // but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class); ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@ -445,18 +449,19 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get()); assertFalse(responseCalled.get());
assertFalse(failureCalled.get()); assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(),
failureHandler.capture(), completionHandler.capture(), any());
assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest1.getPipeline(), "default_pipeline");
assertEquals(indexRequest2.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline");
assertEquals(indexRequest3.getPipeline(), "default_pipeline"); assertEquals(indexRequest3.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(exception); completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get()); assertTrue(failureCalled.get());
// now check success of the transport bulk action // now check success of the transport bulk action
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null); completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted); assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService); verifyZeroInteractions(transportService);
@ -483,14 +488,15 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.indexCreated); // no index yet assertFalse(action.indexCreated); // no index yet
assertFalse(responseCalled.get()); assertFalse(responseCalled.get());
assertFalse(failureCalled.get()); assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
completionHandler.getValue().accept(exception); completionHandler.capture(), any());
completionHandler.getValue().accept(null, exception);
assertFalse(action.indexCreated); // still no index yet, the ingest node failed. assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
assertTrue(failureCalled.get()); assertTrue(failureCalled.get());
// now check success // now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null); completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted); assertTrue(action.isExecuted);
assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path. assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path.
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@ -547,7 +553,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
})); }));
assertEquals("pipeline2", indexRequest.getPipeline()); assertEquals("pipeline2", indexRequest.getPipeline());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
completionHandler.capture(), any());
} }
private void validateDefaultPipeline(IndexRequest indexRequest) { private void validateDefaultPipeline(IndexRequest indexRequest) {
@ -569,14 +576,15 @@ public class TransportBulkActionIngestTests extends ESTestCase {
assertFalse(action.isExecuted); // haven't executed yet assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get()); assertFalse(responseCalled.get());
assertFalse(failureCalled.get()); assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
completionHandler.capture(), any());
assertEquals(indexRequest.getPipeline(), "default_pipeline"); assertEquals(indexRequest.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(exception); completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get()); assertTrue(failureCalled.get());
// now check success // now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null); completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
assertTrue(action.isExecuted); assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService); verifyZeroInteractions(transportService);

View File

@ -20,19 +20,21 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -51,11 +53,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
@Before @Before
public void setup() { public void setup() {
threadPool = new ThreadPool( threadPool = new TestThreadPool(SimulateExecutionServiceTests.class.getSimpleName());
Settings.builder()
.put("node.name", getClass().getName())
.build()
);
executionService = new SimulateExecutionService(threadPool); executionService = new SimulateExecutionService(threadPool);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
} }
@ -68,7 +66,15 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItem() throws Exception { public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {}); TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.getAcquire();
assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -93,7 +99,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItem() throws Exception { public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {}); TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.getAcquire();
assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
@ -107,7 +120,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
ingestDocument -> { throw new RuntimeException("processor failed"); }); ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3)); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(processor3.getInvokedCounter(), equalTo(0)); assertThat(processor3.getInvokedCounter(), equalTo(0));
@ -135,7 +155,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
Pipeline pipeline = new Pipeline("_id", "_description", version, Pipeline pipeline = new Pipeline("_id", "_description", version,
new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1), new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2)), processor3)); Collections.singletonList(processor2)), processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
@ -170,7 +197,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; }); TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -187,7 +221,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { }); TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -203,7 +244,14 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItemWithFailure() throws Exception { public void testExecuteItemWithFailure() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); }); TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
holder.set(r);
latch.countDown();
});
latch.await();
SimulateDocumentResult actualItemResponse = holder.get();
assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(processor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;

View File

@ -33,6 +33,7 @@ import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -51,7 +52,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor processor = new CompoundProcessor(); CompoundProcessor processor = new CompoundProcessor();
assertThat(processor.getProcessors().isEmpty(), is(true)); assertThat(processor.getProcessors().isEmpty(), is(true));
assertThat(processor.getOnFailureProcessors().isEmpty(), is(true)); assertThat(processor.getOnFailureProcessors().isEmpty(), is(true));
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
} }
public void testSingleProcessor() throws Exception { public void testSingleProcessor() throws Exception {
@ -66,7 +67,7 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> {});
verify(relativeTimeProvider, times(2)).getAsLong(); verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 0, 1); assertStats(compoundProcessor, 1, 0, 1);
@ -81,12 +82,9 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
try { Exception[] holder = new Exception[1];
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
fail("should throw exception"); assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo("error"));
} catch (ElasticsearchException e) {
assertThat(e.getRootCause().getMessage(), equalTo("error"));
}
assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0); assertStats(compoundProcessor, 1, 1, 0);
@ -99,7 +97,7 @@ public class CompoundProcessorTests extends ESTestCase {
when(relativeTimeProvider.getAsLong()).thenReturn(0L); when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = CompoundProcessor compoundProcessor =
new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider); new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider);
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 1, 0); assertStats(0, compoundProcessor, 0, 1, 1, 0);
assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1));
@ -121,7 +119,7 @@ public class CompoundProcessorTests extends ESTestCase {
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2), relativeTimeProvider); Collections.singletonList(processor2), relativeTimeProvider);
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> {});
verify(relativeTimeProvider, times(2)).getAsLong(); verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));
@ -153,7 +151,9 @@ public class CompoundProcessorTests extends ESTestCase {
when(relativeTimeProvider.getAsLong()).thenReturn(0L); when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2), relativeTimeProvider); Collections.singletonList(processor2), relativeTimeProvider);
assertNull(compoundProcessor.execute(ingestDocument)); IngestDocument[] result = new IngestDocument[1];
compoundProcessor.execute(ingestDocument, (r, e) -> result[0] = r);
assertThat(result[0], nullValue());
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0); assertStats(compoundProcessor, 1, 1, 0);
} }
@ -181,7 +181,7 @@ public class CompoundProcessorTests extends ESTestCase {
Collections.singletonList(lastProcessor), relativeTimeProvider); Collections.singletonList(lastProcessor), relativeTimeProvider);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(processorToFail.getInvokedCounter(), equalTo(1));
assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
@ -204,7 +204,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider); Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
@ -230,7 +230,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider); Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
@ -256,7 +256,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider); Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument, (result, e) -> {});
assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
@ -271,7 +271,7 @@ public class CompoundProcessorTests extends ESTestCase {
when(relativeTimeProvider.getAsLong()).thenReturn(0L); when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor),
Collections.singletonList(onFailureProcessor), relativeTimeProvider); Collections.singletonList(onFailureProcessor), relativeTimeProvider);
pipeline.execute(ingestDocument); pipeline.execute(ingestDocument, (result, e) -> {});
assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));

View File

@ -98,7 +98,7 @@ public class ConditionalProcessorTests extends ESTestCase {
String falseValue = "falsy"; String falseValue = "falsy";
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue); ingestDocument.setFieldValue(conditionalField, falseValue);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
assertStats(processor, 0, 0, 0); assertStats(processor, 0, 0, 0);
@ -106,13 +106,13 @@ public class ConditionalProcessorTests extends ESTestCase {
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue); ingestDocument.setFieldValue(conditionalField, falseValue);
ingestDocument.setFieldValue("error", true); ingestDocument.setFieldValue("error", true);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
assertStats(processor, 0, 0, 0); assertStats(processor, 0, 0, 0);
//true, always call processor and increments metrics //true, always call processor and increments metrics
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue); ingestDocument.setFieldValue(conditionalField, trueValue);
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue));
assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar"));
assertStats(processor, 1, 0, 1); assertStats(processor, 1, 0, 1);
@ -121,7 +121,9 @@ public class ConditionalProcessorTests extends ESTestCase {
ingestDocument.setFieldValue(conditionalField, trueValue); ingestDocument.setFieldValue(conditionalField, trueValue);
ingestDocument.setFieldValue("error", true); ingestDocument.setFieldValue("error", true);
IngestDocument finalIngestDocument = ingestDocument; IngestDocument finalIngestDocument = ingestDocument;
expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); Exception holder[] = new Exception[1];
processor.execute(finalIngestDocument, (result, e) -> {holder[0] = e;});
assertThat(holder[0], instanceOf(RuntimeException.class));
assertStats(processor, 2, 1, 2); assertStats(processor, 2, 1, 2);
} }
@ -177,7 +179,7 @@ public class ConditionalProcessorTests extends ESTestCase {
}, relativeTimeProvider); }, relativeTimeProvider);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated."); assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated.");
} }
@ -213,7 +215,7 @@ public class ConditionalProcessorTests extends ESTestCase {
); );
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue("listField", new ArrayList<>()); ingestDocument.setFieldValue("listField", new ArrayList<>());
processor.execute(ingestDocument); processor.execute(ingestDocument, (result, e) -> {});
Exception e = expectedException.get(); Exception e = expectedException.get();
assertThat(e, instanceOf(UnsupportedOperationException.class)); assertThat(e, instanceOf(UnsupportedOperationException.class));
assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage());

View File

@ -63,7 +63,7 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.IntConsumer;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
@ -75,6 +75,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -121,20 +122,20 @@ public class IngestServiceTests extends ESTestCase {
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final SetOnce<Boolean> failure = new SetOnce<>(); final SetOnce<Boolean> failure = new SetOnce<>();
final BiConsumer<IndexRequest, Exception> failureHandler = (request, e) -> { final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
failure.set(true); failure.set(true);
assertThat(request, sameInstance(indexRequest)); assertThat(slot, equalTo(0));
assertThat(e, instanceOf(IllegalArgumentException.class)); assertThat(e, instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist"));
}; };
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
assertTrue(failure.get()); assertTrue(failure.get());
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testUpdatePipelines() { public void testUpdatePipelines() {
@ -540,7 +541,7 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final SetOnce<Boolean> failure = new SetOnce<>(); final SetOnce<Boolean> failure = new SetOnce<>();
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id);
final BiConsumer<IndexRequest, Exception> failureHandler = (request, e) -> { final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause().getCause().getMessage(), equalTo("error")); assertThat(e.getCause().getCause().getMessage(), equalTo("error"));
@ -548,17 +549,17 @@ public class IngestServiceTests extends ESTestCase {
}; };
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
assertTrue(failure.get()); assertTrue(failure.get());
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testExecuteBulkPipelineDoesNotExist() { public void testExecuteBulkPipelineDoesNotExist() {
IngestService ingestService = createWithProcessors(Collections.singletonMap( IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> mock(CompoundProcessor.class))); "mock", (factories, tag, config) -> mockCompoundProcessor()));
PutPipelineRequest putRequest = new PutPipelineRequest("_id", PutPipelineRequest putRequest = new PutPipelineRequest("_id",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
@ -575,15 +576,16 @@ public class IngestServiceTests extends ESTestCase {
new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist");
bulkRequest.add(indexRequest2); bulkRequest.add(indexRequest2);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
completionHandler, indexReq -> {});
verify(failureHandler, times(1)).accept( verify(failureHandler, times(1)).accept(
argThat(new CustomTypeSafeMatcher<IndexRequest>("failure handler was not called with the expected arguments") { argThat(new CustomTypeSafeMatcher<>("failure handler was not called with the expected arguments") {
@Override @Override
protected boolean matchesSafely(IndexRequest item) { protected boolean matchesSafely(Integer item) {
return item == indexRequest2; return item == 1;
} }
}), }),
@ -594,12 +596,12 @@ public class IngestServiceTests extends ESTestCase {
} }
}) })
); );
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testExecuteSuccess() { public void testExecuteSuccess() {
IngestService ingestService = createWithProcessors(Collections.singletonMap( IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> mock(CompoundProcessor.class))); "mock", (factories, tag, config) -> mockCompoundProcessor()));
PutPipelineRequest putRequest = new PutPipelineRequest("_id", PutPipelineRequest putRequest = new PutPipelineRequest("_id",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
@ -608,12 +610,12 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(any(), any()); verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testExecuteEmptyPipeline() throws Exception { public void testExecuteEmptyPipeline() throws Exception {
@ -626,16 +628,16 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(any(), any()); verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testExecutePropagateAllMetaDataUpdates() throws Exception { public void testExecutePropagateAllMetaDataUpdates() throws Exception {
final CompoundProcessor processor = mock(CompoundProcessor.class); final CompoundProcessor processor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(Collections.singletonMap( IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> processor)); "mock", (factories, tag, config) -> processor));
PutPipelineRequest putRequest = new PutPipelineRequest("_id", PutPipelineRequest putRequest = new PutPipelineRequest("_id",
@ -657,17 +659,21 @@ public class IngestServiceTests extends ESTestCase {
ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName());
} }
} }
return ingestDocument;
}).when(processor).execute(any()); @SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer<IngestDocument, Exception>) invocationOnMock.getArguments()[1];
handler.accept(ingestDocument, null);
return null;
}).when(processor).execute(any(), any());
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(any()); verify(processor).execute(any(), any());
verify(failureHandler, never()).accept(any(), any()); verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
assertThat(indexRequest.index(), equalTo("update_index")); assertThat(indexRequest.index(), equalTo("update_index"));
assertThat(indexRequest.type(), equalTo("update_type")); assertThat(indexRequest.type(), equalTo("update_type"));
assertThat(indexRequest.id(), equalTo("update_id")); assertThat(indexRequest.id(), equalTo("update_id"));
@ -677,7 +683,7 @@ public class IngestServiceTests extends ESTestCase {
} }
public void testExecuteFailure() throws Exception { public void testExecuteFailure() throws Exception {
final CompoundProcessor processor = mock(CompoundProcessor.class); final CompoundProcessor processor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(Collections.singletonMap( IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> processor)); "mock", (factories, tag, config) -> processor));
PutPipelineRequest putRequest = new PutPipelineRequest("_id", PutPipelineRequest putRequest = new PutPipelineRequest("_id",
@ -689,22 +695,37 @@ public class IngestServiceTests extends ESTestCase {
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()) doThrow(new RuntimeException())
.when(processor) .when(processor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testExecuteSuccessWithOnFailure() throws Exception { public void testExecuteSuccessWithOnFailure() throws Exception {
final Processor processor = mock(Processor.class); final Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("mock_processor_type"); when(processor.getType()).thenReturn("mock_processor_type");
when(processor.getTag()).thenReturn("mock_processor_tag"); when(processor.getTag()).thenReturn("mock_processor_tag");
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(null, new RuntimeException());
return null;
}).when(processor).execute(eqIndexTypeId(emptyMap()), any());
final Processor onFailureProcessor = mock(Processor.class); final Processor onFailureProcessor = mock(Processor.class);
doAnswer(args -> {
IngestDocument ingestDocument = (IngestDocument) args.getArguments()[0];
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(ingestDocument, null);
return null;
}).when(onFailureProcessor).execute(eqIndexTypeId(emptyMap()), any());
final CompoundProcessor compoundProcessor = new CompoundProcessor( final CompoundProcessor compoundProcessor = new CompoundProcessor(
false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
IngestService ingestService = createWithProcessors(Collections.singletonMap( IngestService ingestService = createWithProcessors(Collections.singletonMap(
@ -716,14 +737,13 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState); clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap()));
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class));
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testExecuteFailureWithNestedOnFailure() throws Exception { public void testExecuteFailureWithNestedOnFailure() throws Exception {
@ -747,21 +767,21 @@ public class IngestServiceTests extends ESTestCase {
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()) doThrow(new RuntimeException())
.when(onFailureOnFailureProcessor) .when(onFailureOnFailureProcessor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
doThrow(new RuntimeException()) doThrow(new RuntimeException())
.when(onFailureProcessor) .when(onFailureProcessor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
doThrow(new RuntimeException()) doThrow(new RuntimeException())
.when(processor) .when(processor)
.execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testBulkRequestExecutionWithFailures() throws Exception { public void testBulkRequestExecutionWithFailures() throws Exception {
@ -790,7 +810,12 @@ public class IngestServiceTests extends ESTestCase {
CompoundProcessor processor = mock(CompoundProcessor.class); CompoundProcessor processor = mock(CompoundProcessor.class);
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
Exception error = new RuntimeException(); Exception error = new RuntimeException();
doThrow(error).when(processor).execute(any()); doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(null, error);
return null;
}).when(processor).execute(any(), any());
IngestService ingestService = createWithProcessors(Collections.singletonMap( IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> processor)); "mock", (factories, tag, config) -> processor));
PutPipelineRequest putRequest = new PutPipelineRequest("_id", PutPipelineRequest putRequest = new PutPipelineRequest("_id",
@ -801,18 +826,18 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class); BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher<Exception>() { verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(new ArgumentMatcher<Exception>() {
@Override @Override
public boolean matches(final Object o) { public boolean matches(final Object o) {
return ((Exception)o).getCause().getCause().equals(error); return ((Exception)o).getCause().getCause().equals(error);
} }
})); }));
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testBulkRequestExecution() { public void testBulkRequestExecution() {
@ -835,13 +860,13 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class); BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
verify(requestItemErrorHandler, never()).accept(any(), any()); verify(requestItemErrorHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
} }
public void testStats() throws Exception { public void testStats() throws Exception {
@ -851,8 +876,18 @@ public class IngestServiceTests extends ESTestCase {
when(processor.getTag()).thenReturn("mockTag"); when(processor.getTag()).thenReturn("mockTag");
when(processorFailure.getType()).thenReturn("failure-mock"); when(processorFailure.getType()).thenReturn("failure-mock");
//avoid returning null and dropping the document //avoid returning null and dropping the document
when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); doAnswer(args -> {
when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error")); @SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
return null;
}).when(processor).execute(any(IngestDocument.class), any());
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(null, new RuntimeException("error"));
return null;
}).when(processorFailure).execute(any(IngestDocument.class), any());
Map<String, Processor.Factory> map = new HashMap<>(2); Map<String, Processor.Factory> map = new HashMap<>(2);
map.put("mock", (factories, tag, config) -> processor); map.put("mock", (factories, tag, config) -> processor);
map.put("failure-mock", (factories, tag, config) -> processorFailure); map.put("failure-mock", (factories, tag, config) -> processorFailure);
@ -874,13 +909,13 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState); clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class); @SuppressWarnings("unchecked") final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
final IndexRequest indexRequest = new IndexRequest("_index"); final IndexRequest indexRequest = new IndexRequest("_index");
indexRequest.setPipeline("_id1"); indexRequest.setPipeline("_id1");
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats(); final IngestStats afterFirstRequestStats = ingestService.stats();
assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2));
@ -898,7 +933,7 @@ public class IngestServiceTests extends ESTestCase {
indexRequest.setPipeline("_id2"); indexRequest.setPipeline("_id2");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterSecondRequestStats = ingestService.stats(); final IngestStats afterSecondRequestStats = ingestService.stats();
assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2));
//total //total
@ -917,7 +952,7 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState); clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
indexRequest.setPipeline("_id1"); indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterThirdRequestStats = ingestService.stats(); final IngestStats afterThirdRequestStats = ingestService.stats();
assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2));
//total //total
@ -941,7 +976,7 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState); clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
indexRequest.setPipeline("_id1"); indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterForthRequestStats = ingestService.stats(); final IngestStats afterForthRequestStats = ingestService.stats();
assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2));
//total //total
@ -1007,19 +1042,19 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<Exception> completionHandler = mock(Consumer.class); final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Consumer<IndexRequest> dropHandler = mock(Consumer.class); final IntConsumer dropHandler = mock(IntConsumer.class);
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
verify(failureHandler, never()).accept(any(), any()); verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
verify(dropHandler, times(1)).accept(indexRequest); verify(dropHandler, times(1)).accept(0);
} }
private IngestDocument eqIndexTypeId(final Map<String, Object> source) { private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); return argThat(new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source));
} }
private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map<String, Object> source) { private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map<String, Object> source) {
@ -1085,6 +1120,17 @@ public class IngestServiceTests extends ESTestCase {
}), null); }), null);
} }
private CompoundProcessor mockCompoundProcessor() {
CompoundProcessor processor = mock(CompoundProcessor.class);
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept((IngestDocument) args.getArguments()[0], null);
return null;
}).when(processor).execute(any(), any());
return processor;
}
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> { private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {
private final IngestDocument ingestDocument; private final IngestDocument ingestDocument;

View File

@ -64,7 +64,7 @@ public class PipelineProcessorTests extends ESTestCase {
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("name", pipelineId); config.put("name", pipelineId);
factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument); factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument, (result, e) -> {});
assertEquals(testIngestDocument, invoked.get()); assertEquals(testIngestDocument, invoked.get());
} }
@ -74,12 +74,11 @@ public class PipelineProcessorTests extends ESTestCase {
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("name", "missingPipelineId"); config.put("name", "missingPipelineId");
IllegalStateException e = expectThrows( IllegalStateException[] e = new IllegalStateException[1];
IllegalStateException.class, factory.create(Collections.emptyMap(), null, config)
() -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument) .execute(testIngestDocument, (result, e1) -> e[0] = (IllegalStateException) e1);
);
assertEquals( assertEquals(
"Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage() "Pipeline processor configured for non-existent pipeline [missingPipelineId]", e[0].getMessage()
); );
} }
@ -104,12 +103,11 @@ public class PipelineProcessorTests extends ESTestCase {
when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer); when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer);
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
outerConfig.put("name", innerPipelineId); outerConfig.put("name", innerPipelineId);
ElasticsearchException e = expectThrows( ElasticsearchException[] e = new ElasticsearchException[1];
ElasticsearchException.class, factory.create(Collections.emptyMap(), null, outerConfig)
() -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) .execute(testIngestDocument, (result, e1) -> e[0] = (ElasticsearchException) e1);
);
assertEquals( assertEquals(
"Cycle detected for pipeline: inner", e.getRootCause().getMessage() "Cycle detected for pipeline: inner", e[0].getRootCause().getMessage()
); );
} }
@ -125,8 +123,8 @@ public class PipelineProcessorTests extends ESTestCase {
); );
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig); Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig);
outerProc.execute(testIngestDocument); outerProc.execute(testIngestDocument, (result, e) -> {});
outerProc.execute(testIngestDocument); outerProc.execute(testIngestDocument, (result, e) -> {});
} }
public void testPipelineProcessorWithPipelineChain() throws Exception { public void testPipelineProcessorWithPipelineChain() throws Exception {
@ -177,7 +175,7 @@ public class PipelineProcessorTests extends ESTestCase {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
//start the chain //start the chain
ingestDocument.executePipeline(pipeline1); ingestDocument.executePipeline(pipeline1, (result, e) -> {});
assertNotNull(ingestDocument.getSourceAndMetadata().get(key1)); assertNotNull(ingestDocument.getSourceAndMetadata().get(key1));
//check the stats //check the stats

View File

@ -66,7 +66,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualProcessor() throws Exception { public void testActualProcessor() throws Exception {
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -84,12 +84,9 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
try { Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
fail("processor should throw exception"); assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
} catch (ElasticsearchException e) {
assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage()));
}
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(testProcessor.getInvokedCounter(), equalTo(1));
@ -109,7 +106,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor, failProcessor))),
Arrays.asList(onFailureProcessor)); Arrays.asList(onFailureProcessor));
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);
@ -148,7 +145,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
Collections.emptyList()); Collections.emptyList());
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(testProcessor.getInvokedCounter(), equalTo(1));
@ -178,7 +175,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }));
CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument);
//the step for key 2 is never executed due to conditional and thus not part of the result set //the step for key 2 is never executed due to conditional and thus not part of the result set
@ -221,7 +218,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -287,7 +284,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -355,7 +352,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -407,7 +404,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -457,7 +454,9 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument)); Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
ElasticsearchException exception = (ElasticsearchException) holder[0];
assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class));
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
@ -482,7 +481,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);

View File

@ -5,11 +5,11 @@
*/ */
package org.elasticsearch.xpack.enrich; package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessor;
@ -21,13 +21,14 @@ import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
final class ExactMatchProcessor extends AbstractProcessor { final class ExactMatchProcessor extends AbstractProcessor {
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field"; static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
private final CheckedFunction<SearchRequest, SearchResponse, Exception> searchRunner; private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String policyName; private final String policyName;
private final String enrichKey; private final String enrichKey;
private final boolean ignoreMissing; private final boolean ignoreMissing;
@ -39,11 +40,18 @@ final class ExactMatchProcessor extends AbstractProcessor {
String enrichKey, String enrichKey,
boolean ignoreMissing, boolean ignoreMissing,
List<EnrichSpecification> specifications) { List<EnrichSpecification> specifications) {
this(tag, (req) -> client.search(req).actionGet(), policyName, enrichKey, ignoreMissing, specifications); this(
tag,
createSearchRunner(client),
policyName,
enrichKey,
ignoreMissing,
specifications
);
} }
ExactMatchProcessor(String tag, ExactMatchProcessor(String tag,
CheckedFunction<SearchRequest, SearchResponse, Exception> searchRunner, BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName, String policyName,
String enrichKey, String enrichKey,
boolean ignoreMissing, boolean ignoreMissing,
@ -57,11 +65,13 @@ final class ExactMatchProcessor extends AbstractProcessor {
} }
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
try {
// If a document does not have the enrich key, return the unchanged document // If a document does not have the enrich key, return the unchanged document
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
if (value == null) { if (value == null) {
return ingestDocument; handler.accept(ingestDocument, null);
return;
} }
TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value); TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value);
@ -78,17 +88,22 @@ final class ExactMatchProcessor extends AbstractProcessor {
req.preference(Preference.LOCAL.type()); req.preference(Preference.LOCAL.type());
req.source(searchBuilder); req.source(searchBuilder);
// TODO: Make this Async searchRunner.accept(req, (searchResponse, e) -> {
SearchResponse searchResponse = searchRunner.apply(req); if (e != null) {
handler.accept(null, e);
return;
}
// If the index is empty, return the unchanged document // If the index is empty, return the unchanged document
// If the enrich key does not exist in the index, throw an error // If the enrich key does not exist in the index, throw an error
// If no documents match the key, return the unchanged document // If no documents match the key, return the unchanged document
SearchHit[] searchHits = searchResponse.getHits().getHits(); SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length < 1) { if (searchHits.length < 1) {
return ingestDocument; handler.accept(ingestDocument, null);
return;
} else if (searchHits.length > 1) { } else if (searchHits.length > 1) {
throw new IllegalStateException("more than one doc id matching for [" + enrichKey + "]"); handler.accept(null, new IllegalStateException("more than one doc id matching for [" + enrichKey + "]"));
return;
} }
// If a document is returned, add its fields to the document // If a document is returned, add its fields to the document
@ -98,7 +113,16 @@ final class ExactMatchProcessor extends AbstractProcessor {
Object enrichFieldValue = enrichDocument.get(specification.sourceField); Object enrichFieldValue = enrichDocument.get(specification.sourceField);
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue); ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
} }
return ingestDocument; handler.accept(ingestDocument, null);
});
} catch (Exception e) {
handler.accept(null, e);
}
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
} }
@Override @Override
@ -121,4 +145,31 @@ final class ExactMatchProcessor extends AbstractProcessor {
List<EnrichSpecification> getSpecifications() { List<EnrichSpecification> getSpecifications() {
return specifications; return specifications;
} }
// TODO: This is temporary and will be removed once internal transport action that does an efficient lookup instead of a search.
// This semaphore purpose is to throttle the number of concurrent search requests, if this is not done then search thread pool
// on nodes may get full and search request fail because they get rejected.
// Because this code is going to change, a semaphore seemed like an easy quick fix to address this problem.
private static final Semaphore SEMAPHORE = new Semaphore(100);
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
try {
SEMAPHORE.acquire();
} catch (InterruptedException e) {
Thread.interrupted();
handler.accept(null, e);
return;
}
client.search(req, ActionListener.wrap(
resp -> {
SEMAPHORE.release();
handler.accept(resp, null);
},
e -> {
SEMAPHORE.release();
handler.accept(null, e);
}));
};
}
} }

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections; import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -37,11 +36,13 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.BiConsumer;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class ExactMatchProcessorTests extends ESTestCase { public class ExactMatchProcessorTests extends ESTestCase {
@ -60,7 +61,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co")); Collections.singletonMap("domain", "elastic.co"));
// Run // Run
assertThat(processor.execute(ingestDocument), notNullValue()); IngestDocument[] holder = new IngestDocument[1];
processor.execute(ingestDocument, (result, e) -> holder[0] = result);
assertThat(holder[0], notNullValue());
// Check request // Check request
SearchRequest request = mockSearch.getCapturedRequest(); SearchRequest request = mockSearch.getCapturedRequest();
assertThat(request.indices().length, equalTo(1)); assertThat(request.indices().length, equalTo(1));
@ -88,7 +91,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
Collections.singletonMap("domain", "elastic.com")); Collections.singletonMap("domain", "elastic.com"));
int numProperties = ingestDocument.getSourceAndMetadata().size(); int numProperties = ingestDocument.getSourceAndMetadata().size();
// Run // Run
assertThat(processor.execute(ingestDocument), notNullValue()); IngestDocument[] holder = new IngestDocument[1];
processor.execute(ingestDocument, (result, e) -> holder[0] = result);
assertThat(holder[0], notNullValue());
// Check request // Check request
SearchRequest request = mockSearch.getCapturedRequest(); SearchRequest request = mockSearch.getCapturedRequest();
assertThat(request.indices().length, equalTo(1)); assertThat(request.indices().length, equalTo(1));
@ -115,7 +120,15 @@ public class ExactMatchProcessorTests extends ESTestCase {
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com")); Collections.singletonMap("domain", "elastic.com"));
// Run // Run
IndexNotFoundException expectedException = expectThrows(IndexNotFoundException.class, () -> processor.execute(ingestDocument)); IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
processor.execute(ingestDocument, (result, e) -> {
resultHolder[0] = result;
exceptionHolder[0] = e;
});
assertThat(resultHolder[0], nullValue());
assertThat(exceptionHolder[0], notNullValue());
assertThat(exceptionHolder[0], instanceOf(IndexNotFoundException.class));
// Check request // Check request
SearchRequest request = mockSearch.getCapturedRequest(); SearchRequest request = mockSearch.getCapturedRequest();
assertThat(request.indices().length, equalTo(1)); assertThat(request.indices().length, equalTo(1));
@ -131,7 +144,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
assertThat(termQueryBuilder.fieldName(), equalTo("domain")); assertThat(termQueryBuilder.fieldName(), equalTo("domain"));
assertThat(termQueryBuilder.value(), equalTo("elastic.com")); assertThat(termQueryBuilder.value(), equalTo("elastic.com"));
// Check result // Check result
assertThat(expectedException.getMessage(), equalTo("no such index [" + indexName + "]")); assertThat(exceptionHolder[0].getMessage(), equalTo("no such index [" + indexName + "]"));
} }
public void testIgnoreKeyMissing() throws Exception { public void testIgnoreKeyMissing() throws Exception {
@ -142,7 +155,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
Collections.emptyMap()); Collections.emptyMap());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
assertThat(processor.execute(ingestDocument), notNullValue()); IngestDocument[] holder = new IngestDocument[1];
processor.execute(ingestDocument, (result, e) -> holder[0] = result);
assertThat(holder[0], notNullValue());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
} }
{ {
@ -150,11 +165,19 @@ public class ExactMatchProcessorTests extends ESTestCase {
false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.emptyMap()); Collections.emptyMap());
expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
processor.execute(ingestDocument, (result, e) -> {
resultHolder[0] = result;
exceptionHolder[0] = e;
});
assertThat(resultHolder[0], nullValue());
assertThat(exceptionHolder[0], notNullValue());
assertThat(exceptionHolder[0], instanceOf(IllegalArgumentException.class));
} }
} }
private static final class MockSearchFunction implements CheckedFunction<SearchRequest, SearchResponse, Exception> { private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
private final SearchResponse mockResponse; private final SearchResponse mockResponse;
private final SetOnce<SearchRequest> capturedRequest; private final SetOnce<SearchRequest> capturedRequest;
private final Exception exception; private final Exception exception;
@ -172,12 +195,12 @@ public class ExactMatchProcessorTests extends ESTestCase {
} }
@Override @Override
public SearchResponse apply(SearchRequest request) throws Exception { public void accept(SearchRequest request, BiConsumer<SearchResponse, Exception> handler) {
capturedRequest.set(request); capturedRequest.set(request);
if (exception != null) { if (exception != null) {
throw exception; handler.accept(null, exception);
} else { } else {
return mockResponse; handler.accept(mockResponse, null);
} }
} }