diff --git a/docs/plugins/ingest.asciidoc b/docs/plugins/ingest.asciidoc index 1d75daf387d..74f1d66b847 100644 --- a/docs/plugins/ingest.asciidoc +++ b/docs/plugins/ingest.asciidoc @@ -1,6 +1,33 @@ [[ingest]] == Ingest Plugin +The ingest plugin can be used to pre-process documents before the actual indexing takes place. +This pre-processing happens by the ingest plugin that intercepts bulk and index requests, applies the +transformations and then passes the documents back to the index or bulk APIs. + +The ingest plugin is disabled by default. In order to enable the ingest plugin the following +setting should be configured in the elasticsearch.yml file: + +[source,yaml] +-------------------------------------------------- +node.ingest: true +-------------------------------------------------- + +The ingest plugin can be installed and enabled on any node. It is possible to run ingest +on an master and or data node or have dedicated client nodes that run with ingest. + +In order to pre-process document before indexing the `pipeline` parameter should be used +on an index or bulk request to tell the ingest plugin what pipeline is going to be used. + +[source,js] +-------------------------------------------------- +PUT /my-index/my-type/my-id?ingest=my_pipeline_id +{ + ... +} +-------------------------------------------------- +// AUTOSENSE + === Processors ==== Set processor diff --git a/plugins/ingest/build.gradle b/plugins/ingest/build.gradle index 2e5aede319f..dfdf421e608 100644 --- a/plugins/ingest/build.gradle +++ b/plugins/ingest/build.gradle @@ -66,3 +66,9 @@ bundlePlugin { //geoip WebServiceClient needs Google http client, but we're not using WebServiceClient and // joni has AsmCompilerSupport, but that isn't being used: thirdPartyAudit.missingClasses = true + +integTest { + cluster { + systemProperty 'es.node.ingest', 'true' + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index c471163c0fa..bc083938ec7 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -42,31 +42,41 @@ import java.util.Map; public class IngestModule extends AbstractModule { + private final boolean ingestEnabled; private final Map processorFactoryProviders = new HashMap<>(); + public IngestModule(boolean ingestEnabled) { + this.ingestEnabled = ingestEnabled; + } + @Override protected void configure() { + // Even if ingest isn't enable we still need to make sure that rest requests with pipeline + // param copy the pipeline into the context, so that in IngestDisabledActionFilter + // index/bulk requests can be failed binder().bind(IngestRestFilter.class).asEagerSingleton(); - binder().bind(IngestBootstrapper.class).asEagerSingleton(); + if (ingestEnabled) { + binder().bind(IngestBootstrapper.class).asEagerSingleton(); - addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); - addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile())); - addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); - addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); - addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); - addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); - addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); - addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); - addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); - addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); - addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory()); - addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); - addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); - addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); + addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); + addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile())); + addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); + addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); + addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); + addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); + addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); + addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); + addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); + addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); + addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory()); + addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); + addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); + addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); - MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class); - for (Map.Entry entry : processorFactoryProviders.entrySet()) { - mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue()); + MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class); + for (Map.Entry entry : processorFactoryProviders.entrySet()) { + mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue()); + } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index af85765c416..f8eb044d881 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -30,7 +30,9 @@ import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction; import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction; import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction; import org.elasticsearch.plugin.ingest.rest.RestSimulatePipelineAction; +import org.elasticsearch.plugin.ingest.rest.RestIngestDisabledAction; import org.elasticsearch.plugin.ingest.transport.IngestActionFilter; +import org.elasticsearch.plugin.ingest.transport.IngestDisabledActionFilter; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction; import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction; @@ -56,11 +58,13 @@ public class IngestPlugin extends Plugin { public static final String NODE_INGEST_SETTING = "node.ingest"; private final Settings nodeSettings; + private final boolean ingestEnabled; private final boolean transportClient; public IngestPlugin(Settings nodeSettings) { this.nodeSettings = nodeSettings; - transportClient = TransportClient.CLIENT_TYPE.equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING)); + this.ingestEnabled = nodeSettings.getAsBoolean(NODE_INGEST_SETTING, false); + this.transportClient = TransportClient.CLIENT_TYPE.equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING)); } @Override @@ -78,13 +82,13 @@ public class IngestPlugin extends Plugin { if (transportClient) { return Collections.emptyList(); } else { - return Collections.singletonList(new IngestModule()); + return Collections.singletonList(new IngestModule(ingestEnabled)); } } @Override public Collection> nodeServices() { - if (transportClient) { + if (transportClient|| ingestEnabled == false) { return Collections.emptyList(); } else { return Collections.singletonList(IngestBootstrapper.class); @@ -95,27 +99,37 @@ public class IngestPlugin extends Plugin { public Settings additionalSettings() { return settingsBuilder() .put(PipelineExecutionService.additionalSettings(nodeSettings)) - // TODO: in a followup issue this should be made configurable - .put(NODE_INGEST_SETTING, true) .build(); } public void onModule(ActionModule module) { if (transportClient == false) { - module.registerFilter(IngestActionFilter.class); + if (ingestEnabled) { + module.registerFilter(IngestActionFilter.class); + } else { + module.registerFilter(IngestDisabledActionFilter.class); + } + } + if (ingestEnabled) { + module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class); + module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class); + module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class); + module.registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class); } - module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class); - module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class); - module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class); - module.registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class); } public void onModule(NetworkModule networkModule) { - if (transportClient == false) { + if (transportClient) { + return; + } + + if (ingestEnabled) { networkModule.registerRestHandler(RestPutPipelineAction.class); networkModule.registerRestHandler(RestGetPipelineAction.class); networkModule.registerRestHandler(RestDeletePipelineAction.class); networkModule.registerRestHandler(RestSimulatePipelineAction.class); + } else { + networkModule.registerRestHandler(RestIngestDisabledAction.class); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestIngestDisabledAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestIngestDisabledAction.java new file mode 100644 index 00000000000..80d0784956d --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestIngestDisabledAction.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.plugin.ingest.rest; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.BytesRestResponse; + +public class RestIngestDisabledAction extends BaseRestHandler { + + @Inject + public RestIngestDisabledAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(RestRequest.Method.DELETE, "/_ingest/pipeline/{id}", this); + controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}", this); + controller.registerHandler(RestRequest.Method.PUT, "/_ingest/pipeline/{id}", this); + controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/{id}/_simulate", this); + controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}/_simulate", this); + controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate", this); + controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this); + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + channel.sendResponse(new BytesRestResponse(channel, new IllegalArgumentException("ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used"))); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestDisabledActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestDisabledActionFilter.java new file mode 100644 index 00000000000..06db1ab099c --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestDisabledActionFilter.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.plugin.ingest.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; +import org.elasticsearch.plugin.ingest.IngestPlugin; + +public final class IngestDisabledActionFilter implements ActionFilter { + + @Override + public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { + String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY); + if (pipelineId != null) { + failRequest(pipelineId); + } + pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM); + if (pipelineId != null) { + failRequest(pipelineId); + } + + chain.proceed(action, request, listener); + } + + @Override + public void apply(String action, ActionResponse response, ActionListener listener, ActionFilterChain chain) { + chain.proceed(action, response, listener); + } + + @Override + public int order() { + return Integer.MAX_VALUE; + } + + private static void failRequest(String pipelineId) { + throw new IllegalArgumentException("ingest plugin is disabled, cannot execute pipeline with id [" + pipelineId + "]"); + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 545f8a0b6bf..13934f5a83d 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.plugin.ingest.IngestPlugin; @@ -68,6 +69,22 @@ public class IngestClientIT extends ESIntegTestCase { return nodePlugins(); } + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(IngestPlugin.NODE_INGEST_SETTING, true) + .build(); + } + + @Override + protected Settings externalClusterClientSettings() { + return Settings.builder() + .put(super.transportClientSettings()) + .put(IngestPlugin.NODE_INGEST_SETTING, true) + .build(); + } + public void testSimulate() throws Exception { new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE) .setId("_id") diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/reload/ReloadPipelinesActionTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/reload/ReloadPipelinesActionTests.java index d6f2d3f3d9f..87a2554ede1 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/reload/ReloadPipelinesActionTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/reload/ReloadPipelinesActionTests.java @@ -153,7 +153,7 @@ public class ReloadPipelinesActionTests extends ESTestCase { if (ingestNode) { attributes = Collections.singletonMap("ingest", "true"); } else { - attributes = Collections.emptyMap(); + attributes = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap("ingest", "false"); } String id = String.valueOf(index); return new DiscoveryNode(id, id, new LocalTransportAddress(id), attributes, Version.CURRENT); diff --git a/qa/ingest-disabled/build.gradle b/qa/ingest-disabled/build.gradle new file mode 100644 index 00000000000..c74535e3789 --- /dev/null +++ b/qa/ingest-disabled/build.gradle @@ -0,0 +1,30 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +apply plugin: 'elasticsearch.rest-test' + +dependencies { + testCompile project(path: ':plugins:ingest', configuration: 'runtime') +} + +integTest { + cluster { + plugin 'ingest', project(':plugins:ingest') + } +} diff --git a/qa/ingest-disabled/src/test/java/org/elasticsearch/smoketest/IngestDisabledIT.java b/qa/ingest-disabled/src/test/java/org/elasticsearch/smoketest/IngestDisabledIT.java new file mode 100644 index 00000000000..e162807baca --- /dev/null +++ b/qa/ingest-disabled/src/test/java/org/elasticsearch/smoketest/IngestDisabledIT.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.smoketest; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.RestTestCandidate; +import org.elasticsearch.test.rest.parser.RestTestParseException; + +import java.io.IOException; + +public class IngestDisabledIT extends ESRestTestCase { + + public IngestDisabledIT(@Name("yaml") RestTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws IOException, RestTestParseException { + return ESRestTestCase.createParameters(0, 1); + } + +} diff --git a/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml b/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml new file mode 100644 index 00000000000..f470b3152bd --- /dev/null +++ b/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml @@ -0,0 +1,58 @@ +--- +"Test ingest APIS fail when is disabled": + - do: + catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/ + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field", + "value": "valie" + } + } + ] + } + + - do: + catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/ + ingest.delete_pipeline: + id: "my_pipeline" + + - do: + catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/ + ingest.get_pipeline: + id: "my_pipeline" + + - do: + catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/ + ingest.simulate: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field", + "value": "valie" + } + } + ] + } + + - do: + catch: /ingest plugin is disabled, cannot execute pipeline with id \[my_pipeline_1\]/ + ingest.index: + index: test + type: test + id: 1 + pipeline: "my_pipeline_1" + body: { + field1: "1", + field2: "2", + field3: "3" + } diff --git a/qa/ingest-with-mustache/build.gradle b/qa/ingest-with-mustache/build.gradle index 32ed5f8956f..8c0adbefaef 100644 --- a/qa/ingest-with-mustache/build.gradle +++ b/qa/ingest-with-mustache/build.gradle @@ -27,5 +27,6 @@ dependencies { integTest { cluster { plugin 'ingest', project(':plugins:ingest') + systemProperty 'es.node.ingest', 'true' } } diff --git a/settings.gradle b/settings.gradle index d42952095d3..7070abcac89 100644 --- a/settings.gradle +++ b/settings.gradle @@ -41,6 +41,7 @@ List projects = [ 'qa:smoke-test-multinode', 'qa:smoke-test-plugins', 'qa:ingest-with-mustache', + 'qa:ingest-disabled', 'qa:vagrant', ]