Allow dropping documents with auto-generated ID (#46773)

When auto-generated IDs are combined with the ingest drop processor (which Filebeat also
appears to use) and a coordinating node that does not have the ingest role, bulk requests can
fail with a NullPointerException.

The issue is that markCurrentItemAsDropped() creates an UpdateResponse with no id when the
request uses auto-generated IDs. The REST/XContent serialization is lenient about this (we
simply send "id" : null), but the internal transport format (used for communication between
nodes) assumes that this field is non-null, so the response cannot be serialized between nodes.
Bulk requests with ingest functionality are processed on the coordinating node if that node has
the ingest role, and are forwarded to a different node only otherwise. Reproducing the bug
therefore requires two nodes, with the coordinating node not having the ingest role.
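To make the fix concrete before the diffs: the change below boils down to giving dropped items a well-known placeholder id whenever the request relied on auto-generated ids, so the response always has a non-null id to serialize. A minimal standalone sketch of that idea (class and method names here are illustrative, not Elasticsearch APIs):

// Sketch only: the essence of the fix, outside of Elasticsearch. A dropped item whose
// id was never assigned gets a fixed placeholder so the response stays serializable.
public class DroppedItemIdSketch {
    static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";

    static String idForDroppedItem(String requestId) {
        // requestId is null when the client relies on auto-generated ids
        return requestId == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : requestId;
    }

    public static void main(String[] args) {
        System.out.println(idForDroppedItem(null));     // prints "auto-generated"
        System.out.println(idForDroppedItem("my-doc")); // prints "my-doc"
    }
}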

Closes #46678
Author: Yannick Welsch
Date: 2019-09-19 16:45:45 +02:00
Parent: 7a3054c5dc
Commit: 9638ca20b0
17 changed files with 92 additions and 37 deletions


@ -878,7 +878,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
new IndexResponse(
shardId,
index.type(),
- index.id(),
+ index.id() == null ? "dummy_id" : index.id(),
randomInt(20),
randomIntBetween(1, 16),
randomIntBetween(0, Integer.MAX_VALUE),


@ -41,6 +41,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Locale;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@ -122,13 +123,13 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
protected final Result result;
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
- this.shardId = shardId;
- this.type = type;
- this.id = id;
+ this.shardId = Objects.requireNonNull(shardId);
+ this.type = Objects.requireNonNull(type);
+ this.id = Objects.requireNonNull(id);
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
- this.result = result;
+ this.result = Objects.requireNonNull(result);
}
// needed for deserialization
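A note on impact: with these null checks, write responses can no longer be constructed with a null shard id, type, or id, which is why many of the test changes further down replace null arguments with a concrete ShardId. A hedged sketch of the new contract, assuming the seven-argument IndexResponse constructor used throughout these diffs:

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.shard.ShardId;

public class DocWriteResponseContractSketch {
    public static void main(String[] args) {
        ShardId shardId = new ShardId("test", "test", 0);

        // Fine: every required argument is non-null.
        IndexResponse ok = new IndexResponse(shardId, "_doc", "some-id", 0, 1, 1, true);
        System.out.println(ok.getId());

        try {
            // After this change a null id fails fast here, instead of blowing up
            // later during transport serialization between nodes.
            new IndexResponse(shardId, "_doc", null, 0, 1, 1, true);
        } catch (NullPointerException expected) {
            System.out.println("null id rejected");
        }
    }
}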


@ -101,6 +101,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IngestActionForwarder ingestForwarder;
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
@Inject
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
@ -676,11 +677,12 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
void markCurrentItemAsDropped() {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
failedSlots.set(currentSlot);
+ final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id();
itemResponses.add(
new BulkItemResponse(currentSlot, indexRequest.opType(),
new UpdateResponse(
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
- indexRequest.type(), indexRequest.id(), SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+ indexRequest.type(), id, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
indexRequest.version(), DocWriteResponse.Result.NOOP
)
)
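For consumers of the bulk API, a dropped document now surfaces as a successful item whose result is NOOP; if the request relied on auto-generated ids, the reported id is the "auto-generated" placeholder. A hedged sketch of how a client might detect such items (NOOP can also come from genuine no-op updates, so this check is illustrative rather than exact):

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;

public final class DroppedBulkItemsSketch {
    private DroppedBulkItemsSketch() {}

    static void printDroppedItems(BulkResponse bulkResponse) {
        for (BulkItemResponse item : bulkResponse.getItems()) {
            if (item.isFailed() == false) {
                DocWriteResponse response = item.getResponse();
                if (response.getResult() == DocWriteResponse.Result.NOOP) {
                    // Dropped items keep their slot in the response; auto-id requests report "auto-generated".
                    System.out.println("dropped: slot=" + item.getItemId() + ", id=" + response.getId());
                }
            }
        }
    }
}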


@ -26,6 +26,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.junit.After;
@ -226,7 +227,8 @@ public class RetryTests extends ESTestCase {
}
private BulkItemResponse successfulResponse() {
- return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse(null, null, null, 0, 0, 0, false));
+ return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse(
+ new ShardId("test", "test", 0), "_doc", "test", 0, 0, 0, false));
}
private BulkItemResponse failedResponse() {


@ -240,6 +240,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
IndexShard shard = mock(IndexShard.class);
when(shard.shardId()).thenReturn(shardId);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(mappingUpdate);
@ -583,6 +584,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(indexResult);
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
@ -629,6 +631,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
IndexShard shard = mock(IndexShard.class);
when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any(), anyLong(), anyLong())).thenReturn(deleteResult);
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
@ -783,6 +786,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
}
});
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
@ -814,7 +818,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
if (randomBoolean()) {
// add a response to the request and thereby check that it is ignored for the primary.
- primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(null, "_doc",
+ primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(shardId, "_doc",
"ignore-primary-response-on-primary", 42, 42, 42, false)));
}
}


@ -101,8 +101,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {});
- TestProcessor processor2 = new TestProcessor("processor_1", "mock",
- ingestDocument -> { throw new RuntimeException("processor failed"); });
+ TestProcessor processor2 = new TestProcessor("processor_1", "mock", new RuntimeException("processor failed"));
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
@ -126,8 +125,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteVerboseItemWithOnFailure() throws Exception {
- TestProcessor processor1 = new TestProcessor("processor_0", "mock",
- ingestDocument -> { throw new RuntimeException("processor failed"); });
+ TestProcessor processor1 = new TestProcessor("processor_0", "mock", new RuntimeException("processor failed"));
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version,
@ -165,7 +163,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");
- TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; });
+ TestProcessor testProcessor = new TestProcessor("processor_0", "mock", exception);
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);


@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo;
@ -74,7 +75,7 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testSingleProcessorWithException() throws Exception {
- TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
+ TestProcessor processor = new TestProcessor(new RuntimeException("error"));
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
@ -93,7 +94,7 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testIgnoreFailure() throws Exception {
- TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
+ TestProcessor processor1 = new TestProcessor(new RuntimeException("error"));
TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
@ -108,7 +109,7 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor1 = new TestProcessor("id", "first", new RuntimeException("error"));
TestProcessor processor2 = new TestProcessor(ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(3));
@ -130,7 +131,7 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testSingleProcessorWithOnFailureDropProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor1 = new TestProcessor("id", "first", new RuntimeException("error"));
Processor processor2 = new Processor() {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
@ -159,8 +160,8 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testSingleProcessorWithNestedFailures() throws Exception {
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {
TestProcessor processor = new TestProcessor("id", "first", new RuntimeException("error"));
TestProcessor processorToFail = new TestProcessor("id2", "second", (Consumer<IngestDocument>) ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
@ -189,7 +190,7 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -212,9 +213,9 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testCompoundProcessorExceptionFail() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
new TestProcessor("tag_fail", "fail", new RuntimeException("custom error message"));
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -238,9 +239,9 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
new TestProcessor("tag_fail", "fail", new RuntimeException("custom error message"));
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -264,8 +265,8 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testBreakOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");});
TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");});
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error1"));
TestProcessor secondProcessor = new TestProcessor("id2", "second", new RuntimeException("error2"));
TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);


@ -272,4 +272,25 @@ public class IngestClientIT extends ESIntegTestCase {
GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id2").get();
assertFalse(response.isFound());
}
public void testWithDedicatedMaster() throws Exception {
String masterOnlyNode = internalCluster().startMasterOnlyNode();
BytesReference source = BytesReference.bytes(jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject());
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
BulkItemResponse item = client(masterOnlyNode).prepareBulk().add(
client().prepareIndex("test", "type").setSource("field", "value2", "drop", true).setPipeline("_id")).get()
.getItems()[0];
assertFalse(item.isFailed());
assertEquals("auto-generated", item.getResponse().getId());
}
}


@ -101,7 +101,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualCompoundProcessorWithOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("fail");
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
TestProcessor failProcessor = new TestProcessor("fail", "test", exception);
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
CompoundProcessor actualProcessor = new CompoundProcessor(false,
Arrays.asList(new CompoundProcessor(false,


@ -37,6 +37,10 @@ public class IngestTestPlugin extends Plugin implements IngestPlugin {
if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) {
throw new IllegalArgumentException("test processor failed");
}
if (doc.hasField("drop") && doc.getFieldValue("drop", Boolean.class)) {
return null;
}
return doc;
}));
}
}
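The added branch relies on the ingest framework's convention that a processor returning null from execute drops the document, which is the same mechanism the real drop processor builds on. A hedged standalone sketch of a processor using that contract (class name and wiring are illustrative, assuming the 7.x-era AbstractProcessor(tag) constructor):

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;

public class DropFlaggedDocumentsProcessor extends AbstractProcessor {
    public static final String TYPE = "drop_flagged";

    public DropFlaggedDocumentsProcessor(String tag) {
        super(tag);
    }

    @Override
    public IngestDocument execute(IngestDocument doc) {
        if (doc.hasField("drop") && doc.getFieldValue("drop", Boolean.class)) {
            return null; // returning null signals that the document should be dropped
        }
        return doc;
    }

    @Override
    public String getType() {
        return TYPE;
    }
}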


@ -22,6 +22,7 @@ package org.elasticsearch.ingest;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Processor used for testing, keeps track of how many times it is invoked and
@ -31,15 +32,30 @@ public class TestProcessor implements Processor {
private final String type;
private final String tag;
- private final Consumer<IngestDocument> ingestDocumentConsumer;
+ private final Function<IngestDocument, IngestDocument> ingestDocumentMapper;
private final AtomicInteger invokedCounter = new AtomicInteger();
public TestProcessor(Consumer<IngestDocument> ingestDocumentConsumer) {
this(null, "test-processor", ingestDocumentConsumer);
}
+ public TestProcessor(RuntimeException e) {
+ this(null, "test-processor", e);
+ }
+ public TestProcessor(String tag, String type, RuntimeException e) {
+ this(tag, type, (Consumer<IngestDocument>) i -> { throw e; });
+ }
public TestProcessor(String tag, String type, Consumer<IngestDocument> ingestDocumentConsumer) {
- this.ingestDocumentConsumer = ingestDocumentConsumer;
+ this(tag, type, id -> {
+ ingestDocumentConsumer.accept(id);
+ return id;
+ });
}
+ public TestProcessor(String tag, String type, Function<IngestDocument, IngestDocument> ingestDocumentMapper) {
+ this.ingestDocumentMapper = ingestDocumentMapper;
this.type = type;
this.tag = tag;
}
@ -47,8 +63,7 @@ public class TestProcessor implements Processor {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
invokedCounter.incrementAndGet();
- ingestDocumentConsumer.accept(ingestDocument);
- return ingestDocument;
+ return ingestDocumentMapper.apply(ingestDocument);
}
@Override
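After this refactor, tests can construct a TestProcessor from a side-effecting Consumer (as before), from a mapping Function (which may return null to simulate a drop), or simply from a RuntimeException to throw, which is what removes the repetitive throwing lambdas in the test diffs above. A hedged usage sketch based on the constructors shown here:

import java.util.function.Function;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TestProcessor;

public class TestProcessorUsageSketch {
    static void examples() {
        // Side effect only: the document passes through unchanged.
        TestProcessor touches = new TestProcessor(doc -> doc.setFieldValue("touched", true));

        // New shorthand: always throw, replacing lambdas like
        // ingestDocument -> { throw new RuntimeException("error"); }
        TestProcessor fails = new TestProcessor("tag", "type", new RuntimeException("error"));

        // New mapping form: returning null simulates a processor that drops the document.
        TestProcessor drops = new TestProcessor("tag", "type",
            (Function<IngestDocument, IngestDocument>) doc -> null);
    }
}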


@ -2203,6 +2203,7 @@ public final class InternalTestCluster extends TestCluster {
.put(settings)
.put(Node.NODE_MASTER_SETTING.getKey(), true)
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_INGEST_SETTING.getKey(), false)
.build();
return startNode(settings1);
}
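This one-line change is what makes the scenario reproducible in the internal test cluster: a dedicated master-only node now also has the ingest role disabled, so a bulk request sent through it must forward ingest processing to another node (the setup exercised by the new IngestClientIT test above). A hedged sketch of the resulting role settings, using the string keys these Node settings are assumed to map to in 7.x:

import org.elasticsearch.common.settings.Settings;

public class MasterOnlyNodeSettingsSketch {
    // Assumed setting keys: "node.master", "node.data" and "node.ingest" are the 7.x keys
    // behind the Node.NODE_*_SETTING constants referenced in the diff above.
    static Settings masterOnlyRoles() {
        return Settings.builder()
            .put("node.master", true)
            .put("node.data", false)
            .put("node.ingest", false) // new: no ingest work on the dedicated master
            .build();
    }
}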


@ -16,6 +16,7 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
@ -249,7 +250,7 @@ public class JobResultsPersisterTests extends ESTestCase {
// Take the listener passed to client::index as 2nd argument
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
// Handle the response on the listener
- listener.onResponse(new IndexResponse(null, null, null, 0, 0, 0, false));
+ listener.onResponse(new IndexResponse(new ShardId("test", "test", 0), "_doc", "test", 0, 0, 0, false));
return null;
})
.when(client).index(any(), any(ActionListener.class));


@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ClusterServiceUtils;
@ -143,7 +144,7 @@ public class TransportOpenIdConnectLogoutActionTests extends OpenIdConnectTestCa
ActionListener<IndexResponse> listener = (ActionListener<IndexResponse>) invocationOnMock.getArguments()[2];
indexRequests.add(indexRequest);
final IndexResponse response = new IndexResponse(
- indexRequest.shardId(), indexRequest.type(), indexRequest.id(), 1, 1, 1, true);
+ new ShardId("test", "test", 0), indexRequest.type(), indexRequest.id(), 1, 1, 1, true);
listener.onResponse(response);
return Void.TYPE;
}).when(client).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));


@ -46,6 +46,7 @@ import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@ -149,7 +150,7 @@ public class TransportSamlInvalidateSessionActionTests extends SamlTestCase {
IndexRequest indexRequest = (IndexRequest) request;
indexRequests.add(indexRequest);
final IndexResponse response = new IndexResponse(
- indexRequest.shardId(), indexRequest.type(), indexRequest.id(), 1, 1, 1, true);
+ new ShardId("test", "test", 0), indexRequest.type(), indexRequest.id(), 1, 1, 1, true);
listener.onResponse((Response) response);
} else if (BulkAction.NAME.equals(action.name())) {
assertThat(request, instanceOf(BulkRequest.class));


@ -38,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ClusterServiceUtils;
@ -170,7 +171,7 @@ public class TransportSamlLogoutActionTests extends SamlTestCase {
ActionListener<IndexResponse> listener = (ActionListener<IndexResponse>) invocationOnMock.getArguments()[1];
indexRequests.add(indexRequest);
final IndexResponse response = new IndexResponse(
- indexRequest.shardId(), indexRequest.type(), indexRequest.id(), 1, 1, 1, true);
+ new ShardId("test", "test", 0), indexRequest.type(), indexRequest.id(), 1, 1, 1, true);
listener.onResponse(response);
return Void.TYPE;
}).when(client).index(any(IndexRequest.class), any(ActionListener.class));
@ -179,7 +180,7 @@ public class TransportSamlLogoutActionTests extends SamlTestCase {
ActionListener<IndexResponse> listener = (ActionListener<IndexResponse>) invocationOnMock.getArguments()[2];
indexRequests.add(indexRequest);
final IndexResponse response = new IndexResponse(
- indexRequest.shardId(), indexRequest.type(), indexRequest.id(), 1, 1, 1, true);
+ new ShardId("test", "test", 0), indexRequest.type(), indexRequest.id(), 1, 1, 1, true);
listener.onResponse(response);
return Void.TYPE;
}).when(client).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));


@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@ -1098,7 +1099,8 @@ public class ExecutionServiceTests extends ESTestCase {
}
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
- future.onResponse(new UpdateResponse(null, null, null, null, 0, 0, 0, null));
+ future.onResponse(new UpdateResponse(null, new ShardId("test", "test", 0), "_doc", "test", 0, 0, 0,
+ DocWriteResponse.Result.CREATED));
return future;
}).when(client).update(any());