From f1bd866005e8e06e502ebf67fa6540bf379adfcf Mon Sep 17 00:00:00 2001 From: Giovanni Lanzani Date: Fri, 29 Sep 2017 23:21:03 +0200 Subject: [PATCH] NIFI-4443 Increase StoreInKiteDataset flexibility * The configuration property CONF_XML_FILE now support Expression Language and reuse a Hadoop validator; * The ADDITIONAL_CLASSPATH_RESOURCES property has been added, so that things such as writing to Azure Blob Storage should become possible. This closes #2186. Signed-off-by: Bryan Bende --- .../nifi-kite-processors/pom.xml | 4 +++ .../kite/AbstractKiteProcessor.java | 35 +++++------------- .../processors/kite/StoreInKiteDataset.java | 12 +++++++ .../kite/TestConfigurationProperty.java | 36 +++++++++++++++++++ 4 files changed, 61 insertions(+), 26 deletions(-) diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml index 6068379bf8..514a70285e 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml @@ -281,6 +281,10 @@ test ${kite.version} + + org.apache.nifi + nifi-hadoop-utils + diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java index f90c089244..32b83b845a 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java @@ -37,7 +37,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.HadoopValidators; import org.kitesdk.data.DatasetNotFoundException; import org.kitesdk.data.Datasets; import org.kitesdk.data.SchemaNotFoundException; @@ -47,33 +47,16 @@ import org.kitesdk.data.spi.DefaultConfiguration; abstract class AbstractKiteProcessor extends AbstractProcessor { private static final Splitter COMMA = Splitter.on(',').trimResults(); - protected static final Validator FILES_EXIST = new Validator() { - @Override - public ValidationResult validate(String subject, String configFiles, - ValidationContext context) { - if (configFiles != null && !configFiles.isEmpty()) { - for (String file : COMMA.split(configFiles)) { - ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR - .validate(subject, file, context); - if (!result.isValid()) { - return result; - } - } - } - return new ValidationResult.Builder() - .subject(subject) - .input(configFiles) - .explanation("Files exist") - .valid(true) - .build(); - } - }; protected static final PropertyDescriptor CONF_XML_FILES = new PropertyDescriptor.Builder() .name("Hadoop configuration files") - .description("A comma-separated list of Hadoop configuration files") - .addValidator(FILES_EXIST) + .displayName("Hadoop configuration Resources") + .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop " + + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.") + .required(false) + .addValidator(HadoopValidators.ONE_OR_MORE_FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected static final Validator RECOGNIZED_URI = new Validator() { @@ -163,7 +146,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor { protected static final Validator SCHEMA_VALIDATOR = new Validator() { @Override public ValidationResult validate(String subject, String uri, ValidationContext context) { - Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).getValue()); + Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue()); String error = null; final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri); @@ -195,7 +178,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor { protected void setDefaultConfiguration(ProcessContext context) throws IOException { DefaultConfiguration.set(getConfiguration( - context.getProperty(CONF_XML_FILES).getValue())); + context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue())); } protected static Configuration getConfiguration(String configFiles) { diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java index 1986f0bf0e..1a3966449c 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java @@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; import org.kitesdk.data.DatasetIOException; import org.kitesdk.data.DatasetWriter; @@ -79,10 +80,21 @@ public class StoreInKiteDataset extends AbstractKiteProcessor { .required(true) .build(); + public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder() + .name("additional-classpath-resources") + .displayName("Additional Classpath Resources") + .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " + + "directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamicallyModifiesClasspath(true) + .build(); + private static final List PROPERTIES = ImmutableList.builder() .addAll(AbstractKiteProcessor.getProperties()) .add(KITE_DATASET_URI) + .add(ADDITIONAL_CLASSPATH_RESOURCES) .build(); private static final Set RELATIONSHIPS diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java index 724a4c64da..ef557105a8 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java @@ -21,6 +21,7 @@ package org.apache.nifi.processors.kite; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import org.apache.avro.generic.GenericData.Record; import org.apache.hadoop.conf.Configuration; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; @@ -29,8 +30,12 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.After; import org.junit.rules.TemporaryFolder; +import org.kitesdk.data.DatasetDescriptor; +import org.kitesdk.data.Datasets; import org.kitesdk.data.spi.DefaultConfiguration; +import org.kitesdk.data.Dataset; public class TestConfigurationProperty { @@ -38,6 +43,9 @@ public class TestConfigurationProperty { public final TemporaryFolder temp = new TemporaryFolder(); public File confLocation; + private String datasetUri = null; + private Dataset dataset = null; + @Before public void saveConfiguration() throws IOException { Configuration conf = new Configuration(false); @@ -49,6 +57,20 @@ public class TestConfigurationProperty { out.close(); } + @Before + public void createDataset() throws Exception { + DatasetDescriptor descriptor = new DatasetDescriptor.Builder() + .schema(TestUtil.USER_SCHEMA) + .build(); + this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString(); + this.dataset = Datasets.create(datasetUri, descriptor, Record.class); + } + + @After + public void deleteDataset() throws Exception { + Datasets.delete(datasetUri); + } + @Test public void testConfigurationCanary() throws IOException { TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); @@ -73,4 +95,18 @@ public class TestConfigurationProperty { AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString()); runner.assertNotValid(); } + + @Test + public void testConfigurationExpressionLanguage() throws IOException { + TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); + runner.setProperty( + AbstractKiteProcessor.CONF_XML_FILES, "${filename:substring(0,0):append('pom.xml')}"); + runner.setProperty( + StoreInKiteDataset.KITE_DATASET_URI, datasetUri); + runner.assertValid(); + // botch the Expression Language evaluation + runner.setProperty( + AbstractKiteProcessor.CONF_XML_FILES, "${filename:substring(0,0):"); + runner.assertNotValid(); + } }