fix PutPipelineRequest comments

This commit is contained in:
Martijn van Groningen 2016-01-20 14:00:55 +01:00
parent f7024bc4dd
commit 8b520111d0
6 changed files with 26 additions and 33 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -36,30 +37,23 @@ public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest>
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null; return null;
if (id == null) {
validationException = addValidationError("id is missing", validationException);
}
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
return validationException;
} }
public String id() { public String getId() {
return id; return id;
} }
public void id(String id) { public void setId(String id) {
this.id = id; this.id = Objects.requireNonNull(id);
} }
public BytesReference source() { public BytesReference getSource() {
return source; return source;
} }
public void source(BytesReference source) { public void setSource(BytesReference source) {
this.source = source; this.source = Objects.requireNonNull(source);
} }
@Override @Override

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -31,12 +30,12 @@ public class PutPipelineRequestBuilder extends ActionRequestBuilder<PutPipelineR
} }
public PutPipelineRequestBuilder setId(String id) { public PutPipelineRequestBuilder setId(String id) {
request.id(id); request.setId(id);
return this; return this;
} }
public PutPipelineRequestBuilder setSource(BytesReference source) { public PutPipelineRequestBuilder setSource(BytesReference source) {
request.source(source); request.setSource(source);
return this; return this;
} }

View File

@ -157,12 +157,12 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
public void put(PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) throws IllegalArgumentException { public void put(PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) throws IllegalArgumentException {
try { try {
// validates the pipeline and processor configuration before submitting a cluster update task: // validates the pipeline and processor configuration before submitting a cluster update task:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2(); Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
constructPipeline(request.id(), pipelineConfig); constructPipeline(request.getId(), pipelineConfig);
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException("Invalid pipeline configuration", e); throw new IllegalArgumentException("Invalid pipeline configuration", e);
} }
clusterService.submitStateUpdateTask("put-pipeline-" + request.id(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) { clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) {
@Override @Override
protected WritePipelineResponse newResponse(boolean acknowledged) { protected WritePipelineResponse newResponse(boolean acknowledged) {
@ -185,7 +185,7 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
pipelines = new HashMap<>(); pipelines = new HashMap<>();
} }
pipelines.put(request.id(), new PipelineConfiguration(request.id(), request.source())); pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource()));
ClusterState.Builder newState = ClusterState.builder(currentState); ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()) newState.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))

View File

@ -40,9 +40,9 @@ public class RestPutPipelineAction extends BaseRestHandler {
@Override @Override
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
PutPipelineRequest request = new PutPipelineRequest(); PutPipelineRequest request = new PutPipelineRequest();
request.id(restRequest.param("id")); request.setId(restRequest.param("id"));
if (restRequest.hasContent()) { if (restRequest.hasContent()) {
request.source(restRequest.content()); request.setSource(restRequest.content());
} }
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
request.timeout(restRequest.paramAsTime("timeout", request.timeout())); request.timeout(restRequest.paramAsTime("timeout", request.timeout()));

View File

@ -129,8 +129,8 @@ public class IngestClientIT extends ESIntegTestCase {
createIndex("index"); createIndex("index");
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(); PutPipelineRequest putPipelineRequest = new PutPipelineRequest();
putPipelineRequest.id("_id"); putPipelineRequest.setId("_id");
putPipelineRequest.source(jsonBuilder().startObject() putPipelineRequest.setSource(jsonBuilder().startObject()
.field("description", "my_pipeline") .field("description", "my_pipeline")
.startArray("processors") .startArray("processors")
.startObject() .startObject()
@ -167,8 +167,8 @@ public class IngestClientIT extends ESIntegTestCase {
public void test() throws Exception { public void test() throws Exception {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(); PutPipelineRequest putPipelineRequest = new PutPipelineRequest();
putPipelineRequest.id("_id"); putPipelineRequest.setId("_id");
putPipelineRequest.source(jsonBuilder().startObject() putPipelineRequest.setSource(jsonBuilder().startObject()
.field("description", "my_pipeline") .field("description", "my_pipeline")
.startArray("processors") .startArray("processors")
.startObject() .startObject()

View File

@ -85,8 +85,8 @@ public class PipelineStoreTests extends ESTestCase {
// add a new pipeline: // add a new pipeline:
PutPipelineRequest putRequest = new PutPipelineRequest(); PutPipelineRequest putRequest = new PutPipelineRequest();
putRequest.id(id); putRequest.setId(id);
putRequest.source(new BytesArray("{\"processors\": []}")); putRequest.setSource(new BytesArray("{\"processors\": []}"));
clusterState = store.innerPut(putRequest, clusterState); clusterState = store.innerPut(putRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(clusterState);
pipeline = store.get(id); pipeline = store.get(id);
@ -97,8 +97,8 @@ public class PipelineStoreTests extends ESTestCase {
// overwrite existing pipeline: // overwrite existing pipeline:
putRequest = new PutPipelineRequest(); putRequest = new PutPipelineRequest();
putRequest.id(id); putRequest.setId(id);
putRequest.source(new BytesArray("{\"processors\": [], \"description\": \"_description\"}")); putRequest.setSource(new BytesArray("{\"processors\": [], \"description\": \"_description\"}"));
clusterState = store.innerPut(putRequest, clusterState); clusterState = store.innerPut(putRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(clusterState);
pipeline = store.get(id); pipeline = store.get(id);
@ -170,8 +170,8 @@ public class PipelineStoreTests extends ESTestCase {
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
PutPipelineRequest putRequest = new PutPipelineRequest(); PutPipelineRequest putRequest = new PutPipelineRequest();
putRequest.id(id); putRequest.setId(id);
putRequest.source(new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); putRequest.setSource(new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"));
clusterState = store.innerPut(putRequest, clusterState); clusterState = store.innerPut(putRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(clusterState);
pipeline = store.get(id); pipeline = store.get(id);