REST high-level client: add get ingest pipeline API (#30847)

Relates to #27205
This commit is contained in:
Sohaib Iftikhar 2018-06-01 08:55:43 +02:00 committed by Luca Cavanna
parent 70749e01c4
commit 80d20a9010
11 changed files with 483 additions and 29 deletions

View File

@ -23,6 +23,8 @@ import org.apache.http.Header;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse; import org.elasticsearch.action.ingest.PutPipelineResponse;
@ -87,4 +89,26 @@ public final class ClusterClient {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline, restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, listener, emptySet(), headers); PutPipelineResponse::fromXContent, listener, emptySet(), headers);
} }
/**
* Get an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
*/
public GetPipelineResponse getPipeline(GetPipelineRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, emptySet(), headers);
}
/**
* Asynchronously get an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
*/
public void getPipelineAsync(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, listener, emptySet(), headers);
}
} }

View File

@ -61,6 +61,7 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
@ -620,6 +621,18 @@ final class RequestConverters {
return request; return request;
} }
static Request getPipeline(GetPipelineRequest getPipelineRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
Params parameters = new Params(request);
parameters.withMasterTimeout(getPipelineRequest.masterNodeTimeout());
return request;
}
static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException { static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new EndpointBuilder() String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline") .addPathPartAsIs("_ingest/pipeline")

View File

@ -22,6 +22,8 @@ package org.elasticsearch.client;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse; import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@ -32,7 +34,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
@ -113,31 +115,7 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
public void testPutPipeline() throws IOException { public void testPutPipeline() throws IOException {
String id = "some_pipeline_id"; String id = "some_pipeline_id";
XContentType xContentType = randomFrom(XContentType.values()); XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
PutPipelineRequest request = new PutPipelineRequest( PutPipelineRequest request = new PutPipelineRequest(
id, id,
BytesReference.bytes(pipelineBuilder), BytesReference.bytes(pipelineBuilder),
@ -147,4 +125,27 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync); execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged()); assertTrue(putPipelineResponse.isAcknowledged());
} }
public void testGetPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
{
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}
GetPipelineRequest request = new GetPipelineRequest(id);
GetPipelineResponse response =
execute(request, highLevelClient().cluster()::getPipeline, highLevelClient().cluster()::getPipelineAsync);
assertTrue(response.isFound());
assertEquals(response.pipelines().get(0).getId(), id);
PipelineConfiguration expectedConfig =
new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType());
assertEquals(expectedConfig.getConfigAsMap(), response.pipelines().get(0).getConfigAsMap());
}
} }

View File

@ -21,7 +21,12 @@ package org.elasticsearch.client;
import org.apache.http.Header; import org.apache.http.Header;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -80,4 +85,42 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
super(restClient, (client) -> {}, Collections.emptyList()); super(restClient, (client) -> {}, Collections.emptyList());
} }
} }
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
return pipelineBuilder;
}
protected static void createPipeline(String pipelineId) throws IOException {
XContentBuilder builder = buildRandomXContentPipeline();
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));
}
protected static void createPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
assertOK(client().performRequest(RequestConverters.putPipeline(putPipelineRequest)));
}
} }

View File

@ -63,6 +63,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
@ -1450,6 +1451,20 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(expectedParams, expectedRequest.getParameters()); assertEquals(expectedParams, expectedRequest.getParameters());
} }
public void testGetPipeline() {
String pipelineId = "some_pipeline_id";
Map<String, String> expectedParams = new HashMap<>();
GetPipelineRequest request = new GetPipelineRequest("some_pipeline_id");
setRandomMasterTimeout(request, expectedParams);
Request expectedRequest = RequestConverters.getPipeline(request);
StringJoiner endpoint = new StringJoiner("/", "/", "");
endpoint.add("_ingest/pipeline");
endpoint.add(pipelineId);
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
assertEquals(HttpGet.METHOD_NAME, expectedRequest.getMethod());
assertEquals(expectedParams, expectedRequest.getParameters());
}
public void testRollover() throws IOException { public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10), RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10)); randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));

View File

@ -23,6 +23,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse; import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.ESRestHighLevelClientTestCase;
@ -34,11 +36,13 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.PipelineConfiguration;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -257,4 +261,74 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
assertTrue(latch.await(30L, TimeUnit.SECONDS)); assertTrue(latch.await(30L, TimeUnit.SECONDS));
} }
} }
public void testGetPipeline() throws IOException {
RestHighLevelClient client = highLevelClient();
{
createPipeline("my-pipeline-id");
}
{
// tag::get-pipeline-request
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id"); // <1>
// end::get-pipeline-request
// tag::get-pipeline-request-masterTimeout
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
request.masterNodeTimeout("1m"); // <2>
// end::get-pipeline-request-masterTimeout
// tag::get-pipeline-execute
GetPipelineResponse response = client.cluster().getPipeline(request); // <1>
// end::get-pipeline-execute
// tag::get-pipeline-response
boolean successful = response.isFound(); // <1>
List<PipelineConfiguration> pipelines = response.pipelines(); // <2>
for(PipelineConfiguration pipeline: pipelines) {
Map<String, Object> config = pipeline.getConfigAsMap(); // <3>
}
// end::get-pipeline-response
assertTrue(successful);
}
}
public void testGetPipelineAsync() throws Exception {
RestHighLevelClient client = highLevelClient();
{
createPipeline("my-pipeline-id");
}
{
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id");
// tag::get-pipeline-execute-listener
ActionListener<GetPipelineResponse> listener =
new ActionListener<GetPipelineResponse>() {
@Override
public void onResponse(GetPipelineResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::get-pipeline-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::get-pipeline-execute-async
client.cluster().getPipelineAsync(request, listener); // <1>
// end::get-pipeline-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
} }

View File

@ -0,0 +1,75 @@
[[java-rest-high-cluster-get-pipeline]]
=== Get Pipeline API
[[java-rest-high-cluster-get-pipeline-request]]
==== Get Pipeline Request
A `GetPipelineRequest` requires one or more `pipelineIds` to fetch.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-request]
--------------------------------------------------
<1> The pipeline id to fetch
==== Optional arguments
The following arguments can optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-request-masterTimeout]
--------------------------------------------------
<1> Timeout to connect to the master node as a `TimeValue`
<2> Timeout to connect to the master node as a `String`
[[java-rest-high-cluster-get-pipeline-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute]
--------------------------------------------------
<1> Execute the request and get back the response in a GetPipelineResponse object.
[[java-rest-high-cluster-get-pipeline-async]]
==== Asynchronous Execution
The asynchronous execution of a get pipeline request requires both the `GetPipelineRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute-async]
--------------------------------------------------
<1> The `GetPipelineRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `GetPipelineResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument
[[java-rest-high-cluster-get-pipeline-response]]
==== Get Pipeline Response
The returned `GetPipelineResponse` allows to retrieve information about the executed
operation as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-response]
--------------------------------------------------
<1> Check if a matching pipeline id was found or not.
<2> Get the list of pipelines found as a list of `PipelineConfig` objects.
<3> Get the individual configuration of each pipeline as a `Map<String, Object>`.

View File

@ -107,9 +107,11 @@ The Java High Level REST Client supports the following Cluster APIs:
* <<java-rest-high-cluster-put-settings>> * <<java-rest-high-cluster-put-settings>>
* <<java-rest-high-cluster-put-pipeline>> * <<java-rest-high-cluster-put-pipeline>>
* <<java-rest-high-cluster-get-pipeline>>
include::cluster/put_settings.asciidoc[] include::cluster/put_settings.asciidoc[]
include::cluster/put_pipeline.asciidoc[] include::cluster/put_pipeline.asciidoc[]
include::cluster/get_pipeline.asciidoc[]
== Snapshot APIs == Snapshot APIs

View File

@ -20,16 +20,24 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContentObject; import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.HashMap;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class GetPipelineResponse extends ActionResponse implements StatusToXContentObject { public class GetPipelineResponse extends ActionResponse implements StatusToXContentObject {
@ -42,8 +50,13 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
this.pipelines = pipelines; this.pipelines = pipelines;
} }
/**
* Get the list of pipelines that were a part of this response.
* The pipeline id can be obtained using getId on the PipelineConfiguration object.
* @return A list of {@link PipelineConfiguration} objects.
*/
public List<PipelineConfiguration> pipelines() { public List<PipelineConfiguration> pipelines() {
return pipelines; return Collections.unmodifiableList(pipelines);
} }
@Override @Override
@ -83,4 +96,66 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
builder.endObject(); builder.endObject();
return builder; return builder;
} }
/**
*
* @param parser the parser for the XContent that contains the serialized GetPipelineResponse.
* @return an instance of GetPipelineResponse read from the parser
* @throws IOException If the parsing fails
*/
public static GetPipelineResponse fromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
List<PipelineConfiguration> pipelines = new ArrayList<>();
while(parser.nextToken().equals(Token.FIELD_NAME)) {
String pipelineId = parser.currentName();
parser.nextToken();
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
contentBuilder.generator().copyCurrentStructure(parser);
PipelineConfiguration pipeline =
new PipelineConfiguration(
pipelineId, BytesReference.bytes(contentBuilder), contentBuilder.contentType()
);
pipelines.add(pipeline);
}
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser::getTokenLocation);
return new GetPipelineResponse(pipelines);
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
} else if (other instanceof GetPipelineResponse){
GetPipelineResponse otherResponse = (GetPipelineResponse)other;
if (pipelines == null) {
return otherResponse.pipelines == null;
} else {
// We need a map here because order does not matter for equality
Map<String, PipelineConfiguration> otherPipelineMap = new HashMap<>();
for (PipelineConfiguration pipeline: otherResponse.pipelines) {
otherPipelineMap.put(pipeline.getId(), pipeline);
}
for (PipelineConfiguration pipeline: pipelines) {
PipelineConfiguration otherPipeline = otherPipelineMap.get(pipeline.getId());
if (!pipeline.equals(otherPipeline)) {
return false;
}
}
return true;
}
} else {
return false;
}
}
@Override
public int hashCode() {
int result = 1;
for (PipelineConfiguration pipeline: pipelines) {
// We only take the sum here to ensure that the order does not matter.
result += (pipeline == null ? 0 : pipeline.hashCode());
}
return result;
}
} }

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -148,14 +149,14 @@ public final class PipelineConfiguration extends AbstractDiffable<PipelineConfig
PipelineConfiguration that = (PipelineConfiguration) o; PipelineConfiguration that = (PipelineConfiguration) o;
if (!id.equals(that.id)) return false; if (!id.equals(that.id)) return false;
return config.equals(that.config); return getConfigAsMap().equals(that.getConfigAsMap());
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = id.hashCode(); int result = id.hashCode();
result = 31 * result + config.hashCode(); result = 31 * result + getConfigAsMap().hashCode();
return result; return result;
} }
} }

View File

@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GetPipelineResponseTests extends AbstractStreamableXContentTestCase<GetPipelineResponse> {
private XContentBuilder getRandomXContentBuilder() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
return XContentBuilder.builder(xContentType.xContent());
}
private PipelineConfiguration createRandomPipeline(String pipelineId) throws IOException {
String field = "field_" + randomInt();
String value = "value_" + randomInt();
XContentBuilder builder = getRandomXContentBuilder();
builder.startObject();
// We only use a single SetProcessor here in each pipeline to test.
// Since the contents are returned as a configMap anyway this does not matter for fromXContent
builder.startObject("set");
builder.field("field", field);
builder.field("value", value);
builder.endObject();
builder.endObject();
return
new PipelineConfiguration(
pipelineId, BytesReference.bytes(builder), builder.contentType()
);
}
private Map<String, PipelineConfiguration> createPipelineConfigMap() throws IOException {
int numPipelines = randomInt(5);
Map<String, PipelineConfiguration> pipelinesMap = new HashMap<>();
for (int i=0; i<numPipelines; i++) {
String pipelineId = "pipeline_" + i;
pipelinesMap.put(pipelineId, createRandomPipeline(pipelineId));
}
return pipelinesMap;
}
public void testXContentDeserialization() throws IOException {
Map<String, PipelineConfiguration> pipelinesMap = createPipelineConfigMap();
GetPipelineResponse response = new GetPipelineResponse(new ArrayList<>(pipelinesMap.values()));
XContentBuilder builder = response.toXContent(getRandomXContentBuilder(), ToXContent.EMPTY_PARAMS);
XContentParser parser =
builder
.generator()
.contentType()
.xContent()
.createParser(
xContentRegistry(),
LoggingDeprecationHandler.INSTANCE,
BytesReference.bytes(builder).streamInput()
);
GetPipelineResponse parsedResponse = GetPipelineResponse.fromXContent(parser);
List<PipelineConfiguration> actualPipelines = response.pipelines();
List<PipelineConfiguration> parsedPipelines = parsedResponse.pipelines();
assertEquals(actualPipelines.size(), parsedPipelines.size());
for (PipelineConfiguration pipeline: parsedPipelines) {
assertTrue(pipelinesMap.containsKey(pipeline.getId()));
assertEquals(pipelinesMap.get(pipeline.getId()).getConfigAsMap(), pipeline.getConfigAsMap());
}
}
@Override
protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOException {
return GetPipelineResponse.fromXContent(parser);
}
@Override
protected GetPipelineResponse createBlankInstance() {
return new GetPipelineResponse();
}
@Override
protected GetPipelineResponse createTestInstance() {
try {
return new GetPipelineResponse(new ArrayList<>(createPipelineConfigMap().values()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
@Override
protected GetPipelineResponse mutateInstance(GetPipelineResponse response) {
try {
List<PipelineConfiguration> clonePipelines = new ArrayList<>(response.pipelines());
clonePipelines.add(createRandomPipeline("pipeline_" + clonePipelines.size() + 1));
return new GetPipelineResponse(clonePipelines);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}