added a `node.ingest` setting that controls whether ingest is active or not. Defaults to `false`.

If `node.ingest` isn't active then ingest related API calls fail and if the `pipeline_id` parameter is set then index and bulk requests fail.
This commit is contained in:
Martijn van Groningen 2015-12-22 16:51:25 +01:00
parent fe94593615
commit dbbb296322
13 changed files with 341 additions and 30 deletions

View File

@ -1,6 +1,33 @@
[[ingest]]
== Ingest Plugin
The ingest plugin can be used to pre-process documents before the actual indexing takes place.
This pre-processing happens by the ingest plugin that intercepts bulk and index requests, applies the
transformations and then passes the documents back to the index or bulk APIs.
The ingest plugin is disabled by default. In order to enable the ingest plugin the following
setting should be configured in the elasticsearch.yml file:
[source,yaml]
--------------------------------------------------
node.ingest: true
--------------------------------------------------
The ingest plugin can be installed and enabled on any node. It is possible to run ingest
on an master and or data node or have dedicated client nodes that run with ingest.
In order to pre-process document before indexing the `pipeline` parameter should be used
on an index or bulk request to tell the ingest plugin what pipeline is going to be used.
[source,js]
--------------------------------------------------
PUT /my-index/my-type/my-id?ingest=my_pipeline_id
{
...
}
--------------------------------------------------
// AUTOSENSE
=== Processors
==== Set processor

View File

@ -66,3 +66,9 @@ bundlePlugin {
//geoip WebServiceClient needs Google http client, but we're not using WebServiceClient and
// joni has AsmCompilerSupport, but that isn't being used:
thirdPartyAudit.missingClasses = true
integTest {
cluster {
systemProperty 'es.node.ingest', 'true'
}
}

View File

@ -42,31 +42,41 @@ import java.util.Map;
public class IngestModule extends AbstractModule {
private final boolean ingestEnabled;
private final Map<String, ProcessorFactoryProvider> processorFactoryProviders = new HashMap<>();
public IngestModule(boolean ingestEnabled) {
this.ingestEnabled = ingestEnabled;
}
@Override
protected void configure() {
// Even if ingest isn't enable we still need to make sure that rest requests with pipeline
// param copy the pipeline into the context, so that in IngestDisabledActionFilter
// index/bulk requests can be failed
binder().bind(IngestRestFilter.class).asEagerSingleton();
binder().bind(IngestBootstrapper.class).asEagerSingleton();
if (ingestEnabled) {
binder().bind(IngestBootstrapper.class).asEagerSingleton();
addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory());
addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory());
addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
}
}
}

View File

@ -30,7 +30,9 @@ import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestSimulatePipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestIngestDisabledAction;
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
import org.elasticsearch.plugin.ingest.transport.IngestDisabledActionFilter;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
@ -56,11 +58,13 @@ public class IngestPlugin extends Plugin {
public static final String NODE_INGEST_SETTING = "node.ingest";
private final Settings nodeSettings;
private final boolean ingestEnabled;
private final boolean transportClient;
public IngestPlugin(Settings nodeSettings) {
this.nodeSettings = nodeSettings;
transportClient = TransportClient.CLIENT_TYPE.equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING));
this.ingestEnabled = nodeSettings.getAsBoolean(NODE_INGEST_SETTING, false);
this.transportClient = TransportClient.CLIENT_TYPE.equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING));
}
@Override
@ -78,13 +82,13 @@ public class IngestPlugin extends Plugin {
if (transportClient) {
return Collections.emptyList();
} else {
return Collections.singletonList(new IngestModule());
return Collections.singletonList(new IngestModule(ingestEnabled));
}
}
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (transportClient) {
if (transportClient|| ingestEnabled == false) {
return Collections.emptyList();
} else {
return Collections.singletonList(IngestBootstrapper.class);
@ -95,27 +99,37 @@ public class IngestPlugin extends Plugin {
public Settings additionalSettings() {
return settingsBuilder()
.put(PipelineExecutionService.additionalSettings(nodeSettings))
// TODO: in a followup issue this should be made configurable
.put(NODE_INGEST_SETTING, true)
.build();
}
public void onModule(ActionModule module) {
if (transportClient == false) {
module.registerFilter(IngestActionFilter.class);
if (ingestEnabled) {
module.registerFilter(IngestActionFilter.class);
} else {
module.registerFilter(IngestDisabledActionFilter.class);
}
}
if (ingestEnabled) {
module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
module.registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
}
module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
module.registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
}
public void onModule(NetworkModule networkModule) {
if (transportClient == false) {
if (transportClient) {
return;
}
if (ingestEnabled) {
networkModule.registerRestHandler(RestPutPipelineAction.class);
networkModule.registerRestHandler(RestGetPipelineAction.class);
networkModule.registerRestHandler(RestDeletePipelineAction.class);
networkModule.registerRestHandler(RestSimulatePipelineAction.class);
} else {
networkModule.registerRestHandler(RestIngestDisabledAction.class);
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.BytesRestResponse;
public class RestIngestDisabledAction extends BaseRestHandler {
@Inject
public RestIngestDisabledAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.DELETE, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.PUT, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this);
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
channel.sendResponse(new BytesRestResponse(channel, new IllegalArgumentException("ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used")));
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.plugin.ingest.IngestPlugin;
public final class IngestDisabledActionFilter implements ActionFilter {
@Override
public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
if (pipelineId != null) {
failRequest(pipelineId);
}
pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM);
if (pipelineId != null) {
failRequest(pipelineId);
}
chain.proceed(action, request, listener);
}
@Override
public void apply(String action, ActionResponse response, ActionListener listener, ActionFilterChain chain) {
chain.proceed(action, response, listener);
}
@Override
public int order() {
return Integer.MAX_VALUE;
}
private static void failRequest(String pipelineId) {
throw new IllegalArgumentException("ingest plugin is disabled, cannot execute pipeline with id [" + pipelineId + "]");
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.ingest.IngestPlugin;
@ -68,6 +69,22 @@ public class IngestClientIT extends ESIntegTestCase {
return nodePlugins();
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(IngestPlugin.NODE_INGEST_SETTING, true)
.build();
}
@Override
protected Settings externalClusterClientSettings() {
return Settings.builder()
.put(super.transportClientSettings())
.put(IngestPlugin.NODE_INGEST_SETTING, true)
.build();
}
public void testSimulate() throws Exception {
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
.setId("_id")

View File

@ -153,7 +153,7 @@ public class ReloadPipelinesActionTests extends ESTestCase {
if (ingestNode) {
attributes = Collections.singletonMap("ingest", "true");
} else {
attributes = Collections.emptyMap();
attributes = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap("ingest", "false");
}
String id = String.valueOf(index);
return new DiscoveryNode(id, id, new LocalTransportAddress(id), attributes, Version.CURRENT);

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'elasticsearch.rest-test'
dependencies {
testCompile project(path: ':plugins:ingest', configuration: 'runtime')
}
integTest {
cluster {
plugin 'ingest', project(':plugins:ingest')
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.smoketest;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.RestTestCandidate;
import org.elasticsearch.test.rest.parser.RestTestParseException;
import java.io.IOException;
public class IngestDisabledIT extends ESRestTestCase {
public IngestDisabledIT(@Name("yaml") RestTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
return ESRestTestCase.createParameters(0, 1);
}
}

View File

@ -0,0 +1,58 @@
---
"Test ingest APIS fail when is disabled":
- do:
catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field",
"value": "valie"
}
}
]
}
- do:
catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
ingest.delete_pipeline:
id: "my_pipeline"
- do:
catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
ingest.get_pipeline:
id: "my_pipeline"
- do:
catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
ingest.simulate:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field",
"value": "valie"
}
}
]
}
- do:
catch: /ingest plugin is disabled, cannot execute pipeline with id \[my_pipeline_1\]/
ingest.index:
index: test
type: test
id: 1
pipeline: "my_pipeline_1"
body: {
field1: "1",
field2: "2",
field3: "3"
}

View File

@ -27,5 +27,6 @@ dependencies {
integTest {
cluster {
plugin 'ingest', project(':plugins:ingest')
systemProperty 'es.node.ingest', 'true'
}
}

View File

@ -41,6 +41,7 @@ List projects = [
'qa:smoke-test-multinode',
'qa:smoke-test-plugins',
'qa:ingest-with-mustache',
'qa:ingest-disabled',
'qa:vagrant',
]