diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
index 6068379bf8..514a70285e 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
@@ -281,6 +281,10 @@
test
${kite.version}
+
+ org.apache.nifi
+ nifi-hadoop-utils
+
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
index f90c089244..32b83b845a 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
@@ -37,7 +37,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.HadoopValidators;
import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.SchemaNotFoundException;
@@ -47,33 +47,16 @@ import org.kitesdk.data.spi.DefaultConfiguration;
abstract class AbstractKiteProcessor extends AbstractProcessor {
private static final Splitter COMMA = Splitter.on(',').trimResults();
- protected static final Validator FILES_EXIST = new Validator() {
- @Override
- public ValidationResult validate(String subject, String configFiles,
- ValidationContext context) {
- if (configFiles != null && !configFiles.isEmpty()) {
- for (String file : COMMA.split(configFiles)) {
- ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR
- .validate(subject, file, context);
- if (!result.isValid()) {
- return result;
- }
- }
- }
- return new ValidationResult.Builder()
- .subject(subject)
- .input(configFiles)
- .explanation("Files exist")
- .valid(true)
- .build();
- }
- };
protected static final PropertyDescriptor CONF_XML_FILES
= new PropertyDescriptor.Builder()
.name("Hadoop configuration files")
- .description("A comma-separated list of Hadoop configuration files")
- .addValidator(FILES_EXIST)
+ .displayName("Hadoop configuration Resources")
+ .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
+ + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
+ .required(false)
+ .addValidator(HadoopValidators.ONE_OR_MORE_FILE_EXISTS_VALIDATOR)
+ .expressionLanguageSupported(true)
.build();
protected static final Validator RECOGNIZED_URI = new Validator() {
@@ -163,7 +146,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
protected static final Validator SCHEMA_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String uri, ValidationContext context) {
- Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).getValue());
+ Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue());
String error = null;
final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
@@ -195,7 +178,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
protected void setDefaultConfiguration(ProcessContext context)
throws IOException {
DefaultConfiguration.set(getConfiguration(
- context.getProperty(CONF_XML_FILES).getValue()));
+ context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue()));
}
protected static Configuration getConfiguration(String configFiles) {
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
index 1986f0bf0e..1a3966449c 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetWriter;
@@ -79,10 +80,21 @@ public class StoreInKiteDataset extends AbstractKiteProcessor {
.required(true)
.build();
+ public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
+ .name("additional-classpath-resources")
+ .displayName("Additional Classpath Resources")
+ .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " +
+ "directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dynamicallyModifiesClasspath(true)
+ .build();
+
private static final List PROPERTIES
= ImmutableList.builder()
.addAll(AbstractKiteProcessor.getProperties())
.add(KITE_DATASET_URI)
+ .add(ADDITIONAL_CLASSPATH_RESOURCES)
.build();
private static final Set RELATIONSHIPS
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
index 724a4c64da..ef557105a8 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
@@ -21,6 +21,7 @@ package org.apache.nifi.processors.kite;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
@@ -29,8 +30,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.After;
import org.junit.rules.TemporaryFolder;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration;
+import org.kitesdk.data.Dataset;
public class TestConfigurationProperty {
@@ -38,6 +43,9 @@ public class TestConfigurationProperty {
public final TemporaryFolder temp = new TemporaryFolder();
public File confLocation;
+ private String datasetUri = null;
+ private Dataset dataset = null;
+
@Before
public void saveConfiguration() throws IOException {
Configuration conf = new Configuration(false);
@@ -49,6 +57,20 @@ public class TestConfigurationProperty {
out.close();
}
+ @Before
+ public void createDataset() throws Exception {
+ DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+ .schema(TestUtil.USER_SCHEMA)
+ .build();
+ this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
+ this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
+ }
+
+ @After
+ public void deleteDataset() throws Exception {
+ Datasets.delete(datasetUri);
+ }
+
@Test
public void testConfigurationCanary() throws IOException {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
@@ -73,4 +95,18 @@ public class TestConfigurationProperty {
AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
runner.assertNotValid();
}
+
+ @Test
+ public void testConfigurationExpressionLanguage() throws IOException {
+ TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+ runner.setProperty(
+ AbstractKiteProcessor.CONF_XML_FILES, "${filename:substring(0,0):append('pom.xml')}");
+ runner.setProperty(
+ StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+ runner.assertValid();
+ // botch the Expression Language evaluation
+ runner.setProperty(
+ AbstractKiteProcessor.CONF_XML_FILES, "${filename:substring(0,0):");
+ runner.assertNotValid();
+ }
}