From 1ea49ad8cff63dd9e31fc4d1f6907222563306ea Mon Sep 17 00:00:00 2001 From: Jeff Storck Date: Wed, 27 Jul 2016 11:51:15 -0400 Subject: [PATCH] NIFI-2407 Implements EL support on some properties of processors/services - Added EL support to "Directory" property of ListHDFS and GetHDFS processors - Added EL support to "Destination" property for ConsumeJMS and PublishJMS processors - Added EL support to "MQ ConnectionFactory Implementation", "MQ client library path", "Broker URI" properties for the JMS Connection Factory Provider - Added EL support to "Database Connection URL", "Database Driver Class Name", "DB Driver jar url", "DB username", and "DB password" properties for the DBCP Connection Pool - Removed NotificationServiceManagerSpec.groovy, previous PR from Joe Witt covers the needed testing for Variable Registry/Value Lookup at this time This closes #778. --- .../NotificationServiceManagerSpec.groovy | 50 ------------------- .../hadoop/AbstractHadoopProcessor.java | 15 ++++-- .../nifi/processors/hadoop/GetHDFS.java | 7 +-- .../nifi/processors/hadoop/ListHDFS.java | 3 +- .../nifi/processors/hadoop/PutHDFSTest.java | 5 +- .../jms/cf/JMSConnectionFactoryProvider.java | 8 +-- ...MSConnectionFactoryProviderDefinition.java | 3 ++ .../cf/JMSConnectionFactoryProviderTest.java | 43 ++++++++-------- .../jms/processors/AbstractJMSProcessor.java | 3 +- .../nifi/jms/processors/ConsumeJMS.java | 2 +- .../nifi/jms/processors/PublishJMS.java | 2 +- .../nifi/jms/processors/ConsumeJMSTest.java | 10 ++-- .../nifi/jms/processors/PublishJMSTest.java | 25 +++++----- .../apache/nifi/dbcp/DBCPConnectionPool.java | 39 ++++++++------- .../org/apache/nifi/dbcp/DBCPServiceTest.java | 24 ++++----- 15 files changed, 102 insertions(+), 137 deletions(-) delete mode 100644 nifi-bootstrap/src/test/groovy/org/apache/nifi/bootstrap/NotificationServiceManagerSpec.groovy diff --git a/nifi-bootstrap/src/test/groovy/org/apache/nifi/bootstrap/NotificationServiceManagerSpec.groovy b/nifi-bootstrap/src/test/groovy/org/apache/nifi/bootstrap/NotificationServiceManagerSpec.groovy deleted file mode 100644 index 7bd4c52db6..0000000000 --- a/nifi-bootstrap/src/test/groovy/org/apache/nifi/bootstrap/NotificationServiceManagerSpec.groovy +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.bootstrap - -import org.apache.nifi.bootstrap.notification.NotificationType -import org.apache.nifi.registry.VariableRegistry -import spock.lang.Specification -import java.nio.file.Paths - -class NotificationServiceManagerSpec extends Specification{ - - def setupSpec(){ - } - - def "should acess variable registry to replace EL values"(){ - - given: - def mockRegistry = Mock(VariableRegistry.class) - def notificationServiceManager = new NotificationServiceManager(mockRegistry); - def file = Paths.get("src/test/resources/notification-services.xml").toFile() - notificationServiceManager.loadNotificationServices(file) - //testing with stopped becasue it will block until method is completed - notificationServiceManager.registerNotificationService(NotificationType.NIFI_STOPPED,"custom-notification") - - when: - notificationServiceManager.notify(NotificationType.NIFI_STOPPED,"NiFi Stopped","NiFi Stopped") - - then: - 6 * mockRegistry.getVariables() >> ["test.server":"smtp://fakeserver.com","test.username":"user","test.password":"pass"] - - - } - - -} diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index f96aa78f48..6288de3294 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -191,9 +191,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { HdfsResources resources = hdfsResources.get(); if (resources.getConfiguration() == null) { String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); - String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue(); - dir = dir == null ? "/" : dir; - resources = resetHDFSResources(configResources, dir, context); + final String dir; + final PropertyDescriptor directoryPropDescriptor = getPropertyDescriptor(DIRECTORY_PROP_NAME); + if (directoryPropDescriptor != null) { + if (directoryPropDescriptor.isExpressionLanguageSupported()) { + dir = context.getProperty(DIRECTORY_PROP_NAME).evaluateAttributeExpressions().getValue(); + } else { + dir = context.getProperty(DIRECTORY_PROP_NAME).getValue(); + } + } else { + dir = null; + } + resources = resetHDFSResources(configResources, dir == null ? "/" : dir, context); hdfsResources.set(resources); } } catch (IOException ex) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index d18c13fd54..24c033076e 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -97,6 +97,7 @@ public class GetHDFS extends AbstractHadoopProcessor { .description("The HDFS directory from which files should be read") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() @@ -241,7 +242,7 @@ public class GetHDFS extends AbstractHadoopProcessor { // copy configuration values to pass them around cleanly processorConfig = new ProcessorConfiguration(context); final FileSystem fs = getFileSystem(); - final Path dir = new Path(context.getProperty(DIRECTORY).getValue()); + final Path dir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()); if (!fs.exists(dir)) { throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist."); } @@ -341,7 +342,7 @@ public class GetHDFS extends AbstractHadoopProcessor { final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT); - final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue()); + final Path rootDir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()); final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString()); final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC; @@ -501,7 +502,7 @@ public class GetHDFS extends AbstractHadoopProcessor { final private PathFilter pathFilter; ProcessorConfiguration(final ProcessContext context) { - configuredRootDirPath = new Path(context.getProperty(DIRECTORY).getValue()); + configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()); ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean(); final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue(); fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 4cb8d25ef1..524bf64fdf 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -102,6 +102,7 @@ public class ListHDFS extends AbstractHadoopProcessor { .description("The HDFS directory from which files should be read") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() @@ -248,7 +249,7 @@ public class ListHDFS extends AbstractHadoopProcessor { } lastRunTimestamp = now; - final String directory = context.getProperty(DIRECTORY).getValue(); + final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue(); // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files. try { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 48e0f89972..34efcb2571 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -58,7 +58,7 @@ public class PutHDFSTest { private KerberosProperties kerberosProperties; @BeforeClass - public static void setUpClass() throws Exception{ + public static void setUpClass() throws Exception { /* * Running Hadoop on Windows requires a special build which will produce required binaries and native modules [1]. Since functionality * provided by this module and validated by these test does not have any native implication we do not distribute required binaries and native modules @@ -199,7 +199,6 @@ public class PutHDFSTest { TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); - runner.setValidateExpressionUsage(false); try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { Map attributes = new HashMap(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); @@ -232,7 +231,6 @@ public class PutHDFSTest { runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP"); - runner.setValidateExpressionUsage(false); try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { Map attributes = new HashMap(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); @@ -281,7 +279,6 @@ public class PutHDFSTest { }); runner.setProperty(PutHDFS.DIRECTORY, dirName); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); - runner.setValidateExpressionUsage(false); try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { Map attributes = new HashMap(); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java index e279baebb6..c4608be116 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java @@ -117,11 +117,11 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl if (!this.configured) { if (logger.isInfoEnabled()) { logger.info("Configuring " + this.getClass().getSimpleName() + " for '" - + context.getProperty(CONNECTION_FACTORY_IMPL).getValue() + "' to be conected to '" + + context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue() + "' to be conected to '" + BROKER_URI + "'"); } // will load user provided libraries/resources on the classpath - Utils.addResourcesToClasspath(context.getProperty(CLIENT_LIB_DIR_PATH).getValue()); + Utils.addResourcesToClasspath(context.getProperty(CLIENT_LIB_DIR_PATH).evaluateAttributeExpressions().getValue()); this.createConnectionFactoryInstance(context); @@ -177,7 +177,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl this.setProperty(propertyName, entry.getValue()); } else { if (propertyName.equals(BROKER)) { - if (context.getProperty(CONNECTION_FACTORY_IMPL).getValue().startsWith("org.apache.activemq")) { + if (context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue().startsWith("org.apache.activemq")) { this.setProperty("brokerURL", entry.getValue()); } else { String[] hostPort = entry.getValue().split(":"); @@ -248,7 +248,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl * 'CONNECTION_FACTORY_IMPL'. */ private void createConnectionFactoryInstance(ConfigurationContext context) { - String connectionFactoryImplName = context.getProperty(CONNECTION_FACTORY_IMPL).getValue(); + String connectionFactoryImplName = context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue(); this.connectionFactory = Utils.newDefaultInstance(connectionFactoryImplName); } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java index 4375e221b3..3057ec6bab 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java @@ -44,6 +44,7 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic + "class (i.e., org.apache.activemq.ActiveMQConnectionFactory)") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder() .name(CF_LIB) @@ -53,6 +54,7 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic + "ConnectionFactory implementation.") .addValidator(new ClientLibValidator()) .required(true) + .expressionLanguageSupported(true) .build(); // ConnectionFactory specific properties @@ -63,6 +65,7 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic + "'tcp://myhost:61616' for ActiveMQ or 'myhost:1414' for IBM MQ") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java index 95bdbe53e2..ab74dddd3e 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java @@ -16,14 +16,13 @@ */ package org.apache.nifi.jms.cf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.lang.reflect.Method; -import java.util.Iterator; -import java.util.ServiceLoader; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -32,15 +31,15 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import java.io.File; +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.ServiceLoader; -import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; -import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; /** * @@ -51,7 +50,7 @@ public class JMSConnectionFactoryProviderTest { @Test public void validateFullConfigWithUserLib() throws Exception { - TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); @@ -76,7 +75,7 @@ public class JMSConnectionFactoryProviderTest { @Test(expected = AssertionError.class) public void validateOnConfigureFailsIfCNFonConnectionFactory() throws Exception { - TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); @@ -89,7 +88,7 @@ public class JMSConnectionFactoryProviderTest { @Test public void validateNotValidForNonExistingLibPath() throws Exception { - TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); @@ -102,7 +101,7 @@ public class JMSConnectionFactoryProviderTest { @Test(expected = AssertionError.class) public void validateFailsIfURINotHostPortAndNotActiveMQ() throws Exception { - TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost"); @@ -116,7 +115,7 @@ public class JMSConnectionFactoryProviderTest { @Test public void validateNotValidForNonDirectoryPath() throws Exception { - TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); @@ -146,7 +145,7 @@ public class JMSConnectionFactoryProviderTest { try { String libPath = TestUtils.setupActiveMqLibForTesting(true); - TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index f8030dbeba..398c5c1042 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -69,6 +69,7 @@ abstract class AbstractJMSProcessor extends AbstractProcess .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic').") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder() .name("Destination Type") @@ -198,7 +199,7 @@ abstract class AbstractJMSProcessor extends AbstractProcess JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); - jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).getValue()); + jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); // set of properties that may be good candidates for exposure via configuration diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index a4cad0dbf0..ac05f2cb8f 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -87,7 +87,7 @@ public class ConsumeJMS extends AbstractJMSProcessor { }); Map jmsHeaders = response.getMessageHeaders(); flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession); - processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).getValue()); + processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); processSession.transfer(flowFile, REL_SUCCESS); } else { context.yield(); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index 8f6c18b94c..0802b439d4 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -101,7 +101,7 @@ public class PublishJMS extends AbstractJMSProcessor { this.targetResource.publish(this.extractMessageBody(flowFile, processSession), flowFile.getAttributes()); processSession.transfer(flowFile, REL_SUCCESS); - processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).getValue()); + processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); } catch (Exception e) { processSession.transfer(flowFile, REL_FAILURE); this.getLogger().error("Failed while sending message to JMS via " + this.targetResource, e); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java index 68b95fdd9f..9366e8d2e9 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java @@ -16,11 +16,6 @@ */ package org.apache.nifi.jms.processors; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.util.MockFlowFile; @@ -31,6 +26,11 @@ import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.JmsHeaders; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class ConsumeJMSTest { @Test diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java index 28d4ac62f7..36edf79a64 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java @@ -16,19 +16,6 @@ */ package org.apache.nifi.jms.processors; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.BytesMessage; -import javax.jms.ConnectionFactory; -import javax.jms.Queue; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.util.MockFlowFile; @@ -38,6 +25,18 @@ import org.junit.Test; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.JmsHeaders; +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.Queue; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class PublishJMSTest { @Test diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java index 08aa966850..8fc7530daf 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java @@ -16,18 +16,6 @@ */ package org.apache.nifi.dbcp; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - import org.apache.commons.dbcp.BasicDataSource; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -40,6 +28,18 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + /** * Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality. * @@ -55,6 +55,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC .defaultValue(null) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() @@ -63,6 +64,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC .defaultValue(null) .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor DB_DRIVER_JAR_URL = new PropertyDescriptor.Builder() @@ -71,6 +73,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC .defaultValue(null) .required(false) .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() @@ -78,6 +81,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC .description("Database user name") .defaultValue(null) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() @@ -87,6 +91,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC .required(false) .sensitive(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder() @@ -148,9 +153,9 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC @OnEnabled public void onConfigured(final ConfigurationContext context) throws InitializationException { - final String drv = context.getProperty(DB_DRIVERNAME).getValue(); - final String user = context.getProperty(DB_USER).getValue(); - final String passw = context.getProperty(DB_PASSWORD).getValue(); + final String drv = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); + final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); + final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); @@ -158,10 +163,10 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC dataSource.setDriverClassName(drv); // Optional driver URL, when exist, this URL will be used to locate driver jar file location - final String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue(); + final String urlString = context.getProperty(DB_DRIVER_JAR_URL).evaluateAttributeExpressions().getValue(); dataSource.setDriverClassLoader(getDriverClassLoader(urlString, drv)); - final String dburl = context.getProperty(DATABASE_URL).getValue(); + final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); dataSource.setMaxWait(maxWaitMillis); dataSource.setMaxActive(maxTotal); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java index 6683d2a1f1..fcf58ea86c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java @@ -16,8 +16,16 @@ */ package org.apache.nifi.dbcp; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.File; import java.net.MalformedURLException; @@ -32,16 +40,8 @@ import java.sql.Statement; import java.util.HashMap; import java.util.Map; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class DBCPServiceTest {