Merge pull request #15937 from javanna/enhancement/client_ingest_methods

add proper ingest methods to Client
This commit is contained in:
Luca Cavanna 2016-01-13 09:46:20 +01:00
commit f23f0bf0bf
7 changed files with 209 additions and 64 deletions

View File

@ -51,6 +51,16 @@ import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequestBuilder;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptResponse;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.percolate.MultiPercolateRequest;
import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder;
import org.elasticsearch.action.percolate.MultiPercolateResponse;
@ -592,6 +602,66 @@ public interface Client extends ElasticsearchClient, Releasable {
void fieldStats(FieldStatsRequest request, ActionListener<FieldStatsResponse> listener);
/**
* Stores an ingest pipeline
*/
void putPipeline(PutPipelineRequest request, ActionListener<IndexResponse> listener);
/**
* Stores an ingest pipeline
*/
ActionFuture<IndexResponse> putPipeline(PutPipelineRequest request);
/**
* Stores an ingest pipeline
*/
PutPipelineRequestBuilder preparePutPipeline();
/**
* Deletes a stored ingest pipeline
*/
void deletePipeline(DeletePipelineRequest request, ActionListener<DeleteResponse> listener);
/**
* Deletes a stored ingest pipeline
*/
ActionFuture<DeleteResponse> deletePipeline(DeletePipelineRequest request);
/**
* Deletes a stored ingest pipeline
*/
DeletePipelineRequestBuilder prepareDeletePipeline();
/**
* Returns a stored ingest pipeline
*/
void getPipeline(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener);
/**
* Returns a stored ingest pipeline
*/
ActionFuture<GetPipelineResponse> getPipeline(GetPipelineRequest request);
/**
* Returns a stored ingest pipeline
*/
GetPipelineRequestBuilder prepareGetPipeline();
/**
* Simulates an ingest pipeline
*/
void simulatePipeline(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener);
/**
* Simulates an ingest pipeline
*/
ActionFuture<SimulatePipelineResponse> simulatePipeline(SimulatePipelineRequest request);
/**
* Simulates an ingest pipeline
*/
SimulatePipelineRequestBuilder prepareSimulatePipeline();
/**
* Returns this clients settings
*/

View File

@ -272,6 +272,20 @@ import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptAction;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequestBuilder;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptResponse;
import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulatePipelineAction;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.percolate.MultiPercolateAction;
import org.elasticsearch.action.percolate.MultiPercolateRequest;
import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder;
@ -791,6 +805,66 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new FieldStatsRequestBuilder(this, FieldStatsAction.INSTANCE);
}
@Override
public void putPipeline(PutPipelineRequest request, ActionListener<IndexResponse> listener) {
execute(PutPipelineAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<IndexResponse> putPipeline(PutPipelineRequest request) {
return execute(PutPipelineAction.INSTANCE, request);
}
@Override
public PutPipelineRequestBuilder preparePutPipeline() {
return new PutPipelineRequestBuilder(this, PutPipelineAction.INSTANCE);
}
@Override
public void deletePipeline(DeletePipelineRequest request, ActionListener<DeleteResponse> listener) {
execute(DeletePipelineAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<DeleteResponse> deletePipeline(DeletePipelineRequest request) {
return execute(DeletePipelineAction.INSTANCE, request);
}
@Override
public DeletePipelineRequestBuilder prepareDeletePipeline() {
return new DeletePipelineRequestBuilder(this, DeletePipelineAction.INSTANCE);
}
@Override
public void getPipeline(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener) {
execute(GetPipelineAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<GetPipelineResponse> getPipeline(GetPipelineRequest request) {
return execute(GetPipelineAction.INSTANCE, request);
}
@Override
public GetPipelineRequestBuilder prepareGetPipeline() {
return new GetPipelineRequestBuilder(this, GetPipelineAction.INSTANCE);
}
@Override
public void simulatePipeline(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
execute(SimulatePipelineAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<SimulatePipelineResponse> simulatePipeline(SimulatePipelineRequest request) {
return execute(SimulatePipelineAction.INSTANCE, request);
}
@Override
public SimulatePipelineRequestBuilder prepareSimulatePipeline() {
return new SimulatePipelineRequestBuilder(this, SimulatePipelineAction.INSTANCE);
}
static class Admin implements AdminClient {
private final ClusterAdmin clusterAdmin;

View File

@ -19,11 +19,10 @@
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
@ -42,6 +41,6 @@ public class RestDeletePipelineAction extends BaseRestHandler {
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
DeletePipelineRequest request = new DeletePipelineRequest();
request.id(restRequest.param("id"));
client.execute(DeletePipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
client.deletePipeline(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -19,12 +19,11 @@
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
@ -43,6 +42,6 @@ public class RestGetPipelineAction extends BaseRestHandler {
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
GetPipelineRequest request = new GetPipelineRequest();
request.ids(Strings.splitStringByCommaToArray(restRequest.param("id")));
client.execute(GetPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
client.getPipeline(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -19,11 +19,10 @@
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
@ -45,6 +44,6 @@ public class RestPutPipelineAction extends BaseRestHandler {
if (restRequest.hasContent()) {
request.source(restRequest.content());
}
client.execute(PutPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
client.putPipeline(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -19,11 +19,10 @@
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.ingest.SimulatePipelineAction;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
@ -52,6 +51,6 @@ public class RestSimulatePipelineAction extends BaseRestHandler {
request.setSource(RestActions.getRestContent(restRequest));
}
client.execute(SimulatePipelineAction.INSTANCE, request, new RestToXContentListener<>(channel));
client.simulatePipeline(request, new RestToXContentListener<>(channel));
}
}

View File

@ -25,17 +25,14 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineRequestBuilder;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequestBuilder;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.action.ingest.SimulatePipelineAction;
import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.plugins.Plugin;
@ -69,7 +66,7 @@ public class IngestClientIT extends ESIntegTestCase {
}
public void testSimulate() throws Exception {
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
client().preparePutPipeline()
.setId("_id")
.setSource(jsonBuilder().startObject()
.field("description", "my_pipeline")
@ -81,16 +78,14 @@ public class IngestClientIT extends ESIntegTestCase {
.endArray()
.endObject().bytes())
.get();
GetPipelineResponse getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
GetPipelineResponse getResponse = client().prepareGetPipeline()
.setIds("_id")
.get();
assertThat(getResponse.isFound(), is(true));
assertThat(getResponse.pipelines().size(), equalTo(1));
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
SimulatePipelineResponse response = new SimulatePipelineRequestBuilder(client(), SimulatePipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
BytesReference bytes = jsonBuilder().startObject()
.startArray("docs")
.startObject()
.field("_index", "index")
@ -102,9 +97,18 @@ public class IngestClientIT extends ESIntegTestCase {
.endObject()
.endObject()
.endArray()
.endObject().bytes())
.get();
.endObject().bytes();
SimulatePipelineResponse response;
if (randomBoolean()) {
response = client().prepareSimulatePipeline()
.setId("_id")
.setSource(bytes).get();
} else {
SimulatePipelineRequest request = new SimulatePipelineRequest();
request.setId("_id");
request.setSource(bytes);
response = client().simulatePipeline(request).get();
}
assertThat(response.isVerbose(), equalTo(false));
assertThat(response.getPipelineId(), equalTo("_id"));
assertThat(response.getResults().size(), equalTo(1));
@ -122,9 +126,9 @@ public class IngestClientIT extends ESIntegTestCase {
public void testBulkWithIngestFailures() throws Exception {
createIndex("index");
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
PutPipelineRequest putPipelineRequest = new PutPipelineRequest();
putPipelineRequest.id("_id");
putPipelineRequest.source(jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
@ -132,8 +136,9 @@ public class IngestClientIT extends ESIntegTestCase {
.endObject()
.endObject()
.endArray()
.endObject().bytes())
.get();
.endObject().bytes());
client().putPipeline(putPipelineRequest).get();
int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
@ -159,9 +164,10 @@ public class IngestClientIT extends ESIntegTestCase {
}
public void test() throws Exception {
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
PutPipelineRequest putPipelineRequest = new PutPipelineRequest();
putPipelineRequest.id("_id");
putPipelineRequest.source(jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
@ -169,11 +175,12 @@ public class IngestClientIT extends ESIntegTestCase {
.endObject()
.endObject()
.endArray()
.endObject().bytes())
.get();
GetPipelineResponse getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
.endObject().bytes());
client().putPipeline(putPipelineRequest).get();
GetPipelineRequest getPipelineRequest = new GetPipelineRequest();
getPipelineRequest.ids("_id");
GetPipelineResponse getResponse = client().getPipeline(getPipelineRequest).get();
assertThat(getResponse.isFound(), is(true));
assertThat(getResponse.pipelines().size(), equalTo(1));
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
@ -191,15 +198,13 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(doc.get("field"), equalTo("value2"));
assertThat(doc.get("processed"), equalTo(true));
DeleteResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE)
.setId("_id")
.get();
DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest();
deletePipelineRequest.id("_id");
DeleteResponse response = client().deletePipeline(deletePipelineRequest).get();
assertThat(response.isFound(), is(true));
assertThat(response.getId(), equalTo("_id"));
getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
getResponse = client().prepareGetPipeline().setIds("_id").get();
assertThat(getResponse.isFound(), is(false));
assertThat(getResponse.pipelines().size(), equalTo(0));
}