Backport: Fix ingest simulate response document order if processor executes async (#50269)

Backport #50244 to 7.x branch.

If a processor executes asynchronously and the ingest simulate api simulates with
multiple documents then the order of the documents in the response may not match
the order of the documents in the request.

Alexander Reelsen discovered this issue with the enrich processor with the following reproduction:

```
PUT cities/_doc/munich
{"zip":"80331","city":"Munich"}

PUT cities/_doc/berlin
{"zip":"10965","city":"Berlin"}

PUT /_enrich/policy/zip-policy
{
  "match": {
    "indices": "cities",
    "match_field": "zip",
    "enrich_fields": [ "city" ]
  }
}

POST /_enrich/policy/zip-policy/_execute

GET _cat/indices/.enrich-*

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
  "processors" : [
    {
      "enrich" : {
        "policy_name": "zip-policy",
        "field" : "zip",
        "target_field": "city",
        "max_matches": "1"
      }
    }
  ]
  },
  "docs": [
    { "_id": "first", "_source" : { "zip" : "80331" } } ,
    { "_id": "second", "_source" : { "zip" : "50667" } }
  ]
}
```

* fixed test compile error
This commit is contained in:
Martijn van Groningen 2019-12-17 12:27:07 +01:00 committed by GitHub
parent 4f24739fbe
commit 2079f1cbeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 4 deletions

View File

@ -67,17 +67,21 @@ class SimulateExecutionService {
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) { public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>(); final List<SimulateDocumentResult> responses =
new CopyOnWriteArrayList<>(new SimulateDocumentBaseResult[request.getDocuments().size()]);
int iter = 0;
for (IngestDocument ingestDocument : request.getDocuments()) { for (IngestDocument ingestDocument : request.getDocuments()) {
final int index = iter;
executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> { executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> {
if (response != null) { if (response != null) {
responses.add(response); responses.set(index, response);
} }
if (counter.incrementAndGet() == request.getDocuments().size()) { if (counter.incrementAndGet() == request.getDocuments().size()) {
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(),
request.isVerbose(), responses)); request.isVerbose(), responses));
} }
}); });
iter++;
} }
})); }));
} }

View File

@ -19,6 +19,9 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.DropProcessor; import org.elasticsearch.ingest.DropProcessor;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
@ -29,18 +32,24 @@ import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ -331,4 +340,56 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue()); assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
} }
public void testAsyncSimulation() throws Exception {
int numDocs = randomIntBetween(1, 64);
List<IngestDocument> documents = new ArrayList<>(numDocs);
for (int id = 0; id < numDocs; id++) {
documents.add(new IngestDocument("_index", "_type", Integer.toString(id), null, 0L, VersionType.INTERNAL, new HashMap<>()));
}
Processor processor1 = new AbstractProcessor(null) {
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
ingestDocument.setFieldValue("processed", true);
handler.accept(ingestDocument, null);
});
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public String getType() {
return "none-of-your-business";
}
};
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1));
SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false);
AtomicReference<SimulatePipelineResponse> responseHolder = new AtomicReference<>();
AtomicReference<Exception> errorHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
executionService.execute(request, ActionListener.wrap(response -> {
responseHolder.set(response);
latch.countDown();
}, e -> {
errorHolder.set(e);
latch.countDown();
}));
latch.await(1, TimeUnit.MINUTES);
assertThat(errorHolder.get(), nullValue());
SimulatePipelineResponse response = responseHolder.get();
assertThat(response, notNullValue());
assertThat(response.getResults().size(), equalTo(numDocs));
for (int id = 0; id < numDocs; id++) {
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) response.getResults().get(id);
assertThat(result.getIngestDocument().getMetadata().get(IngestDocument.MetaData.ID), equalTo(Integer.toString(id)));
assertThat(result.getIngestDocument().getSourceAndMetadata().get("processed"), is(true));
}
}
} }

View File

@ -63,7 +63,10 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
@Override @Override
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) { protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); // Write tp is expected when executing enrich processor from index / bulk api
// Management tp is expected when executing enrich processor from ingest simulate api
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
|| Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
coordinator.schedule(request, listener); coordinator.schedule(request, listener);
} }
} }