move processors that have no deps to core, also move rest spec and tests to core, and set node.ingest to true by default

This commit is contained in:
javanna 2016-01-07 18:57:02 +01:00 committed by Luca Cavanna
parent e35e9bd736
commit ae69d46f92
69 changed files with 77 additions and 267 deletions

View File

@ -24,6 +24,19 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
import org.elasticsearch.ingest.processor.JoinProcessor;
import org.elasticsearch.ingest.processor.LowercaseProcessor;
import org.elasticsearch.ingest.processor.RemoveProcessor;
import org.elasticsearch.ingest.processor.RenameProcessor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.ingest.processor.TrimProcessor;
import org.elasticsearch.ingest.processor.UppercaseProcessor;
import org.elasticsearch.rest.action.ingest.IngestRestFilter;
import java.util.function.BiFunction;
@ -38,6 +51,19 @@ public class IngestModule extends AbstractModule {
public IngestModule() {
this.processorsRegistry = new ProcessorsRegistry();
registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
}
@Override
@ -55,6 +81,6 @@ public class IngestModule extends AbstractModule {
}
public static boolean isIngestEnabled(Settings settings) {
return settings.getAsBoolean("node.ingest", false);
return settings.getAsBoolean("node.ingest", true);
}
}

View File

@ -25,19 +25,18 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.action.ingest.SimulatePipelineAction;
import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -60,28 +59,6 @@ public class IngestClientIT extends ESIntegTestCase {
return pluginList(IngestPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("node.ingest", true)
.build();
}
@Override
protected Settings externalClusterClientSettings() {
return Settings.builder()
.put(super.transportClientSettings())
//TODO can we remove this?
.put("node.ingest", true)
.build();
}
public void testSimulate() throws Exception {
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
.setId("_id")

View File

@ -56,7 +56,6 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
threadPool = new ThreadPool(settingsBuilder()
.put("name", "testCorrectThreadPoolTypePermittedInSettings")
.put("threadpool." + threadPoolName + ".type", correctThreadPoolType.getType())
.put("node.ingest", true)
.build());
ThreadPool.Info info = info(threadPool, threadPoolName);
if (ThreadPool.Names.SAME.equals(threadPoolName)) {
@ -80,7 +79,6 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
settingsBuilder()
.put("name", "testThreadPoolCanNotOverrideThreadPoolType")
.put("threadpool." + threadPoolName + ".type", incorrectThreadPoolType.getType())
.put("node.ingest", true)
.build());
terminate(threadPool);
fail("expected IllegalArgumentException");
@ -99,8 +97,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
ThreadPool.ThreadPoolType validThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder().put("name", "testUpdateSettingsCanNotChangeThreadPoolType")
.put("node.ingest", true).build());
threadPool = new ThreadPool(settingsBuilder().put("name", "testUpdateSettingsCanNotChangeThreadPoolType").build());
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);
@ -125,7 +122,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
ThreadPool threadPool = null;
try {
Settings nodeSettings = Settings.settingsBuilder()
.put("name", "testCachedExecutorType").put("node.ingest", true).build();
.put("name", "testCachedExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);
@ -176,7 +173,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
try {
Settings nodeSettings = Settings.settingsBuilder()
.put("name", "testFixedExecutorType").put("node.ingest", true).build();
.put("name", "testFixedExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);
@ -231,7 +228,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
try {
Settings nodeSettings = settingsBuilder()
.put("threadpool." + threadPoolName + ".size", 10)
.put("name", "testScalingExecutorType").put("node.ingest", true).build();
.put("name", "testScalingExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);
@ -269,7 +266,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
try {
Settings nodeSettings = Settings.settingsBuilder()
.put("threadpool." + threadPoolName + ".queue_size", 1000)
.put("name", "testCachedExecutorType").put("node.ingest", true).build();
.put("name", "testCachedExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);
@ -306,7 +303,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
.put("name", "testCustomThreadPool").put("node.ingest", true).build();
.put("name", "testCustomThreadPool").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);

View File

@ -77,10 +77,4 @@ thirdPartyAudit.excludes = [
'org.objectweb.asm.ClassWriter',
'org.objectweb.asm.MethodVisitor',
'org.objectweb.asm.Opcodes',
]
integTest {
cluster {
systemProperty 'es.node.ingest', 'true'
}
}
]

View File

@ -20,21 +20,8 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.ingest.IngestModule;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.GeoIpProcessor;
import org.elasticsearch.ingest.processor.GrokProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
import org.elasticsearch.ingest.processor.JoinProcessor;
import org.elasticsearch.ingest.processor.LowercaseProcessor;
import org.elasticsearch.ingest.processor.RemoveProcessor;
import org.elasticsearch.ingest.processor.RenameProcessor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.ingest.processor.TrimProcessor;
import org.elasticsearch.ingest.processor.UppercaseProcessor;
import org.elasticsearch.plugins.Plugin;
public class IngestPlugin extends Plugin {
@ -54,18 +41,5 @@ public class IngestPlugin extends Plugin {
public void onModule(IngestModule ingestModule) {
ingestModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
ingestModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
ingestModule.registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
ingestModule.registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
ingestModule.registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
ingestModule.registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
ingestModule.registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
ingestModule.registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
ingestModule.registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
ingestModule.registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
ingestModule.registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
ingestModule.registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
ingestModule.registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
ingestModule.registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
ingestModule.registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
}
}

View File

@ -1,56 +0,0 @@
{
"ingest.bulk": {
"documentation": "Copied from bulk in core to add the pipeline parameter to rest spec",
"methods": ["POST", "PUT"],
"url": {
"path": "/_bulk",
"paths": ["/_bulk", "/{index}/_bulk", "/{index}/{type}/_bulk"],
"parts": {
"index": {
"type" : "string",
"description" : "Default index for items which don't provide one"
},
"type": {
"type" : "string",
"description" : "Default document type for items which don't provide one"
}
},
"params": {
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
},
"refresh": {
"type" : "boolean",
"description" : "Refresh the index after performing the operation"
},
"routing": {
"type" : "string",
"description" : "Specific routing value"
},
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
},
"type": {
"type" : "string",
"description" : "Default document type for items which don't provide one"
},
"fields": {
"type": "list",
"description" : "Default comma-separated list of fields to return in the response for updates"
},
"pipeline" : {
"type" : "string",
"description" : "The pipeline id to preprocess incoming documents with"
}
}
},
"body": {
"description" : "The operation definition and data (action-data pairs), separated by newlines",
"required" : true,
"serialize" : "bulk"
}
}
}

View File

@ -1,80 +0,0 @@
{
"ingest.index": {
"documentation": "Copied from index in core to add support for the pipeline parameter to rest spec",
"methods": ["POST", "PUT"],
"url": {
"path": "/{index}/{type}",
"paths": ["/{index}/{type}", "/{index}/{type}/{id}"],
"parts": {
"id": {
"type" : "string",
"description" : "Document ID"
},
"index": {
"type" : "string",
"required" : true,
"description" : "The name of the index"
},
"type": {
"type" : "string",
"required" : true,
"description" : "The type of the document"
}
},
"params": {
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
},
"op_type": {
"type" : "enum",
"options" : ["index", "create"],
"default" : "index",
"description" : "Explicit operation type"
},
"parent": {
"type" : "string",
"description" : "ID of the parent document"
},
"refresh": {
"type" : "boolean",
"description" : "Refresh the index after performing the operation"
},
"routing": {
"type" : "string",
"description" : "Specific routing value"
},
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
},
"timestamp": {
"type" : "time",
"description" : "Explicit timestamp for the document"
},
"ttl": {
"type" : "duration",
"description" : "Expiration time for the document"
},
"version" : {
"type" : "number",
"description" : "Explicit version number for concurrency control"
},
"version_type": {
"type" : "enum",
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
},
"pipeline" : {
"type" : "string",
"description" : "The pipeline id to preprocess incoming documents with"
}
}
},
"body": {
"description" : "The document",
"required" : true
}
}
}

View File

@ -18,7 +18,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -57,7 +57,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -94,7 +94,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1

View File

@ -17,7 +17,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -61,7 +61,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -106,7 +106,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1

View File

@ -26,5 +26,6 @@ dependencies {
integTest {
cluster {
plugin 'ingest', project(':plugins:ingest')
systemProperty 'es.node.ingest', 'false'
}
}

View File

@ -80,7 +80,7 @@
"Test index api with pipeline id fails when node.ingest is set to false":
- do:
catch: /ingest plugin is disabled, cannot execute pipeline with id \[my_pipeline_1\]/
ingest.index:
index:
index: test
type: test
id: 1
@ -95,7 +95,7 @@
"Test bulk api with pipeline id fails when node.ingest is set to false":
- do:
catch: /ingest plugin is disabled, cannot execute pipeline with id \[my_pipeline_1\]/
ingest.bulk:
bulk:
pipeline: "my_pipeline_1"
body:
- index:

View File

@ -27,6 +27,5 @@ dependencies {
integTest {
cluster {
plugin 'ingest', project(':plugins:ingest')
systemProperty 'es.node.ingest', 'true'
}
}

View File

@ -28,7 +28,7 @@
- match: { _id: "my_pipeline_1" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -108,7 +108,7 @@
- match: { _id: "my_pipeline_3" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -133,7 +133,7 @@
- match: { _source.metadata: ["0","1","2","3"] }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -152,7 +152,7 @@
- match: { _source.field2: "value" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -201,7 +201,7 @@
- match: { _id: "my_handled_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1

View File

@ -40,6 +40,10 @@
"fields": {
"type": "list",
"description" : "Default comma-separated list of fields to return in the response for updates"
},
"pipeline" : {
"type" : "string",
"description" : "The pipeline id to preprocess incoming documents with"
}
}
},

View File

@ -65,6 +65,10 @@
"type" : "enum",
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
},
"pipeline" : {
"type" : "string",
"description" : "The pipeline id to preprocess incoming documents with"
}
}
},

View File

@ -51,23 +51,7 @@
"description": "_description",
"processors": [
{
"geoip" : {
}
}
]
}
- do:
catch: param
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"geoip" : {
"ip_field" : 1234
"set" : {
}
}
]

View File

@ -20,7 +20,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1

View File

@ -75,7 +75,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
@ -133,7 +133,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1

View File

@ -8,25 +8,20 @@
"description": "_description",
"processors": [
{
"rename" : {
"field" : "foofield",
"to" : "field1"
"set" : {
"field" : "_executed",
"value" : true
}
},
{
"grok" : {
"field" : "field1",
"pattern" : "%{NUMBER:val} %{NUMBER:status} <%{WORD:msg}>"
"date" : {
"match_field" : "date",
"target_field" : "date",
"match_formats" : ["yyyy"]
}
}
],
"on_failure" : [
{
"grok" : {
"field" : "field1",
"pattern" : "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>"
}
},
{
"set" : {
"field" : "_failed",
@ -38,21 +33,20 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {field1: "123.42 400 <foo>"}
body: {field1: "value1"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.val: 123.42 }
- match: { _source.status: 400 }
- match: { _source.msg: "foo" }
- match: { _source.field1: "value1" }
- match: { _source._executed: true }
- match: { _source._failed: true }
---
@ -64,12 +58,6 @@
{
"description": "_description",
"processors": [
{
"grok" : {
"field" : "field1",
"pattern" : "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>"
}
},
{
"rename" : {
"field" : "foofield",
@ -103,20 +91,18 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {field1: "123.42 400 <foo>"}
body: {field1: "value1"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.val: 123.42 }
- match: { _source.msg: "foo" }
- match: { _source.status: 400 }
- match: { _source.field1: "value1" }
- match: { _source.foofield: "exists" }
- match: { _source.foofield2: "ran" }

View File

@ -18,7 +18,7 @@
- do:
catch: request
ingest.index:
index:
index: test
type: test
id: 1
@ -52,7 +52,7 @@
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index:
index: test
type: test
id: 1