diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 1b0d89222dd..fbbe6c1bf8a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -429,6 +429,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques if (upsertRequest != null) { upsertRequest.version(version); upsertRequest.versionType(versionType); + upsertRequest.setPipeline(defaultPipeline); } IndexRequest doc = updateRequest.doc(); if (doc != null) { diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index c1b46e49567..cec622f4a25 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.common.Strings; @@ -81,17 +82,21 @@ public class PipelineExecutionService implements ClusterStateApplier { @Override protected void doRun() throws Exception { for (DocWriteRequest actionRequest : actionRequests) { - if ((actionRequest instanceof IndexRequest)) { - IndexRequest indexRequest = (IndexRequest) actionRequest; - if (Strings.hasText(indexRequest.getPipeline())) { - try { - innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); - //this shouldn't be needed here but we do it for consistency with index api - // which requires it to prevent double execution - indexRequest.setPipeline(null); - } catch (Exception e) { - itemFailureHandler.accept(indexRequest, e); - } + IndexRequest indexRequest = null; + if (actionRequest instanceof IndexRequest) { + indexRequest = (IndexRequest) actionRequest; + } else if (actionRequest instanceof UpdateRequest) { + UpdateRequest updateRequest = (UpdateRequest) actionRequest; + indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); + } + if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) { + try { + innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); + //this shouldn't be needed here but we do it for consistency with index api + // which requires it to prevent double execution + indexRequest.setPipeline(null); + } catch (Exception e) { + itemFailureHandler.accept(indexRequest, e); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 2b59f0d421c..654927b19f2 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -36,11 +36,16 @@ import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.ingest.WritePipelineResponse; +import org.elasticsearch.action.support.replication.TransportReplicationActionTests; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESIntegTestCase; import java.util.Arrays; @@ -169,6 +174,43 @@ public class IngestClientIT extends ESIntegTestCase { } } + public void testBulkWithUpsert() throws Exception { + createIndex("index"); + + BytesReference source = jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + .endObject().bytes(); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest = new IndexRequest("index", "type", "1").setPipeline("_id"); + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "val1"); + bulkRequest.add(indexRequest); + UpdateRequest updateRequest = new UpdateRequest("index", "type", "2"); + updateRequest.doc("{}", Requests.INDEX_CONTENT_TYPE); + updateRequest.upsert("{\"field1\":\"upserted_val\"}", XContentType.JSON).upsertRequest().setPipeline("_id"); + bulkRequest.add(updateRequest); + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + + assertThat(response.getItems().length, equalTo(bulkRequest.requests().size())); + Map inserted = client().prepareGet("index", "type", "1") + .get().getSourceAsMap(); + assertThat(inserted.get("field1"), equalTo("val1")); + assertThat(inserted.get("processed"), equalTo(true)); + Map upserted = client().prepareGet("index", "type", "2") + .get().getSourceAsMap(); + assertThat(upserted.get("field1"), equalTo("upserted_val")); + assertThat(upserted.get("processed"), equalTo(true)); + } + public void test() throws Exception { BytesReference source = jsonBuilder().startObject() .field("description", "my_pipeline") diff --git a/core/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java new file mode 100644 index 00000000000..5011946914a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.document; + +import java.util.HashMap; +import java.util.Map; +import org.elasticsearch.Version; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.hamcrest.CustomMatcher; +import org.mockito.Mockito; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link RestBulkAction}. + */ +public class RestBulkActionTests extends ESTestCase { + + public void testBulkPipelineUpsert() throws Exception { + final NodeClient mockClient = mock(NodeClient.class); + final Map params = new HashMap<>(); + params.put("pipeline", "timestamps"); + new RestBulkAction(settings(Version.CURRENT).build(), mock(RestController.class)) + .handleRequest( + new FakeRestRequest.Builder( + xContentRegistry()).withPath("my_index/my_type/_bulk").withParams(params) + .withContent( + new BytesArray( + "{\"index\":{\"_id\":\"1\"}}\n" + + "{\"field1\":\"val1\"}\n" + + "{\"update\":{\"_id\":\"2\"}}\n" + + "{\"script\":{\"source\":\"ctx._source.counter++;\"},\"upsert\":{\"field1\":\"upserted_val\"}}\n" + ), + XContentType.JSON + ).withMethod(RestRequest.Method.POST).build(), + mock(RestChannel.class), mockClient + ); + Mockito.verify(mockClient) + .bulk(argThat(new CustomMatcher("Pipeline in upsert request") { + @Override + public boolean matches(final Object item) { + BulkRequest request = (BulkRequest) item; + UpdateRequest update = (UpdateRequest) request.requests().get(1); + return "timestamps".equals(update.upsertRequest().getPipeline()); + } + }), any()); + } +}