NIFI-4443 Increase StoreInKiteDataset flexibility

* The configuration property CONF_XML_FILES now supports Expression
  Language and reuses a Hadoop validator;
* The ADDITIONAL_CLASSPATH_RESOURCES property has been added, so that
  things such as writing to Azure Blob Storage should become possible.

This closes #2186.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Giovanni Lanzani 2017-09-29 23:21:03 +02:00 committed by Bryan Bende
parent 0a47a3bde5
commit f1bd866005
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
4 changed files with 61 additions and 26 deletions

View File

@ -281,6 +281,10 @@
<scope>test</scope> <scope>test</scope>
<version>${kite.version}</version> <version>${kite.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
</dependency>
</dependencies> </dependencies>

View File

@ -37,7 +37,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.hadoop.HadoopValidators;
import org.kitesdk.data.DatasetNotFoundException; import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.Datasets; import org.kitesdk.data.Datasets;
import org.kitesdk.data.SchemaNotFoundException; import org.kitesdk.data.SchemaNotFoundException;
@ -47,33 +47,16 @@ import org.kitesdk.data.spi.DefaultConfiguration;
abstract class AbstractKiteProcessor extends AbstractProcessor { abstract class AbstractKiteProcessor extends AbstractProcessor {
private static final Splitter COMMA = Splitter.on(',').trimResults(); private static final Splitter COMMA = Splitter.on(',').trimResults();
protected static final Validator FILES_EXIST = new Validator() {
@Override
public ValidationResult validate(String subject, String configFiles,
ValidationContext context) {
if (configFiles != null && !configFiles.isEmpty()) {
for (String file : COMMA.split(configFiles)) {
ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR
.validate(subject, file, context);
if (!result.isValid()) {
return result;
}
}
}
return new ValidationResult.Builder()
.subject(subject)
.input(configFiles)
.explanation("Files exist")
.valid(true)
.build();
}
};
protected static final PropertyDescriptor CONF_XML_FILES protected static final PropertyDescriptor CONF_XML_FILES
= new PropertyDescriptor.Builder() = new PropertyDescriptor.Builder()
.name("Hadoop configuration files") .name("Hadoop configuration files")
.description("A comma-separated list of Hadoop configuration files") .displayName("Hadoop configuration Resources")
.addValidator(FILES_EXIST) .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
+ "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
.required(false)
.addValidator(HadoopValidators.ONE_OR_MORE_FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
protected static final Validator RECOGNIZED_URI = new Validator() { protected static final Validator RECOGNIZED_URI = new Validator() {
@ -163,7 +146,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
protected static final Validator SCHEMA_VALIDATOR = new Validator() { protected static final Validator SCHEMA_VALIDATOR = new Validator() {
@Override @Override
public ValidationResult validate(String subject, String uri, ValidationContext context) { public ValidationResult validate(String subject, String uri, ValidationContext context) {
Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).getValue()); Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue());
String error = null; String error = null;
final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri); final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
@ -195,7 +178,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
protected void setDefaultConfiguration(ProcessContext context) protected void setDefaultConfiguration(ProcessContext context)
throws IOException { throws IOException {
DefaultConfiguration.set(getConfiguration( DefaultConfiguration.set(getConfiguration(
context.getProperty(CONF_XML_FILES).getValue())); context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue()));
} }
protected static Configuration getConfiguration(String configFiles) { protected static Configuration getConfiguration(String configFiles) {

View File

@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.kitesdk.data.DatasetIOException; import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetWriter; import org.kitesdk.data.DatasetWriter;
@ -79,10 +80,21 @@ public class StoreInKiteDataset extends AbstractKiteProcessor {
.required(true) .required(true)
.build(); .build();
/**
 * Optional property holding a comma-separated list of files and/or
 * directories to add to the classpath. It is validated as non-empty and is
 * flagged as dynamically modifying the classpath.
 */
public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
.name("additional-classpath-resources")
.displayName("Additional Classpath Resources")
.description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " +
"directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamicallyModifiesClasspath(true)
.build();
private static final List<PropertyDescriptor> PROPERTIES private static final List<PropertyDescriptor> PROPERTIES
= ImmutableList.<PropertyDescriptor>builder() = ImmutableList.<PropertyDescriptor>builder()
.addAll(AbstractKiteProcessor.getProperties()) .addAll(AbstractKiteProcessor.getProperties())
.add(KITE_DATASET_URI) .add(KITE_DATASET_URI)
.add(ADDITIONAL_CLASSPATH_RESOURCES)
.build(); .build();
private static final Set<Relationship> RELATIONSHIPS private static final Set<Relationship> RELATIONSHIPS

View File

@ -21,6 +21,7 @@ package org.apache.nifi.processors.kite;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
@ -29,8 +30,12 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.After;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration; import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.data.Dataset;
public class TestConfigurationProperty { public class TestConfigurationProperty {
@ -38,6 +43,9 @@ public class TestConfigurationProperty {
public final TemporaryFolder temp = new TemporaryFolder(); public final TemporaryFolder temp = new TemporaryFolder();
public File confLocation; public File confLocation;
private String datasetUri = null;
private Dataset<Record> dataset = null;
@Before @Before
public void saveConfiguration() throws IOException { public void saveConfiguration() throws IOException {
Configuration conf = new Configuration(false); Configuration conf = new Configuration(false);
@ -49,6 +57,20 @@ public class TestConfigurationProperty {
out.close(); out.close();
} }
/**
 * Creates a Kite dataset before each test, using TestUtil.USER_SCHEMA, in a
 * new folder obtained from the {@code temp} rule. The resulting URI and
 * dataset handle are stored in {@code datasetUri} and {@code dataset}.
 *
 * @throws Exception if the folder or the dataset cannot be created
 */
@Before
public void createDataset() throws Exception {
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schema(TestUtil.USER_SCHEMA)
.build();
// Local-filesystem dataset URI pointing at the freshly created temp folder.
this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
}
/**
 * Deletes the dataset identified by {@code datasetUri} after each test.
 *
 * @throws Exception if the deletion fails
 */
@After
public void deleteDataset() throws Exception {
Datasets.delete(datasetUri);
}
@Test @Test
public void testConfigurationCanary() throws IOException { public void testConfigurationCanary() throws IOException {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
@ -73,4 +95,18 @@ public class TestConfigurationProperty {
AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString()); AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
runner.assertNotValid(); runner.assertNotValid();
} }
/**
 * Checks that CONF_XML_FILES accepts an Expression Language value: the
 * processor is expected to be valid with a well-formed expression and
 * invalid once the expression is left unterminated.
 */
@Test
public void testConfigurationExpressionLanguage() throws IOException {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
// Presumably evaluates to "pom.xml" (empty substring plus appended name),
// resolved against the test's working directory -- TODO confirm.
runner.setProperty(
AbstractKiteProcessor.CONF_XML_FILES, "${filename:substring(0,0):append('pom.xml')}");
runner.setProperty(
StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
runner.assertValid();
// botch the Expression Language evaluation
runner.setProperty(
AbstractKiteProcessor.CONF_XML_FILES, "${filename:substring(0,0):");
runner.assertNotValid();
}
} }