diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 3f49682183e..aa2e0a893a6 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -67,17 +67,21 @@ class SimulateExecutionService { public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { final AtomicInteger counter = new AtomicInteger(); - final List responses = new CopyOnWriteArrayList<>(); + final List responses = + new CopyOnWriteArrayList<>(new SimulateDocumentBaseResult[request.getDocuments().size()]); + int iter = 0; for (IngestDocument ingestDocument : request.getDocuments()) { + final int index = iter; executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> { if (response != null) { - responses.add(response); + responses.set(index, response); } if (counter.incrementAndGet() == request.getDocuments().size()) { listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); } }); + iter++; } })); } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 1d1268039d2..43eaad41728 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.action.ingest; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.DropProcessor; import org.elasticsearch.ingest.IngestDocument; @@ -29,18 +32,24 @@ import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -331,4 +340,56 @@ public class SimulateExecutionServiceTests extends ESTestCase { assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue()); } + public void testAsyncSimulation() throws Exception { + int numDocs = randomIntBetween(1, 64); + List documents = new ArrayList<>(numDocs); + for (int id = 0; id < numDocs; id++) { + documents.add(new IngestDocument("_index", "_type", Integer.toString(id), null, 0L, VersionType.INTERNAL, new HashMap<>())); + } + Processor processor1 = new AbstractProcessor(null) { + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + ingestDocument.setFieldValue("processed", true); + handler.accept(ingestDocument, null); + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public String getType() { + return "none-of-your-business"; + } + }; + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1)); + SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false); + + AtomicReference responseHolder = new AtomicReference<>(); + AtomicReference errorHolder = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + executionService.execute(request, ActionListener.wrap(response -> { + responseHolder.set(response); + latch.countDown(); + }, e -> { + errorHolder.set(e); + latch.countDown(); + })); + latch.await(1, TimeUnit.MINUTES); + assertThat(errorHolder.get(), nullValue()); + SimulatePipelineResponse response = responseHolder.get(); + assertThat(response, notNullValue()); + assertThat(response.getResults().size(), equalTo(numDocs)); + + for (int id = 0; id < numDocs; id++) { + SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) response.getResults().get(id); + assertThat(result.getIngestDocument().getMetadata().get(IngestDocument.MetaData.ID), equalTo(Integer.toString(id))); + assertThat(result.getIngestDocument().getSourceAndMetadata().get("processed"), is(true)); + } + } + } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index 7391719c901..06b3b30390c 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -63,7 +63,10 @@ public class EnrichCoordinatorProxyAction extends ActionType { @Override protected void doExecute(Task task, SearchRequest request, ActionListener listener) { - assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); + // Write tp is expected when executing enrich processor from index / bulk api + // Management tp is expected when executing enrich processor from ingest simulate api + assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE) + || Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT); coordinator.schedule(request, listener); } }