NIFI-4010 Enables EL on Fetch/List/PutSFTP and List/Fetch/Put/DeleteHDFS processor properties

FetchSFTP/ListSFTP/PutSFTP: Private Key Path
ListHDFS/FetchHDFS/PutHDFS/DeleteHDFS: Hadoop Configuration Resources, Kerberos Principal, Kerberos Keytab, Kerberos Relogin Period

This closes #1148
This closes #1930.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Jeff Storck 2017-06-05 13:35:34 -04:00 committed by Bryan Bende
parent 59a32948ea
commit c99100c934
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
6 changed files with 40 additions and 5 deletions

View File

@ -84,6 +84,7 @@ public class KerberosProperties {
.required(false)
.description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
.addValidator(kerberosConfigValidator)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(true)
.build();
@ -92,6 +93,7 @@ public class KerberosProperties {
.description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.addValidator(kerberosConfigValidator)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(true)
.build();
}

View File

@ -28,6 +28,7 @@ import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
@ -69,6 +70,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
+ "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
.required(false)
.addValidator(HadoopValidators.ONE_OR_MORE_FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
@ -92,6 +94,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
.defaultValue("4 hours")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
@ -147,7 +150,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
@ -191,12 +194,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
try {
// This value will be null when called from ListHDFS, because it overrides all of the default
// properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != null) {
kerberosReloginThreshold = context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS);
PropertyValue reloginPeriod = context.getProperty(KERBEROS_RELOGIN_PERIOD).evaluateAttributeExpressions();
if (reloginPeriod.getValue() != null) {
kerberosReloginThreshold = reloginPeriod.asTimePeriod(TimeUnit.SECONDS);
}
HdfsResources resources = hdfsResources.get();
if (resources.getConfiguration() == null) {
final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
resources = resetHDFSResources(configResources, context);
hdfsResources.set(resources);
}

View File

@ -33,6 +33,10 @@ public interface HadoopValidators {
Validator ONE_OR_MORE_FILE_EXISTS_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
final String[] files = input.split(",");
for (String filename : files) {
try {

View File

@ -136,6 +136,30 @@ public class AbstractHadoopTest {
runner.assertValid();
}
@Test
public void testKerberosOptionsWithEL() throws Exception {
SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(processor);
// initialize the runner with EL for the kerberos properties
runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "${variableHadoopConfigResources}");
runner.setProperty(kerberosProperties.getKerberosPrincipal(), "${variablePrincipal}");
runner.setProperty(AbstractHadoopProcessor.KERBEROS_RELOGIN_PERIOD, "${variableReloginPeriod}");
runner.setProperty(kerberosProperties.getKerberosKeytab(), "${variableKeytab}");
// add variables for all the kerberos properties except for the keytab
runner.setVariable("variableHadoopConfigResources", "src/test/resources/core-site-security.xml");
runner.setVariable("variablePrincipal", "principal");
runner.setVariable("variableReloginPeriod", "4m");
// test that the config is not valid, since the EL for keytab will return nothing, no keytab
runner.assertNotValid();
// add variable for the keytab
runner.setVariable("variableKeytab", temporaryFile.getAbsolutePath());
// test that the config is valid
runner.assertValid();
}
@Test
public void testKerberosOptionsWithBadKerberosConfigFile() throws Exception {
// invalid since the kerberos configuration was changed to a non-existent file

View File

@ -93,7 +93,7 @@ public class GetSFTP extends GetFileTransfer {
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
final boolean passwordSpecified = context.getProperty(SFTPTransfer.PASSWORD).getValue() != null;
final boolean privateKeySpecified = context.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).getValue() != null;
final boolean privateKeySpecified = context.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).evaluateAttributeExpressions().getValue() != null;
if (!passwordSpecified && !privateKeySpecified) {
results.add(new ValidationResult.Builder().subject("Password")

View File

@ -56,6 +56,7 @@ public class SFTPTransfer implements FileTransfer {
.description("The fully qualified path to the Private Key file")
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder()
.name("Private Key Passphrase")