#25601 Add pipeline support for REST API bulk upsert (#27075)

This commit is contained in:
Armin Braun 2017-10-25 19:03:25 +02:00 committed by GitHub
parent 6722b9c4a2
commit 6533b165d6
4 changed files with 135 additions and 11 deletions

View File

@ -429,6 +429,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
if (upsertRequest != null) { if (upsertRequest != null) {
upsertRequest.version(version); upsertRequest.version(version);
upsertRequest.versionType(versionType); upsertRequest.versionType(versionType);
upsertRequest.setPipeline(defaultPipeline);
} }
IndexRequest doc = updateRequest.doc(); IndexRequest doc = updateRequest.doc();
if (doc != null) { if (doc != null) {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -81,17 +82,21 @@ public class PipelineExecutionService implements ClusterStateApplier {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
for (DocWriteRequest actionRequest : actionRequests) { for (DocWriteRequest actionRequest : actionRequests) {
if ((actionRequest instanceof IndexRequest)) { IndexRequest indexRequest = null;
IndexRequest indexRequest = (IndexRequest) actionRequest; if (actionRequest instanceof IndexRequest) {
if (Strings.hasText(indexRequest.getPipeline())) { indexRequest = (IndexRequest) actionRequest;
try { } else if (actionRequest instanceof UpdateRequest) {
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); UpdateRequest updateRequest = (UpdateRequest) actionRequest;
//this shouldn't be needed here but we do it for consistency with index api indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
// which requires it to prevent double execution }
indexRequest.setPipeline(null); if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) {
} catch (Exception e) { try {
itemFailureHandler.accept(indexRequest, e); innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
} //this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
indexRequest.setPipeline(null);
} catch (Exception e) {
itemFailureHandler.accept(indexRequest, e);
} }
} }
} }

View File

@ -36,11 +36,16 @@ import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.action.support.replication.TransportReplicationActionTests;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import java.util.Arrays; import java.util.Arrays;
@ -169,6 +174,43 @@ public class IngestClientIT extends ESIntegTestCase {
} }
} }
public void testBulkWithUpsert() throws Exception {
createIndex("index");
BytesReference source = jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject().bytes();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest = new IndexRequest("index", "type", "1").setPipeline("_id");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "val1");
bulkRequest.add(indexRequest);
UpdateRequest updateRequest = new UpdateRequest("index", "type", "2");
updateRequest.doc("{}", Requests.INDEX_CONTENT_TYPE);
updateRequest.upsert("{\"field1\":\"upserted_val\"}", XContentType.JSON).upsertRequest().setPipeline("_id");
bulkRequest.add(updateRequest);
BulkResponse response = client().bulk(bulkRequest).actionGet();
assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));
Map<String, Object> inserted = client().prepareGet("index", "type", "1")
.get().getSourceAsMap();
assertThat(inserted.get("field1"), equalTo("val1"));
assertThat(inserted.get("processed"), equalTo(true));
Map<String, Object> upserted = client().prepareGet("index", "type", "2")
.get().getSourceAsMap();
assertThat(upserted.get("field1"), equalTo("upserted_val"));
assertThat(upserted.get("processed"), equalTo(true));
}
public void test() throws Exception { public void test() throws Exception {
BytesReference source = jsonBuilder().startObject() BytesReference source = jsonBuilder().startObject()
.field("description", "my_pipeline") .field("description", "my_pipeline")

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.document;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.hamcrest.CustomMatcher;
import org.mockito.Mockito;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link RestBulkAction}.
*/
public class RestBulkActionTests extends ESTestCase {
public void testBulkPipelineUpsert() throws Exception {
final NodeClient mockClient = mock(NodeClient.class);
final Map<String, String> params = new HashMap<>();
params.put("pipeline", "timestamps");
new RestBulkAction(settings(Version.CURRENT).build(), mock(RestController.class))
.handleRequest(
new FakeRestRequest.Builder(
xContentRegistry()).withPath("my_index/my_type/_bulk").withParams(params)
.withContent(
new BytesArray(
"{\"index\":{\"_id\":\"1\"}}\n" +
"{\"field1\":\"val1\"}\n" +
"{\"update\":{\"_id\":\"2\"}}\n" +
"{\"script\":{\"source\":\"ctx._source.counter++;\"},\"upsert\":{\"field1\":\"upserted_val\"}}\n"
),
XContentType.JSON
).withMethod(RestRequest.Method.POST).build(),
mock(RestChannel.class), mockClient
);
Mockito.verify(mockClient)
.bulk(argThat(new CustomMatcher<BulkRequest>("Pipeline in upsert request") {
@Override
public boolean matches(final Object item) {
BulkRequest request = (BulkRequest) item;
UpdateRequest update = (UpdateRequest) request.requests().get(1);
return "timestamps".equals(update.upsertRequest().getPipeline());
}
}), any());
}
}