From 11a6622e46c65e0e94244f791146f957f8b1b63d Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 12 Jan 2016 19:10:04 +0100 Subject: [PATCH] add proper ingest methods to Client Now that ingest is part of core we can add specific put/get/delete/simualtePipeline methods to the Client interface which is nice for java api users --- .../java/org/elasticsearch/client/Client.java | 70 +++++++++++ .../client/support/AbstractClient.java | 74 ++++++++++++ .../ingest/RestDeletePipelineAction.java | 5 +- .../action/ingest/RestGetPipelineAction.java | 5 +- .../action/ingest/RestPutPipelineAction.java | 5 +- .../ingest/RestSimulatePipelineAction.java | 5 +- .../elasticsearch/ingest/IngestClientIT.java | 109 +++++++++--------- 7 files changed, 209 insertions(+), 64 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/client/Client.java b/core/src/main/java/org/elasticsearch/client/Client.java index e7461dabfe1..2d7e8bde0a6 100644 --- a/core/src/main/java/org/elasticsearch/client/Client.java +++ b/core/src/main/java/org/elasticsearch/client/Client.java @@ -51,6 +51,16 @@ import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequestBuilder; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptResponse; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder; +import org.elasticsearch.action.ingest.GetPipelineRequest; +import org.elasticsearch.action.ingest.GetPipelineRequestBuilder; +import org.elasticsearch.action.ingest.GetPipelineResponse; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineRequestBuilder; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder; +import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.percolate.MultiPercolateRequest; import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder; import org.elasticsearch.action.percolate.MultiPercolateResponse; @@ -592,6 +602,66 @@ public interface Client extends ElasticsearchClient, Releasable { void fieldStats(FieldStatsRequest request, ActionListener listener); + /** + * Stores an ingest pipeline + */ + void putPipeline(PutPipelineRequest request, ActionListener listener); + + /** + * Stores an ingest pipeline + */ + ActionFuture putPipeline(PutPipelineRequest request); + + /** + * Stores an ingest pipeline + */ + PutPipelineRequestBuilder preparePutPipeline(); + + /** + * Deletes a stored ingest pipeline + */ + void deletePipeline(DeletePipelineRequest request, ActionListener listener); + + /** + * Deletes a stored ingest pipeline + */ + ActionFuture deletePipeline(DeletePipelineRequest request); + + /** + * Deletes a stored ingest pipeline + */ + DeletePipelineRequestBuilder prepareDeletePipeline(); + + /** + * Returns a stored ingest pipeline + */ + void getPipeline(GetPipelineRequest request, ActionListener listener); + + /** + * Returns a stored ingest pipeline + */ + ActionFuture getPipeline(GetPipelineRequest request); + + /** + * Returns a stored ingest pipeline + */ + GetPipelineRequestBuilder prepareGetPipeline(); + + /** + * Simulates an ingest pipeline + */ + void simulatePipeline(SimulatePipelineRequest request, ActionListener listener); + + /** + * Simulates an ingest pipeline + */ + ActionFuture simulatePipeline(SimulatePipelineRequest request); + + /** + * Simulates an ingest pipeline + */ + SimulatePipelineRequestBuilder prepareSimulatePipeline(); + /** * Returns this clients settings */ diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index e5a465442bb..bc5ed9410d1 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -272,6 +272,20 @@ import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptAction; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequestBuilder; import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptResponse; +import org.elasticsearch.action.ingest.DeletePipelineAction; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder; +import org.elasticsearch.action.ingest.GetPipelineAction; +import org.elasticsearch.action.ingest.GetPipelineRequest; +import org.elasticsearch.action.ingest.GetPipelineRequestBuilder; +import org.elasticsearch.action.ingest.GetPipelineResponse; +import org.elasticsearch.action.ingest.PutPipelineAction; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineRequestBuilder; +import org.elasticsearch.action.ingest.SimulatePipelineAction; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder; +import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.percolate.MultiPercolateAction; import org.elasticsearch.action.percolate.MultiPercolateRequest; import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder; @@ -791,6 +805,66 @@ public abstract class AbstractClient extends AbstractComponent implements Client return new FieldStatsRequestBuilder(this, FieldStatsAction.INSTANCE); } + @Override + public void putPipeline(PutPipelineRequest request, ActionListener listener) { + execute(PutPipelineAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture putPipeline(PutPipelineRequest request) { + return execute(PutPipelineAction.INSTANCE, request); + } + + @Override + public PutPipelineRequestBuilder preparePutPipeline() { + return new PutPipelineRequestBuilder(this, PutPipelineAction.INSTANCE); + } + + @Override + public void deletePipeline(DeletePipelineRequest request, ActionListener listener) { + execute(DeletePipelineAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture deletePipeline(DeletePipelineRequest request) { + return execute(DeletePipelineAction.INSTANCE, request); + } + + @Override + public DeletePipelineRequestBuilder prepareDeletePipeline() { + return new DeletePipelineRequestBuilder(this, DeletePipelineAction.INSTANCE); + } + + @Override + public void getPipeline(GetPipelineRequest request, ActionListener listener) { + execute(GetPipelineAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture getPipeline(GetPipelineRequest request) { + return execute(GetPipelineAction.INSTANCE, request); + } + + @Override + public GetPipelineRequestBuilder prepareGetPipeline() { + return new GetPipelineRequestBuilder(this, GetPipelineAction.INSTANCE); + } + + @Override + public void simulatePipeline(SimulatePipelineRequest request, ActionListener listener) { + execute(SimulatePipelineAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture simulatePipeline(SimulatePipelineRequest request) { + return execute(SimulatePipelineAction.INSTANCE, request); + } + + @Override + public SimulatePipelineRequestBuilder prepareSimulatePipeline() { + return new SimulatePipelineRequestBuilder(this, SimulatePipelineAction.INSTANCE); + } + static class Admin implements AdminClient { private final ClusterAdmin clusterAdmin; diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java index 994e0300407..cb70b5a79c8 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java @@ -19,11 +19,10 @@ package org.elasticsearch.rest.action.ingest; +import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.action.ingest.DeletePipelineAction; -import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; @@ -42,6 +41,6 @@ public class RestDeletePipelineAction extends BaseRestHandler { protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { DeletePipelineRequest request = new DeletePipelineRequest(); request.id(restRequest.param("id")); - client.execute(DeletePipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); + client.deletePipeline(request, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java index 47f41fc437b..7fae61eaed5 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java @@ -19,12 +19,11 @@ package org.elasticsearch.rest.action.ingest; +import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.action.ingest.GetPipelineAction; -import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; @@ -43,6 +42,6 @@ public class RestGetPipelineAction extends BaseRestHandler { protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { GetPipelineRequest request = new GetPipelineRequest(); request.ids(Strings.splitStringByCommaToArray(restRequest.param("id"))); - client.execute(GetPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); + client.getPipeline(request, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index b63b2eb44a7..98ec67782d5 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -19,11 +19,10 @@ package org.elasticsearch.rest.action.ingest; +import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.action.ingest.PutPipelineAction; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; @@ -45,6 +44,6 @@ public class RestPutPipelineAction extends BaseRestHandler { if (restRequest.hasContent()) { request.source(restRequest.content()); } - client.execute(PutPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); + client.putPipeline(request, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java index ed859e2a442..da902bdaa42 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java @@ -19,11 +19,10 @@ package org.elasticsearch.rest.action.ingest; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.action.ingest.SimulatePipelineAction; -import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; @@ -52,6 +51,6 @@ public class RestSimulatePipelineAction extends BaseRestHandler { request.setSource(RestActions.getRestContent(restRequest)); } - client.execute(SimulatePipelineAction.INSTANCE, request, new RestToXContentListener<>(channel)); + client.simulatePipeline(request, new RestToXContentListener<>(channel)); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 43c12255b17..1c50bc38b6a 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -25,17 +25,14 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.ingest.DeletePipelineAction; -import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder; -import org.elasticsearch.action.ingest.GetPipelineAction; -import org.elasticsearch.action.ingest.GetPipelineRequestBuilder; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.action.ingest.GetPipelineResponse; -import org.elasticsearch.action.ingest.PutPipelineAction; -import org.elasticsearch.action.ingest.PutPipelineRequestBuilder; +import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult; -import org.elasticsearch.action.ingest.SimulatePipelineAction; -import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.plugins.Plugin; @@ -69,7 +66,7 @@ public class IngestClientIT extends ESIntegTestCase { } public void testSimulate() throws Exception { - new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE) + client().preparePutPipeline() .setId("_id") .setSource(jsonBuilder().startObject() .field("description", "my_pipeline") @@ -81,30 +78,37 @@ public class IngestClientIT extends ESIntegTestCase { .endArray() .endObject().bytes()) .get(); - GetPipelineResponse getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) + GetPipelineResponse getResponse = client().prepareGetPipeline() .setIds("_id") .get(); assertThat(getResponse.isFound(), is(true)); assertThat(getResponse.pipelines().size(), equalTo(1)); assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id")); - SimulatePipelineResponse response = new SimulatePipelineRequestBuilder(client(), SimulatePipelineAction.INSTANCE) + BytesReference bytes = jsonBuilder().startObject() + .startArray("docs") + .startObject() + .field("_index", "index") + .field("_type", "type") + .field("_id", "id") + .startObject("_source") + .field("foo", "bar") + .field("fail", false) + .endObject() + .endObject() + .endArray() + .endObject().bytes(); + SimulatePipelineResponse response; + if (randomBoolean()) { + response = client().prepareSimulatePipeline() .setId("_id") - .setSource(jsonBuilder().startObject() - .startArray("docs") - .startObject() - .field("_index", "index") - .field("_type", "type") - .field("_id", "id") - .startObject("_source") - .field("foo", "bar") - .field("fail", false) - .endObject() - .endObject() - .endArray() - .endObject().bytes()) - .get(); - + .setSource(bytes).get(); + } else { + SimulatePipelineRequest request = new SimulatePipelineRequest(); + request.setId("_id"); + request.setSource(bytes); + response = client().simulatePipeline(request).get(); + } assertThat(response.isVerbose(), equalTo(false)); assertThat(response.getPipelineId(), equalTo("_id")); assertThat(response.getResults().size(), equalTo(1)); @@ -122,18 +126,19 @@ public class IngestClientIT extends ESIntegTestCase { public void testBulkWithIngestFailures() throws Exception { createIndex("index"); - new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE) - .setId("_id") - .setSource(jsonBuilder().startObject() - .field("description", "my_pipeline") - .startArray("processors") - .startObject() - .startObject("test") - .endObject() - .endObject() - .endArray() - .endObject().bytes()) - .get(); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(); + putPipelineRequest.id("_id"); + putPipelineRequest.source(jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + .endObject().bytes()); + + client().putPipeline(putPipelineRequest).get(); int numRequests = scaledRandomIntBetween(32, 128); BulkRequest bulkRequest = new BulkRequest(); @@ -159,9 +164,10 @@ public class IngestClientIT extends ESIntegTestCase { } public void test() throws Exception { - new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE) - .setId("_id") - .setSource(jsonBuilder().startObject() + + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(); + putPipelineRequest.id("_id"); + putPipelineRequest.source(jsonBuilder().startObject() .field("description", "my_pipeline") .startArray("processors") .startObject() @@ -169,11 +175,12 @@ public class IngestClientIT extends ESIntegTestCase { .endObject() .endObject() .endArray() - .endObject().bytes()) - .get(); - GetPipelineResponse getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) - .setIds("_id") - .get(); + .endObject().bytes()); + client().putPipeline(putPipelineRequest).get(); + + GetPipelineRequest getPipelineRequest = new GetPipelineRequest(); + getPipelineRequest.ids("_id"); + GetPipelineResponse getResponse = client().getPipeline(getPipelineRequest).get(); assertThat(getResponse.isFound(), is(true)); assertThat(getResponse.pipelines().size(), equalTo(1)); assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id")); @@ -191,15 +198,13 @@ public class IngestClientIT extends ESIntegTestCase { assertThat(doc.get("field"), equalTo("value2")); assertThat(doc.get("processed"), equalTo(true)); - DeleteResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE) - .setId("_id") - .get(); + DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest(); + deletePipelineRequest.id("_id"); + DeleteResponse response = client().deletePipeline(deletePipelineRequest).get(); assertThat(response.isFound(), is(true)); assertThat(response.getId(), equalTo("_id")); - getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) - .setIds("_id") - .get(); + getResponse = client().prepareGetPipeline().setIds("_id").get(); assertThat(getResponse.isFound(), is(false)); assertThat(getResponse.pipelines().size(), equalTo(0)); }