diff --git a/docs/plugins/ingest.asciidoc b/docs/plugins/ingest.asciidoc new file mode 100644 index 00000000000..3fb61bdc3b7 --- /dev/null +++ b/docs/plugins/ingest.asciidoc @@ -0,0 +1,74 @@ +[[ingest]] +== Ingest Plugin + +TODO + +=== Put pipeline API + +The put pipeline api adds pipelines and updates existing pipelines in the cluster. + +[source,js] +-------------------------------------------------- +PUT _ingest/pipeline/my-pipeline-id +{ + "description" : "describe pipeline", + "processors" : [ + { + "simple" : { + // settings + } + }, + // other processors + ] +} +-------------------------------------------------- +// AUTOSENSE + +NOTE: Each ingest node updates its processors asynchronously in the background, so it may take a few seconds for all + nodes to have the latest version of the pipeline. + +=== Get pipeline API + +The get pipeline api returns pipelines based on id. This api always returns a local reference of the pipeline. + +[source,js] +-------------------------------------------------- +GET _ingest/pipeline/my-pipeline-id +-------------------------------------------------- +// AUTOSENSE + +Example response: + +[source,js] +-------------------------------------------------- +{ + "my-pipeline-id": { + "_source" : { + "description": "describe pipeline", + "processors": [ + { + "simple" : { + // settings + } + }, + // other processors + ] + }, + "_version" : 0 + } +} +-------------------------------------------------- + +For each returned pipeline the source and the version is returned. +The version is useful for knowing what version of the pipeline the node has. +Multiple ids can be provided at the same time. Also wildcards are supported. + +=== Delete pipeline API + +The delete pipeline api deletes pipelines by id. + +[source,js] +-------------------------------------------------- +DELETE _ingest/pipeline/my-pipeline-id +-------------------------------------------------- +// AUTOSENSE \ No newline at end of file diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java index baa8427d069..1ea8f58a12e 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -72,16 +72,15 @@ public final class Pipeline { public final static class Builder { - private final String name; + private final String id; private String description; private List processors = new ArrayList<>(); - public Builder(String name) { - this.name = name; + public Builder(String id) { + this.id = id; } - public Builder(Map config, Map processorRegistry) { - name = (String) config.get("name"); + public void fromMap(Map config, Map processorRegistry) { description = (String) config.get("description"); @SuppressWarnings("unchecked") List>> processors = (List>>) config.get("processors"); @@ -111,7 +110,7 @@ public final class Pipeline { } public Pipeline build() { - return new Pipeline(name, description, Collections.unmodifiableList(processors)); + return new Pipeline(id, description, Collections.unmodifiableList(processors)); } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index 6a2138c9089..2786923b7ef 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -21,11 +21,9 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; -import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.SimpleProcessor; import org.elasticsearch.plugin.ingest.rest.IngestRestFilter; -import org.elasticsearch.plugin.ingest.transport.IngestActionFilter; import java.util.HashMap; import java.util.Map; @@ -39,7 +37,7 @@ public class IngestModule extends AbstractModule { binder().bind(IngestRestFilter.class).asEagerSingleton(); binder().bind(PipelineExecutionService.class).asEagerSingleton(); binder().bind(PipelineStore.class).asEagerSingleton(); - binder().bind(PipelineConfigDocReader.class).asEagerSingleton(); + binder().bind(PipelineStoreClient.class).asEagerSingleton(); registerProcessor(SimpleProcessor.TYPE, SimpleProcessor.Builder.Factory.class); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index 7cb62e86a72..9dfbf314499 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -21,12 +21,22 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.action.ActionModule; +import org.elasticsearch.client.Client; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction; +import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction; +import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction; import org.elasticsearch.plugin.ingest.transport.IngestActionFilter; +import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction; +import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineTransportAction; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.rest.action.RestActionModule; +import org.elasticsearch.rest.RestModule; import java.util.Arrays; import java.util.Collection; @@ -42,9 +52,11 @@ public class IngestPlugin extends Plugin { public static final String NAME = "ingest"; private final Settings nodeSettings; + private final boolean transportClient; public IngestPlugin(Settings nodeSettings) { this.nodeSettings = nodeSettings; + transportClient = "transport".equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING)); } @Override @@ -59,12 +71,20 @@ public class IngestPlugin extends Plugin { @Override public Collection nodeModules() { - return Collections.singletonList(new IngestModule()); + if (transportClient) { + return Collections.emptyList(); + } else { + return Collections.singletonList(new IngestModule()); + } } @Override public Collection> nodeServices() { - return Arrays.asList(PipelineStore.class, PipelineConfigDocReader.class); + if (transportClient) { + return Collections.emptyList(); + } else { + return Arrays.asList(PipelineStore.class, PipelineStoreClient.class); + } } @Override @@ -75,7 +95,18 @@ public class IngestPlugin extends Plugin { } public void onModule(ActionModule module) { - module.registerFilter(IngestActionFilter.class); + if (!transportClient) { + module.registerFilter(IngestActionFilter.class); + } + module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class); + module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class); + module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class); + } + + public void onModule(RestModule restModule) { + restModule.addRestAction(RestPutPipelineAction.class); + restModule.addRestAction(RestGetPipelineAction.class); + restModule.addRestAction(RestDeletePipelineAction.class); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java index b33bac75995..b5c9b0ffe18 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayService; @@ -34,10 +35,7 @@ import org.elasticsearch.ingest.Processor; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; public class PipelineStore extends AbstractLifecycleComponent { @@ -47,13 +45,13 @@ public class PipelineStore extends AbstractLifecycleComponent { private final ThreadPool threadPool; private final ClusterService clusterService; private final TimeValue pipelineUpdateInterval; - private final PipelineConfigDocReader configDocReader; + private final PipelineStoreClient configDocReader; private final Map processorFactoryRegistry; private volatile Map pipelines = new HashMap<>(); @Inject - public PipelineStore(Settings settings, ThreadPool threadPool, ClusterService clusterService, PipelineConfigDocReader configDocReader, Map processors) { + public PipelineStore(Settings settings, ThreadPool threadPool, ClusterService clusterService, PipelineStoreClient configDocReader, Map processors) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; @@ -84,10 +82,32 @@ public class PipelineStore extends AbstractLifecycleComponent { } } + public List getReference(String... ids) { + List result = new ArrayList<>(ids.length); + for (String id : ids) { + if (Regex.isSimpleMatchPattern(id)) { + for (Map.Entry entry : pipelines.entrySet()) { + if (Regex.simpleMatch(id, entry.getKey())) { + result.add(entry.getValue()); + } + } + } else { + PipelineReference reference = pipelines.get(id); + if (reference != null) { + result.add(reference); + } + } + } + return result; + } + void updatePipelines() { + // note: this process isn't fast or smart, but the idea is that there will not be many pipelines, + // so for that reason the goal is to keep the update logic simple. + int changed = 0; Map newPipelines = new HashMap<>(pipelines); - for (SearchHit hit : configDocReader.readAll()) { + for (SearchHit hit : configDocReader.readAllPipelines()) { String pipelineId = hit.getId(); BytesReference pipelineSource = hit.getSourceRef(); PipelineReference previous = newPipelines.get(pipelineId); @@ -98,15 +118,24 @@ public class PipelineStore extends AbstractLifecycleComponent { } changed++; - Pipeline.Builder builder = new Pipeline.Builder(hit.sourceAsMap(), processorFactoryRegistry); + Pipeline.Builder builder = new Pipeline.Builder(hit.getId()); + builder.fromMap(hit.sourceAsMap(), processorFactoryRegistry); newPipelines.put(pipelineId, new PipelineReference(builder.build(), hit.getVersion(), pipelineSource)); } - if (changed != 0) { - logger.debug("adding or updating [{}] pipelines", changed); + int removed = 0; + for (String existingPipelineId : pipelines.keySet()) { + if (!configDocReader.existPipeline(existingPipelineId)) { + newPipelines.remove(existingPipelineId); + removed++; + } + } + + if (changed != 0 || removed != 0) { + logger.debug("adding or updating [{}] pipelines and [{}] pipelines removed", changed, removed); pipelines = newPipelines; } else { - logger.debug("adding no new pipelines"); + logger.debug("no pipelines changes detected"); } } @@ -142,7 +171,7 @@ public class PipelineStore extends AbstractLifecycleComponent { } } - static class PipelineReference { + public static class PipelineReference { private final Pipeline pipeline; private final long version; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineConfigDocReader.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreClient.java similarity index 90% rename from plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineConfigDocReader.java rename to plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreClient.java index 9b2dcf1e09e..c11969f840c 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineConfigDocReader.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreClient.java @@ -19,6 +19,7 @@ package org.elasticsearch.plugin.ingest; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; @@ -33,14 +34,14 @@ import org.elasticsearch.search.sort.SortOrder; import java.util.Collections; import java.util.Iterator; -public class PipelineConfigDocReader extends AbstractLifecycleComponent { +public class PipelineStoreClient extends AbstractLifecycleComponent { private volatile Client client; private final Injector injector; private final TimeValue scrollTimeout; @Inject - public PipelineConfigDocReader(Settings settings, Injector injector) { + public PipelineStoreClient(Settings settings, Injector injector) { super(settings); this.injector = injector; this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30)); @@ -60,7 +61,7 @@ public class PipelineConfigDocReader extends AbstractLifecycleComponent { protected void doClose() { } - public Iterable readAll() { + public Iterable readAllPipelines() { // TODO: the search should be replaced with an ingest API when it is available SearchResponse searchResponse = client.prepareSearch(PipelineStore.INDEX) .setVersion(true) @@ -81,6 +82,11 @@ public class PipelineConfigDocReader extends AbstractLifecycleComponent { }; } + public boolean existPipeline(String pipelineId) { + GetResponse response = client.prepareGet(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId).get(); + return response.isExists(); + } + class SearchScrollIterator implements Iterator { private SearchResponse searchResponse; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestDeletePipelineAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestDeletePipelineAction.java new file mode 100644 index 00000000000..f09c229a710 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestDeletePipelineAction.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.rest; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction; +import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.support.RestToXContentListener; + +public class RestDeletePipelineAction extends BaseRestHandler { + + @Inject + public RestDeletePipelineAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(RestRequest.Method.DELETE, "/_ingest/pipeline/{id}", this); + } + + @Override + protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { + DeletePipelineRequest request = new DeletePipelineRequest(); + request.id(restRequest.param("id")); + client.execute(DeletePipelineAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestGetPipelineAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestGetPipelineAction.java new file mode 100644 index 00000000000..a58366eedb0 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestGetPipelineAction.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.rest; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; +import org.elasticsearch.rest.action.support.RestToXContentListener; + +public class RestGetPipelineAction extends BaseRestHandler { + + @Inject + public RestGetPipelineAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{ids}", this); + } + + @Override + protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { + GetPipelineRequest request = new GetPipelineRequest(); + request.ids(Strings.splitStringByCommaToArray(restRequest.param("ids"))); + client.execute(GetPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestPutPipelineAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestPutPipelineAction.java new file mode 100644 index 00000000000..5b5bd0a0d2e --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestPutPipelineAction.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.rest; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.support.RestToXContentListener; + +public class RestPutPipelineAction extends BaseRestHandler { + + @Inject + public RestPutPipelineAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(RestRequest.Method.PUT, "/_ingest/pipeline/{id}", this); + } + + @Override + protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { + PutPipelineRequest request = new PutPipelineRequest(); + request.id(restRequest.param("id")); + if (restRequest.hasContent()) { + request.source(restRequest.content()); + } + client.execute(PutPipelineAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineAction.java new file mode 100644 index 00000000000..b6405362ce5 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineAction.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.delete; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class DeletePipelineAction extends Action { + + public static final DeletePipelineAction INSTANCE = new DeletePipelineAction(); + public static final String NAME = "cluster:admin/ingest/pipeline/delete"; + + public DeletePipelineAction() { + super(NAME); + } + + @Override + public DeletePipelineRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new DeletePipelineRequestBuilder(client, this); + } + + @Override + public DeletePipelineResponse newResponse() { + return new DeletePipelineResponse(); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineRequest.java new file mode 100644 index 00000000000..1b31d5f44b2 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineRequest.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.delete; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class DeletePipelineRequest extends ActionRequest { + + private String id; + + public void id(String id) { + this.id = id; + } + + public String id() { + return id; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (id == null) { + validationException = addValidationError("id is missing", validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + id = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineRequestBuilder.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineRequestBuilder.java new file mode 100644 index 00000000000..ab7eea1c972 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineRequestBuilder.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.delete; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class DeletePipelineRequestBuilder extends ActionRequestBuilder { + + public DeletePipelineRequestBuilder(ElasticsearchClient client, DeletePipelineAction action) { + super(client, action, new DeletePipelineRequest()); + } + + public DeletePipelineRequestBuilder setId(String id) { + request.id(id); + return this; + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineResponse.java new file mode 100644 index 00000000000..a35752636b6 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineResponse.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.delete; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.common.xcontent.XContentHelper; + +import java.io.IOException; +import java.util.Map; + +public class DeletePipelineResponse extends ActionResponse implements ToXContent { + + private String id; + private boolean found; + + DeletePipelineResponse() { + } + + public DeletePipelineResponse(String id, boolean found) { + this.id = id; + this.found = found; + } + + public String id() { + return id; + } + + public boolean found() { + return found; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + this.id = in.readString(); + this.found = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + out.writeBoolean(found); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.ID, id); + builder.field(Fields.FOUND, found); + return builder; + } + + static final class Fields { + static final XContentBuilderString ID = new XContentBuilderString("_id"); + static final XContentBuilderString FOUND = new XContentBuilderString("_found"); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java new file mode 100644 index 00000000000..b57a3db00c9 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.delete; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.delete.TransportDeleteAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DeletePipelineTransportAction extends HandledTransportAction { + + private final TransportDeleteAction deleteAction; + + @Inject + public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportDeleteAction deleteAction) { + super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); + this.deleteAction = deleteAction; + } + + @Override + protected void doExecute(DeletePipelineRequest request, ActionListener listener) { + DeleteRequest deleteRequest = new DeleteRequest(); + deleteRequest.index(PipelineStore.INDEX); + deleteRequest.type(PipelineStore.TYPE); + deleteRequest.id(request.id()); + deleteRequest.refresh(true); + deleteAction.execute(deleteRequest, new ActionListener() { + @Override + public void onResponse(DeleteResponse deleteResponse) { + listener.onResponse(new DeletePipelineResponse(deleteResponse.getId(), deleteResponse.isFound())); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineAction.java new file mode 100644 index 00000000000..0904a8a3f9f --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineAction.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.get; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class GetPipelineAction extends Action { + + public static final GetPipelineAction INSTANCE = new GetPipelineAction(); + public static final String NAME = "cluster:admin/ingest/pipeline/get"; + + public GetPipelineAction() { + super(NAME); + } + + @Override + public GetPipelineRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new GetPipelineRequestBuilder(client, this); + } + + @Override + public GetPipelineResponse newResponse() { + return new GetPipelineResponse(); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineRequest.java new file mode 100644 index 00000000000..0ff673a7bdb --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineRequest.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.get; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class GetPipelineRequest extends ActionRequest { + + private String[] ids; + + public void ids(String... ids) { + this.ids = ids; + } + + public String[] ids() { + return ids; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (ids == null || ids.length == 0) { + validationException = addValidationError("ids is missing", validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + ids = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(ids); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineRequestBuilder.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineRequestBuilder.java new file mode 100644 index 00000000000..4269b6ceccd --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineRequestBuilder.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.get; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class GetPipelineRequestBuilder extends ActionRequestBuilder { + + public GetPipelineRequestBuilder(ElasticsearchClient client, GetPipelineAction action) { + super(client, action, new GetPipelineRequest()); + } + + public GetPipelineRequestBuilder setIds(String... ids) { + request.ids(ids); + return this; + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineResponse.java new file mode 100644 index 00000000000..020c8004631 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineResponse.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.get; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.StatusToXContent; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class GetPipelineResponse extends ActionResponse implements StatusToXContent { + + private Map pipelines; + private Map versions; + + public GetPipelineResponse() { + } + + public GetPipelineResponse(Map pipelines, Map versions) { + this.pipelines = pipelines; + this.versions = versions; + } + + public Map pipelines() { + return pipelines; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + pipelines = new HashMap<>(size); + for (int i = 0; i < size; i++) { + pipelines.put(in.readString(), in.readBytesReference()); + } + size = in.readVInt(); + versions = new HashMap<>(size); + for (int i = 0; i < size; i++) { + versions.put(in.readString(), in.readVLong()); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(pipelines.size()); + for (Map.Entry entry : pipelines.entrySet()) { + out.writeString(entry.getKey()); + out.writeBytesReference(entry.getValue()); + } + out.writeVInt(versions.size()); + for (Map.Entry entry : versions.entrySet()) { + out.writeString(entry.getKey()); + out.writeVLong(entry.getValue()); + } + } + + public boolean isFound() { + return !pipelines.isEmpty(); + } + + @Override + public RestStatus status() { + return isFound() ? RestStatus.OK : RestStatus.NOT_FOUND; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + for (Map.Entry entry : pipelines.entrySet()) { + builder.startObject(entry.getKey()); + XContentHelper.writeRawField("_source", entry.getValue(), builder, params); + builder.field("_version", versions.get(entry.getKey())); + builder.endObject(); + } + return builder; + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineTransportAction.java new file mode 100644 index 00000000000..89ba97d81b3 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineTransportAction.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.get; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class GetPipelineTransportAction extends HandledTransportAction { + + private final PipelineStore pipelineStore; + + @Inject + public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) { + super(settings, GetPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new); + this.pipelineStore = pipelineStore; + } + + @Override + protected void doExecute(GetPipelineRequest request, ActionListener listener) { + List references = pipelineStore.getReference(request.ids()); + Map result = new HashMap<>(); + Map versions = new HashMap<>(); + for (PipelineStore.PipelineReference reference : references) { + result.put(reference.getPipeline().getId(), reference.getSource()); + versions.put(reference.getPipeline().getId(), reference.getVersion()); + } + listener.onResponse(new GetPipelineResponse(result, versions)); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineAction.java new file mode 100644 index 00000000000..a638d0c8010 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineAction.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.put; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class PutPipelineAction extends Action { + + public static final PutPipelineAction INSTANCE = new PutPipelineAction(); + public static final String NAME = "cluster:admin/ingest/pipeline/put"; + + public PutPipelineAction() { + super(NAME); + } + + @Override + public PutPipelineRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new PutPipelineRequestBuilder(client, this); + } + + @Override + public PutPipelineResponse newResponse() { + return new PutPipelineResponse(); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineRequest.java new file mode 100644 index 00000000000..b9ef9c17e45 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineRequest.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.put; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class PutPipelineRequest extends ActionRequest { + + private String id; + private BytesReference source; + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (id == null) { + validationException = addValidationError("id is missing", validationException); + } + if (source == null) { + validationException = addValidationError("source is missing", validationException); + } + return validationException; + } + + public String id() { + return id; + } + + public void id(String id) { + this.id = id; + } + + public BytesReference source() { + return source; + } + + public void source(BytesReference source) { + this.source = source; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + id = in.readString(); + source = in.readBytesReference(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + out.writeBytesReference(source); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineRequestBuilder.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineRequestBuilder.java new file mode 100644 index 00000000000..732756adee4 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineRequestBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.put; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.bytes.BytesReference; + +public class PutPipelineRequestBuilder extends ActionRequestBuilder { + + public PutPipelineRequestBuilder(ElasticsearchClient client, PutPipelineAction action) { + super(client, action, new PutPipelineRequest()); + } + + public PutPipelineRequestBuilder setId(String id) { + request.id(id); + return this; + } + + public PutPipelineRequestBuilder setSource(BytesReference source) { + request.source(source); + return this; + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineResponse.java new file mode 100644 index 00000000000..eb733bcbffc --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineResponse.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.put; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +public class PutPipelineResponse extends ActionResponse implements ToXContent { + + private String id; + private long version; + + public String id() { + return id; + } + + public PutPipelineResponse id(String id) { + this.id = id; + return this; + } + + public long version() { + return version; + } + + public PutPipelineResponse version(long version) { + this.version = version; + return this; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + out.writeLong(version); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + id = in.readString(); + version = in.readLong(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.ID, id); + builder.field(Fields.VERSION, version); + return builder; + } + + static final class Fields { + static final XContentBuilderString ID = new XContentBuilderString("_id"); + static final XContentBuilderString VERSION = new XContentBuilderString("_version"); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineTransportAction.java new file mode 100644 index 00000000000..fc5b7e7f124 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineTransportAction.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.put; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.index.TransportIndexAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.function.Supplier; + +public class PutPipelineTransportAction extends HandledTransportAction { + + private final TransportIndexAction indexAction; + + @Inject + public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportIndexAction indexAction) { + super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new); + this.indexAction = indexAction; + } + + @Override + protected void doExecute(PutPipelineRequest request, ActionListener listener) { + IndexRequest indexRequest = new IndexRequest(); + indexRequest.index(PipelineStore.INDEX); + indexRequest.type(PipelineStore.TYPE); + indexRequest.id(request.id()); + indexRequest.source(request.source()); + indexRequest.refresh(true); + indexAction.execute(indexRequest, new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + PutPipelineResponse response = new PutPipelineResponse(); + response.id(indexResponse.getId()); + response.version(indexResponse.getVersion()); + listener.onResponse(response); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/BasicTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java similarity index 56% rename from plugins/ingest/src/test/java/org/elasticsearch/ingest/BasicTests.java rename to plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 0c884713374..dd83939530d 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/BasicTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -21,6 +21,14 @@ package org.elasticsearch.ingest; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction; +import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequestBuilder; +import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineResponse; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequestBuilder; +import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -29,20 +37,25 @@ import java.util.Collections; import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.equalTo; +import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; +import static org.hamcrest.Matchers.*; -@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0) -public class BasicTests extends ESIntegTestCase { +public class IngestClientIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Collections.singletonList(IngestPlugin.class); + return pluginList(IngestPlugin.class); } - public void test() throws Exception { - client().prepareIndex(PipelineStore.INDEX, PipelineStore.TYPE, "_id") + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + public void testBasics() throws Exception { + new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE) + .setId("_id") .setSource(jsonBuilder().startObject() - .field("name", "my_pipeline") .field("description", "my_pipeline") .startArray("processors") .startObject() @@ -54,10 +67,18 @@ public class BasicTests extends ESIntegTestCase { .endObject() .endObject() .endArray() - .endObject()) - .setRefresh(true) + .endObject().bytes()) .get(); - Thread.sleep(5000); + assertBusy(new Runnable() { + @Override + public void run() { + GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) + .setIds("_id") + .get(); + assertThat(response.isFound(), is(true)); + assertThat(response.pipelines().get("_id"), notNullValue()); + } + }); createIndex("test"); client().prepareIndex("test", "type", "1").setSource("field2", "abc") @@ -83,6 +104,23 @@ public class BasicTests extends ESIntegTestCase { assertThat(doc.get("field3"), equalTo("xyz")); } }); + + DeletePipelineResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE) + .setId("_id") + .get(); + assertThat(response.found(), is(true)); + assertThat(response.id(), equalTo("_id")); + + assertBusy(new Runnable() { + @Override + public void run() { + GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) + .setIds("_id") + .get(); + assertThat(response.isFound(), is(false)); + assertThat(response.pipelines().get("_id"), nullValue()); + } + }); } @Override diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineConfigDocReaderTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreClientTests.java similarity index 88% rename from plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineConfigDocReaderTests.java rename to plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreClientTests.java index 782d5a674b4..f670bdab7f9 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineConfigDocReaderTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreClientTests.java @@ -25,10 +25,10 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import static org.hamcrest.Matchers.equalTo; -public class PipelineConfigDocReaderTests extends ESSingleNodeTestCase { +public class PipelineStoreClientTests extends ESSingleNodeTestCase { public void testReadAll() { - PipelineConfigDocReader reader = new PipelineConfigDocReader(Settings.EMPTY, node().injector()); + PipelineStoreClient reader = new PipelineStoreClient(Settings.EMPTY, node().injector()); reader.start(); createIndex(PipelineStore.INDEX); @@ -41,7 +41,7 @@ public class PipelineConfigDocReaderTests extends ESSingleNodeTestCase { client().admin().indices().prepareRefresh().get(); int i = 0; - for (SearchHit hit : reader.readAll()) { + for (SearchHit hit : reader.readAllPipelines()) { assertThat(hit.getId(), equalTo(Integer.toString(i))); assertThat(hit.getVersion(), equalTo(1l)); assertThat(hit.getSource().get("field"), equalTo("value" + i)); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java index b10291fefbd..306df53600c 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java @@ -33,11 +33,13 @@ import org.junit.Before; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -45,14 +47,14 @@ public class PipelineStoreTests extends ESTestCase { private PipelineStore store; private ThreadPool threadPool; - private PipelineConfigDocReader docReader; + private PipelineStoreClient client; @Before public void init() { threadPool = new ThreadPool("test"); ClusterService clusterService = mock(ClusterService.class); - docReader = mock(PipelineConfigDocReader.class); - store = new PipelineStore(Settings.EMPTY, threadPool, clusterService, docReader, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Builder.Factory())); + client = mock(PipelineStoreClient.class); + store = new PipelineStore(Settings.EMPTY, threadPool, clusterService, client, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Builder.Factory())); store.start(); } @@ -66,52 +68,110 @@ public class PipelineStoreTests extends ESTestCase { public void testUpdatePipeline() { List hits = new ArrayList<>(); hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"name\": \"_name1\", \"description\": \"_description1\"}")) + .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) ); - when(docReader.readAll()).thenReturn(hits); + when(client.readAllPipelines()).thenReturn(hits); + when(client.existPipeline("1")).thenReturn(true); assertThat(store.get("1"), nullValue()); store.updatePipelines(); - assertThat(store.get("1").getId(), equalTo("_name1")); + assertThat(store.get("1").getId(), equalTo("1")); assertThat(store.get("1").getDescription(), equalTo("_description1")); + when(client.existPipeline("2")).thenReturn(true); hits.add(new InternalSearchHit(0, "2", new StringText("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"name\": \"_name2\", \"description\": \"_description2\"}")) + .sourceRef(new BytesArray("{\"description\": \"_description2\"}")) ); store.updatePipelines(); - assertThat(store.get("1").getId(), equalTo("_name1")); + assertThat(store.get("1").getId(), equalTo("1")); assertThat(store.get("1").getDescription(), equalTo("_description1")); - assertThat(store.get("2").getId(), equalTo("_name2")); + assertThat(store.get("2").getId(), equalTo("2")); assertThat(store.get("2").getDescription(), equalTo("_description2")); + + hits.remove(1); + when(client.existPipeline("2")).thenReturn(false); + store.updatePipelines(); + assertThat(store.get("1").getId(), equalTo("1")); + assertThat(store.get("1").getDescription(), equalTo("_description1")); + assertThat(store.get("2"), nullValue()); } public void testPipelineUpdater() throws Exception { List hits = new ArrayList<>(); hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"name\": \"_name1\", \"description\": \"_description1\"}")) + .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) ); - when(docReader.readAll()).thenReturn(hits); + when(client.readAllPipelines()).thenReturn(hits); + when(client.existPipeline(anyString())).thenReturn(true); assertThat(store.get("1"), nullValue()); store.startUpdateWorker(); assertBusy(() -> { assertThat(store.get("1"), notNullValue()); - assertThat(store.get("1").getId(), equalTo("_name1")); + assertThat(store.get("1").getId(), equalTo("1")); assertThat(store.get("1").getDescription(), equalTo("_description1")); }); hits.add(new InternalSearchHit(0, "2", new StringText("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"name\": \"_name2\", \"description\": \"_description2\"}")) + .sourceRef(new BytesArray("{\"description\": \"_description2\"}")) ); assertBusy(() -> { assertThat(store.get("1"), notNullValue()); - assertThat(store.get("1").getId(), equalTo("_name1")); + assertThat(store.get("1").getId(), equalTo("1")); assertThat(store.get("1").getDescription(), equalTo("_description1")); assertThat(store.get("2"), notNullValue()); - assertThat(store.get("2").getId(), equalTo("_name2")); + assertThat(store.get("2").getId(), equalTo("2")); assertThat(store.get("2").getDescription(), equalTo("_description2")); }); } + public void testGetReference() { + // fill the store up for the test: + List hits = new ArrayList<>(); + hits.add(new InternalSearchHit(0, "foo", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); + hits.add(new InternalSearchHit(0, "bar", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); + hits.add(new InternalSearchHit(0, "foobar", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); + when(client.readAllPipelines()).thenReturn(hits); + store.updatePipelines(); + + List result = store.getReference("foo"); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0).getPipeline().getId(), equalTo("foo")); + + result = store.getReference("foo*"); + // to make sure the order is consistent in the test: + Collections.sort(result, new Comparator() { + @Override + public int compare(PipelineStore.PipelineReference first, PipelineStore.PipelineReference second) { + return first.getPipeline().getId().compareTo(second.getPipeline().getId()); + } + }); + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0).getPipeline().getId(), equalTo("foo")); + assertThat(result.get(1).getPipeline().getId(), equalTo("foobar")); + + result = store.getReference("bar*"); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0).getPipeline().getId(), equalTo("bar")); + + result = store.getReference("*"); + // to make sure the order is consistent in the test: + Collections.sort(result, new Comparator() { + @Override + public int compare(PipelineStore.PipelineReference first, PipelineStore.PipelineReference second) { + return first.getPipeline().getId().compareTo(second.getPipeline().getId()); + } + }); + assertThat(result.size(), equalTo(3)); + assertThat(result.get(0).getPipeline().getId(), equalTo("bar")); + assertThat(result.get(1).getPipeline().getId(), equalTo("foo")); + assertThat(result.get(2).getPipeline().getId(), equalTo("foobar")); + + result = store.getReference("foo", "bar"); + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0).getPipeline().getId(), equalTo("foo")); + assertThat(result.get(1).getPipeline().getId(), equalTo("bar")); + } + } diff --git a/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.delete_pipeline.json b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.delete_pipeline.json new file mode 100644 index 00000000000..69b8f53d63a --- /dev/null +++ b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.delete_pipeline.json @@ -0,0 +1,20 @@ +{ + "ingest.delete_pipeline": { + "documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html", + "methods": [ "DELETE" ], + "url": { + "path": "/_ingest/pipeline/{id}", + "paths": [ "/_ingest/pipeline/{id}" ], + "parts": { + "id": { + "type" : "string", + "description" : "Pipeline ID", + "required" : true + } + }, + "params": { + } + }, + "body": null + } +} diff --git a/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.get_pipeline.json b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.get_pipeline.json new file mode 100644 index 00000000000..246c6535e92 --- /dev/null +++ b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.get_pipeline.json @@ -0,0 +1,20 @@ +{ + "ingest.get_pipeline": { + "documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html", + "methods": [ "GET" ], + "url": { + "path": "/_ingest/pipeline/{ids}", + "paths": [ "/_ingest/pipeline/{ids}" ], + "parts": { + "ids": { + "type" : "string", + "description" : "Comma separated list of pipeline ids. Wildcards supported", + "required" : true + } + }, + "params": { + } + }, + "body": null + } +} diff --git a/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.put_pipeline.json b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.put_pipeline.json new file mode 100644 index 00000000000..fd88d352731 --- /dev/null +++ b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.put_pipeline.json @@ -0,0 +1,23 @@ +{ + "ingest.put_pipeline": { + "documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html", + "methods": [ "PUT" ], + "url": { + "path": "/_ingest/pipeline/{id}", + "paths": [ "/_ingest/pipeline/{id}" ], + "parts": { + "id": { + "type" : "string", + "description" : "Pipeline ID", + "required" : true + } + }, + "params": { + } + }, + "body": { + "description" : "The ingest definition", + "required" : true + } + } +} diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml new file mode 100644 index 00000000000..ae076f3e34f --- /dev/null +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml @@ -0,0 +1,57 @@ +--- +"Test basic pipeline crud": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "simple" : { + "path" : "field1", + "value" : "_value", + "add_field" : "field2", + "add_field_value" : "_value" + } + } + ] + } + - match: { _id: "my_pipeline" } + + # Simulate a Thread.sleep(), because pipeline are updated in the background + - do: + catch: request_timeout + cluster.health: + wait_for_nodes: 99 + timeout: 2s + - match: { "timed_out": true } + + - do: + ingest.get_pipeline: + ids: "my_pipeline" + - match: { my_pipeline._source.description: "_description" } + - match: { my_pipeline._version: 1 } + + - do: + ingest.delete_pipeline: + id: "my_pipeline" + - match: { _id: "my_pipeline" } + - match: { _found: true } + + # Simulate a Thread.sleep(), because pipeline are updated in the background + - do: + catch: request_timeout + cluster.health: + wait_for_nodes: 99 + timeout: 2s + - match: { "timed_out": true } + + - do: + catch: missing + ingest.get_pipeline: + ids: "my_pipeline"