NIFI-5073: JMSConnectionFactoryProvider now resolves EL Expression from VariableRegistry

- CLIENT_LIB_PATH is updated to include 'dynamicallyModifiesClasspath(true)'
This commit is contained in:
zenfenan 2018-04-24 21:05:05 +05:30 committed by Mark Payne
parent 716587d09f
commit db259628c7
7 changed files with 70 additions and 88 deletions

View File

@ -80,4 +80,19 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/dummy.conf</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map.Entry;
import javax.net.ssl.SSLContext;
import java.io.File;
import javax.jms.ConnectionFactory;
@ -44,6 +43,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -78,9 +78,9 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
private volatile ConnectionFactory connectionFactory;
static final String BROKER = "broker";
static final String CF_IMPL = "cf";
static final String CF_LIB = "cflib";
private static final String BROKER = "broker";
private static final String CF_IMPL = "cf";
private static final String CF_LIB = "cflib";
public static final PropertyDescriptor CONNECTION_FACTORY_IMPL = new PropertyDescriptor.Builder()
.name(CF_IMPL)
@ -97,8 +97,9 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
.description("Path to the directory with additional resources (i.e., JARs, configuration files etc.) to be added "
+ "to the classpath. Such resources typically represent target MQ client libraries for the "
+ "ConnectionFactory implementation.")
.addValidator(new ClientLibValidator())
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
.required(true)
.dynamicallyModifiesClasspath(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@ -159,13 +160,15 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
if (logger.isInfoEnabled()) {
logger.info("Configuring " + this.getClass().getSimpleName() + " for '"
+ context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue() + "' to be connected to '"
+ BROKER_URI + "'");
+ context.getProperty(BROKER_URI).evaluateAttributeExpressions().getValue() + "'");
}
// will load user provided libraries/resources on the classpath
Utils.addResourcesToClasspath(context.getProperty(CLIENT_LIB_DIR_PATH).evaluateAttributeExpressions().getValue());
final String clientLibPath = context.getProperty(CLIENT_LIB_DIR_PATH).evaluateAttributeExpressions().getValue();
ClassLoader customClassLoader = ClassLoaderUtils.getCustomClassLoader(clientLibPath, this.getClass().getClassLoader(), null);
Thread.currentThread().setContextClassLoader(customClassLoader);
this.createConnectionFactoryInstance(context);
this.setConnectionFactoryProperties(context);
}
this.configured = true;
@ -312,48 +315,13 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
* evaluation
*/
static class NonEmptyBrokerURIValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
String value = input;
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
value = context.getProperty(BROKER_URI).evaluateAttributeExpressions().getValue();
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, value, context);
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
}
}
/**
*
*/
static class ClientLibValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
String libDirPath = context.getProperty(CLIENT_LIB_DIR_PATH).evaluateAttributeExpressions().getValue();
StringBuilder invalidationMessageBuilder = new StringBuilder();
if (libDirPath != null) {
File file = new File(libDirPath);
if (!file.isDirectory()) {
invalidationMessageBuilder
.append("MQ Client library directory path must point to a directory. Was '")
.append(file.getAbsolutePath())
.append("'.");
}
} else {
invalidationMessageBuilder.append("'MQ Client Libraries path' must be provided. \n");
}
String invalidationMessage = invalidationMessageBuilder.toString();
ValidationResult vResult;
if (invalidationMessage.length() == 0) {
vResult = new ValidationResult.Builder().subject(subject).input(input)
.explanation("Client lib path is valid and points to a directory").valid(true).build();
} else {
vResult = new ValidationResult.Builder().subject(subject).input(input)
.explanation("Client lib path is invalid. " + invalidationMessage)
.valid(false).build();
}
return vResult;
}
}
}

View File

@ -16,10 +16,7 @@
*/
package org.apache.nifi.jms.cf;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@ -101,40 +98,4 @@ public final class Utils {
}
}
/**
* Adds content of the directory specified with 'path' to the classpath. It
* does so by creating a new instance of the {@link URLClassLoader} using
* {@link URL}s created from listing the contents of the directory denoted
* by 'path' and setting it as thread context class loader.
*/
static void addResourcesToClasspath(String path) {
if (logger.isDebugEnabled()) {
logger.debug("Adding additional resources from '" + path + "' to the classpath.");
}
if (path == null) {
throw new IllegalArgumentException("'path' must not be null");
}
File libraryDir = new File(path);
if (libraryDir.exists() && libraryDir.isDirectory()) {
String[] cpResourceNames = libraryDir.list();
URL[] urls = new URL[cpResourceNames.length];
try {
for (int i = 0; i < urls.length; i++) {
urls[i] = new File(libraryDir, cpResourceNames[i]).toURI().toURL();
if (logger.isDebugEnabled()) {
logger.debug("Identifying additional resource to the classpath: " + urls[i]);
}
}
} catch (Exception e) {
throw new IllegalStateException(
"Failed to parse user libraries from '" + libraryDir.getAbsolutePath() + "'", e);
}
URLClassLoader cl = new URLClassLoader(urls, Utils.class.getClassLoader());
Thread.currentThread().setContextClassLoader(cl);
} else {
throw new IllegalArgumentException("Path '" + libraryDir.getAbsolutePath()
+ "' is not valid because it doesn't exist or does not point to a directory.");
}
}
}

View File

@ -17,12 +17,16 @@
package org.apache.nifi.jms.cf;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import static org.mockito.Mockito.mock;
/**
@ -46,16 +50,50 @@ public class JMSConnectionFactoryProviderTest {
}
@Test
public void validateNotValidForNonDirectoryPath() throws Exception {
public void validateELExpression() throws InitializationException, URISyntaxException {
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
runner.setValidateExpressionUsage(true);
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
runner.addControllerService("cfProvider", cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "pom.xml");
runner.setVariable("broker.uri", "tcp://0.0.0.0:616161");
runner.setVariable("client.lib", clientLib);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "${broker.uri}");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "${client.lib}");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
"org.apache.nifi.jms.testcflib.TestConnectionFactory");
runner.assertNotValid(cfProvider);
runner.assertValid(cfProvider);
}
@Test
public void testClientLibResourcesLoaded() throws InitializationException, URISyntaxException {
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
runner.setValidateExpressionUsage(true);
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString() + "," +
this.getClass().getResource("/dummy-lib-2.jar").toURI().toString() + "," +
this.getClass().getResource("/dummy.conf").toURI().toString() + ",";
runner.addControllerService("cfProvider", cfProvider);
runner.setVariable("broker.uri", "tcp://0.0.0.0:616161");
runner.setVariable("client.lib", clientLib);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "${broker.uri}");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "${client.lib}");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
"org.apache.nifi.jms.testcflib.TestConnectionFactory");
runner.assertValid(cfProvider);
ClassLoader loader = runner.getClass().getClassLoader();
Assert.assertTrue(loader.getResource("dummy.conf") != null);
Assert.assertTrue(loader.getResource("dummy-lib.jar") != null);
Assert.assertTrue(loader.getResource("dummy-lib-2.jar") != null);
}
@Test(expected = IllegalStateException.class)