From 5a97423b7ae69afede4060045a62d375aa8f0c7c Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Fri, 25 May 2018 01:02:26 +0200 Subject: [PATCH] REST high-level client: add put ingest pipeline API (#30793) REST high-level client: add put ingest pipeline API Adds the put ingest pipeline API to the high level rest client. --- .../elasticsearch/client/ClusterClient.java | 24 +++++ .../client/RequestConverters.java | 16 +++ .../elasticsearch/client/ClusterClientIT.java | 42 ++++++++ .../client/RequestConvertersTests.java | 23 +++++ .../ClusterClientDocumentationIT.java | 97 +++++++++++++++++-- .../high-level/cluster/put_pipeline.asciidoc | 83 ++++++++++++++++ .../high-level/supported-apis.asciidoc | 2 + .../action/ingest/PutPipelineRequest.java | 14 ++- .../action/ingest/PutPipelineResponse.java | 62 ++++++++++++ .../org/elasticsearch/ingest/Pipeline.java | 8 +- .../ingest/PutPipelineRequestTests.java | 25 +++++ .../ingest/PutPipelineResponseTests.java | 53 ++++++++++ 12 files changed, 435 insertions(+), 14 deletions(-) create mode 100644 docs/java-rest/high-level/cluster/put_pipeline.asciidoc create mode 100644 server/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java create mode 100644 server/src/test/java/org/elasticsearch/action/ingest/PutPipelineResponseTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java index e78e4686d69..0f9e9e58226 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java @@ -25,6 +25,8 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineResponse; import java.io.IOException; @@ -87,4 +89,26 @@ public final class ClusterClient { restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent, listener, emptySet(), headers); } + + /** + * Add a pipeline or update an existing pipeline in the cluster + *

+ * See + * Put Pipeline API on elastic.co + */ + public PutPipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::putPipeline, + PutPipelineResponse::fromXContent, emptySet(), headers); + } + + /** + * Asynchronously add a pipeline or update an existing pipeline in the cluster + *

+ * See + * Put Pipeline API on elastic.co + */ + public void putPipelineAsync(PutPipelineRequest request, ActionListener listener, Header... headers) { + restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline, + PutPipelineResponse::fromXContent, listener, emptySet(), headers); + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index c9526346e5b..6126d59b16a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -58,6 +58,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; @@ -609,6 +610,21 @@ final class RequestConverters { return request; } + static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException { + String endpoint = new EndpointBuilder() + .addPathPartAsIs("_ingest/pipeline") + .addPathPart(putPipelineRequest.getId()) + .build(); + Request request = new Request(HttpPut.METHOD_NAME, endpoint); + + Params parameters = new Params(request); + parameters.withTimeout(putPipelineRequest.timeout()); + parameters.withMasterTimeout(putPipelineRequest.masterNodeTimeout()); + + request.setEntity(createEntity(putPipelineRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request listTasks(ListTasksRequest listTaskRequest) { if (listTaskRequest.getTaskId() != null && listTaskRequest.getTaskId().isSet()) { throw new IllegalArgumentException("TaskId cannot be used for list tasks request"); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java index fa3086442f5..d41117ceb6d 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java @@ -25,12 +25,17 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineResponse; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskInfo; @@ -136,4 +141,41 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase { } assertTrue("List tasks were not found", listTasksFound); } + + public void testPutPipeline() throws IOException { + String id = "some_pipeline_id"; + XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); + pipelineBuilder.startObject(); + { + pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors"); + pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY); + { + pipelineBuilder.startObject().startObject("set"); + { + pipelineBuilder + .field("field", "foo") + .field("value", "bar"); + } + pipelineBuilder.endObject().endObject(); + pipelineBuilder.startObject().startObject("convert"); + { + pipelineBuilder + .field("field", "rank") + .field("type", "integer"); + } + pipelineBuilder.endObject().endObject(); + } + pipelineBuilder.endArray(); + } + pipelineBuilder.endObject(); + PutPipelineRequest request = new PutPipelineRequest( + id, + BytesReference.bytes(pipelineBuilder), + pipelineBuilder.contentType()); + + PutPipelineResponse putPipelineResponse = + execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync); + assertTrue(putPipelineResponse.isAcknowledged()); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index c5ee387d315..1573071da33 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; @@ -91,6 +92,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.RandomCreateIndexGenerator; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.query.TermQueryBuilder; @@ -119,6 +121,7 @@ import org.elasticsearch.test.RandomObjects; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -1402,6 +1405,26 @@ public class RequestConvertersTests extends ESTestCase { assertEquals(expectedParams, expectedRequest.getParameters()); } + public void testPutPipeline() throws IOException { + String pipelineId = "some_pipeline_id"; + PutPipelineRequest request = new PutPipelineRequest( + "some_pipeline_id", + new BytesArray("{}".getBytes(StandardCharsets.UTF_8)), + XContentType.JSON + ); + Map expectedParams = new HashMap<>(); + setRandomMasterTimeout(request, expectedParams); + setRandomTimeout(request::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams); + + Request expectedRequest = RequestConverters.putPipeline(request); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + endpoint.add("_ingest/pipeline"); + endpoint.add(pipelineId); + assertEquals(endpoint.toString(), expectedRequest.getEndpoint()); + assertEquals(HttpPut.METHOD_NAME, expectedRequest.getMethod()); + assertEquals(expectedParams, expectedRequest.getParameters()); + } + public void testRollover() throws IOException { RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10), randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java index d41b11c68fe..b9329f99a3c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java @@ -21,7 +21,6 @@ package org.elasticsearch.client.documentation; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; @@ -29,9 +28,12 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineResponse; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; @@ -41,6 +43,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,19 +83,19 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase // end::put-settings-request // tag::put-settings-create-settings - String transientSettingKey = + String transientSettingKey = RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(); int transientSettingValue = 10; - Settings transientSettings = + Settings transientSettings = Settings.builder() .put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES) .build(); // <1> - String persistentSettingKey = + String persistentSettingKey = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(); - String persistentSettingValue = + String persistentSettingValue = EnableAllocationDecider.Allocation.NONE.name(); - Settings persistentSettings = + Settings persistentSettings = Settings.builder() .put(persistentSettingKey, persistentSettingValue) .build(); // <2> @@ -105,9 +108,9 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase { // tag::put-settings-settings-builder - Settings.Builder transientSettingsBuilder = + Settings.Builder transientSettingsBuilder = Settings.builder() - .put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES); + .put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES); request.transientSettings(transientSettingsBuilder); // <1> // end::put-settings-settings-builder } @@ -164,7 +167,7 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); // tag::put-settings-execute-listener - ActionListener listener = + ActionListener listener = new ActionListener() { @Override public void onResponse(ClusterUpdateSettingsResponse response) { @@ -272,4 +275,80 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase assertTrue(latch.await(30L, TimeUnit.SECONDS)); } } + + public void testPutPipeline() throws IOException { + RestHighLevelClient client = highLevelClient(); + + { + // tag::put-pipeline-request + String source = + "{\"description\":\"my set of processors\"," + + "\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}"; + PutPipelineRequest request = new PutPipelineRequest( + "my-pipeline-id", // <1> + new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <2> + XContentType.JSON // <3> + ); + // end::put-pipeline-request + + // tag::put-pipeline-request-timeout + request.timeout(TimeValue.timeValueMinutes(2)); // <1> + request.timeout("2m"); // <2> + // end::put-pipeline-request-timeout + + // tag::put-pipeline-request-masterTimeout + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1> + request.masterNodeTimeout("1m"); // <2> + // end::put-pipeline-request-masterTimeout + + // tag::put-pipeline-execute + PutPipelineResponse response = client.cluster().putPipeline(request); // <1> + // end::put-pipeline-execute + + // tag::put-pipeline-response + boolean acknowledged = response.isAcknowledged(); // <1> + // end::put-pipeline-response + assertTrue(acknowledged); + } + } + + public void testPutPipelineAsync() throws Exception { + RestHighLevelClient client = highLevelClient(); + + { + String source = + "{\"description\":\"my set of processors\"," + + "\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}"; + PutPipelineRequest request = new PutPipelineRequest( + "my-pipeline-id", + new BytesArray(source.getBytes(StandardCharsets.UTF_8)), + XContentType.JSON + ); + + // tag::put-pipeline-execute-listener + ActionListener listener = + new ActionListener() { + @Override + public void onResponse(PutPipelineResponse response) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::put-pipeline-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::put-pipeline-execute-async + client.cluster().putPipelineAsync(request, listener); // <1> + // end::put-pipeline-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } } diff --git a/docs/java-rest/high-level/cluster/put_pipeline.asciidoc b/docs/java-rest/high-level/cluster/put_pipeline.asciidoc new file mode 100644 index 00000000000..d50a6741cc0 --- /dev/null +++ b/docs/java-rest/high-level/cluster/put_pipeline.asciidoc @@ -0,0 +1,83 @@ +[[java-rest-high-cluster-put-pipeline]] +=== Put Pipeline API + +[[java-rest-high-cluster-put-pipeline-request]] +==== Put Pipeline Request + +A `PutPipelineRequest` requires an `id` argument, a source and a `XContentType`. The source consists +of a description and a list of `Processor` objects. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-request] +-------------------------------------------------- +<1> The pipeline id +<2> The source for the pipeline as a `ByteArray`. +<3> The XContentType for the pipeline source supplied above. + +==== Optional arguments +The following arguments can optionally be provided: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-request-timeout] +-------------------------------------------------- +<1> Timeout to wait for the all the nodes to acknowledge the index creation as a `TimeValue` +<2> Timeout to wait for the all the nodes to acknowledge the index creation as a `String` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-request-masterTimeout] +-------------------------------------------------- +<1> Timeout to connect to the master node as a `TimeValue` +<2> Timeout to connect to the master node as a `String` + +[[java-rest-high-cluster-put-pipeline-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-execute] +-------------------------------------------------- +<1> Execute the request and get back the response in a PutPipelineResponse object. + +[[java-rest-high-cluster-put-pipeline-async]] +==== Asynchronous Execution + +The asynchronous execution of a put pipeline request requires both the `PutPipelineRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-execute-async] +-------------------------------------------------- +<1> The `PutPipelineRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `PutPipelineResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of failure. The raised exception is provided as an argument + +[[java-rest-high-cluster-put-pipeline-response]] +==== Put Pipeline Response + +The returned `PutPipelineResponse` allows to retrieve information about the executed + operation as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-response] +-------------------------------------------------- +<1> Indicates whether all of the nodes have acknowledged the request diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index d8ec67dade1..b04cbb8df79 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -105,9 +105,11 @@ The Java High Level REST Client supports the following Cluster APIs: * <> * <> +* <> include::cluster/put_settings.asciidoc[] include::cluster/list_tasks.asciidoc[] +include::cluster/put_pipeline.asciidoc[] == Snapshot APIs diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java index 722473d64e4..6447b0557db 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java @@ -25,13 +25,15 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.Objects; -public class PutPipelineRequest extends AcknowledgedRequest { +public class PutPipelineRequest extends AcknowledgedRequest implements ToXContentObject { private String id; private BytesReference source; @@ -96,4 +98,14 @@ public class PutPipelineRequest extends AcknowledgedRequest out.writeEnum(xContentType); } } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (source != null) { + builder.rawValue(source.streamInput(), xContentType); + } else { + builder.startObject().endObject(); + } + return builder; + } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java new file mode 100644 index 00000000000..13960ca99ef --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; + +public class PutPipelineResponse extends AcknowledgedResponse implements ToXContentObject { + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "cluster_put_pipeline", true, args -> new PutPipelineResponse((boolean) args[0])); + + static { + declareAcknowledgedField(PARSER); + } + + public PutPipelineResponse() { + } + + public PutPipelineResponse(boolean acknowledged) { + super(acknowledged); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out); + } + + public static PutPipelineResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 473b555c05d..1b0553a5490 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -32,10 +32,10 @@ import java.util.Map; */ public final class Pipeline { - static final String DESCRIPTION_KEY = "description"; - static final String PROCESSORS_KEY = "processors"; - static final String VERSION_KEY = "version"; - static final String ON_FAILURE_KEY = "on_failure"; + public static final String DESCRIPTION_KEY = "description"; + public static final String PROCESSORS_KEY = "processors"; + public static final String VERSION_KEY = "version"; + public static final String ON_FAILURE_KEY = "on_failure"; private final String id; @Nullable diff --git a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java index 304904f2612..7f64b3fe585 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java @@ -20,9 +20,13 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -43,4 +47,25 @@ public class PutPipelineRequestTests extends ESTestCase { assertEquals(XContentType.JSON, serialized.getXContentType()); assertEquals("{}", serialized.getSource().utf8ToString()); } + + public void testToXContent() throws IOException { + XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); + pipelineBuilder.startObject().field(Pipeline.DESCRIPTION_KEY, "some random set of processors"); + pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY); + //Start first processor + pipelineBuilder.startObject(); + pipelineBuilder.startObject("set"); + pipelineBuilder.field("field", "foo"); + pipelineBuilder.field("value", "bar"); + pipelineBuilder.endObject(); + pipelineBuilder.endObject(); + //End first processor + pipelineBuilder.endArray(); + pipelineBuilder.endObject(); + PutPipelineRequest request = new PutPipelineRequest("1", BytesReference.bytes(pipelineBuilder), xContentType); + XContentBuilder requestBuilder = XContentBuilder.builder(xContentType.xContent()); + BytesReference actualRequestBody = BytesReference.bytes(request.toXContent(requestBuilder, ToXContent.EMPTY_PARAMS)); + assertEquals(BytesReference.bytes(pipelineBuilder), actualRequestBody); + } } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineResponseTests.java new file mode 100644 index 00000000000..438d3e55044 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineResponseTests.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; + +public class PutPipelineResponseTests extends AbstractStreamableXContentTestCase { + + public void testToXContent() { + PutPipelineResponse response = new PutPipelineResponse(true); + String output = Strings.toString(response); + assertEquals("{\"acknowledged\":true}", output); + } + + @Override + protected PutPipelineResponse doParseInstance(XContentParser parser) { + return PutPipelineResponse.fromXContent(parser); + } + + @Override + protected PutPipelineResponse createTestInstance() { + return new PutPipelineResponse(randomBoolean()); + } + + @Override + protected PutPipelineResponse createBlankInstance() { + return new PutPipelineResponse(); + } + + @Override + protected PutPipelineResponse mutateInstance(PutPipelineResponse response) { + return new PutPipelineResponse(response.isAcknowledged() == false); + } +}