NIFI-2407 Implements EL support on some properties of processors/services

- Added EL support to "Directory" property of ListHDFS and GetHDFS processors
 - Added EL support to "Destination" property for ConsumeJMS and PublishJMS processors
 - Added EL support to "MQ ConnectionFactory Implementation", "MQ client library path", "Broker URI" properties for the JMS Connection Factory Provider
 - Added EL support to "Database Connection URL", "Database Driver Class Name", "DB Driver jar url", "DB username", and "DB password" properties for the DBCP Connection Pool
 - Removed NotificationServiceManagerSpec.groovy, previous PR from Joe Witt covers the needed testing for Variable Registry/Value Lookup at this time
This closes #778.
This commit is contained in:
Jeff Storck 2016-07-27 11:51:15 -04:00 committed by Mark Payne
parent 0d730c5cc1
commit 1ea49ad8cf
15 changed files with 102 additions and 137 deletions

View File

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap
import org.apache.nifi.bootstrap.notification.NotificationType
import org.apache.nifi.registry.VariableRegistry
import spock.lang.Specification
import java.nio.file.Paths
class NotificationServiceManagerSpec extends Specification{
def setupSpec(){
}
def "should acess variable registry to replace EL values"(){
given:
def mockRegistry = Mock(VariableRegistry.class)
def notificationServiceManager = new NotificationServiceManager(mockRegistry);
def file = Paths.get("src/test/resources/notification-services.xml").toFile()
notificationServiceManager.loadNotificationServices(file)
//testing with stopped becasue it will block until method is completed
notificationServiceManager.registerNotificationService(NotificationType.NIFI_STOPPED,"custom-notification")
when:
notificationServiceManager.notify(NotificationType.NIFI_STOPPED,"NiFi Stopped","NiFi Stopped")
then:
6 * mockRegistry.getVariables() >> ["test.server":"smtp://fakeserver.com","test.username":"user","test.password":"pass"]
}
}

View File

@ -191,9 +191,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
HdfsResources resources = hdfsResources.get(); HdfsResources resources = hdfsResources.get();
if (resources.getConfiguration() == null) { if (resources.getConfiguration() == null) {
String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue(); final String dir;
dir = dir == null ? "/" : dir; final PropertyDescriptor directoryPropDescriptor = getPropertyDescriptor(DIRECTORY_PROP_NAME);
resources = resetHDFSResources(configResources, dir, context); if (directoryPropDescriptor != null) {
if (directoryPropDescriptor.isExpressionLanguageSupported()) {
dir = context.getProperty(DIRECTORY_PROP_NAME).evaluateAttributeExpressions().getValue();
} else {
dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
}
} else {
dir = null;
}
resources = resetHDFSResources(configResources, dir == null ? "/" : dir, context);
hdfsResources.set(resources); hdfsResources.set(resources);
} }
} catch (IOException ex) { } catch (IOException ex) {

View File

@ -97,6 +97,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
.description("The HDFS directory from which files should be read") .description("The HDFS directory from which files should be read")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
@ -241,7 +242,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
// copy configuration values to pass them around cleanly // copy configuration values to pass them around cleanly
processorConfig = new ProcessorConfiguration(context); processorConfig = new ProcessorConfiguration(context);
final FileSystem fs = getFileSystem(); final FileSystem fs = getFileSystem();
final Path dir = new Path(context.getProperty(DIRECTORY).getValue()); final Path dir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
if (!fs.exists(dir)) { if (!fs.exists(dir)) {
throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist."); throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
} }
@ -341,7 +342,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT); BUFFER_SIZE_DEFAULT);
final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue()); final Path rootDir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString()); final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC; final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
@ -501,7 +502,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
final private PathFilter pathFilter; final private PathFilter pathFilter;
ProcessorConfiguration(final ProcessContext context) { ProcessorConfiguration(final ProcessContext context) {
configuredRootDirPath = new Path(context.getProperty(DIRECTORY).getValue()); configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean(); ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue(); final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex); fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);

View File

@ -102,6 +102,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
.description("The HDFS directory from which files should be read") .description("The HDFS directory from which files should be read")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
@ -248,7 +249,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
} }
lastRunTimestamp = now; lastRunTimestamp = now;
final String directory = context.getProperty(DIRECTORY).getValue(); final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files. // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
try { try {

View File

@ -58,7 +58,7 @@ public class PutHDFSTest {
private KerberosProperties kerberosProperties; private KerberosProperties kerberosProperties;
@BeforeClass @BeforeClass
public static void setUpClass() throws Exception{ public static void setUpClass() throws Exception {
/* /*
* Running Hadoop on Windows requires a special build which will produce required binaries and native modules [1]. Since functionality * Running Hadoop on Windows requires a special build which will produce required binaries and native modules [1]. Since functionality
* provided by this module and validated by these test does not have any native implication we do not distribute required binaries and native modules * provided by this module and validated by these test does not have any native implication we do not distribute required binaries and native modules
@ -199,7 +199,6 @@ public class PutHDFSTest {
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
runner.setValidateExpressionUsage(false);
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
Map<String, String> attributes = new HashMap<String, String>(); Map<String, String> attributes = new HashMap<String, String>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
@ -232,7 +231,6 @@ public class PutHDFSTest {
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP"); runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP");
runner.setValidateExpressionUsage(false);
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
Map<String, String> attributes = new HashMap<String, String>(); Map<String, String> attributes = new HashMap<String, String>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
@ -281,7 +279,6 @@ public class PutHDFSTest {
}); });
runner.setProperty(PutHDFS.DIRECTORY, dirName); runner.setProperty(PutHDFS.DIRECTORY, dirName);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
runner.setValidateExpressionUsage(false);
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
Map<String, String> attributes = new HashMap<String, String>(); Map<String, String> attributes = new HashMap<String, String>();

View File

@ -117,11 +117,11 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
if (!this.configured) { if (!this.configured) {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Configuring " + this.getClass().getSimpleName() + " for '" logger.info("Configuring " + this.getClass().getSimpleName() + " for '"
+ context.getProperty(CONNECTION_FACTORY_IMPL).getValue() + "' to be conected to '" + context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue() + "' to be conected to '"
+ BROKER_URI + "'"); + BROKER_URI + "'");
} }
// will load user provided libraries/resources on the classpath // will load user provided libraries/resources on the classpath
Utils.addResourcesToClasspath(context.getProperty(CLIENT_LIB_DIR_PATH).getValue()); Utils.addResourcesToClasspath(context.getProperty(CLIENT_LIB_DIR_PATH).evaluateAttributeExpressions().getValue());
this.createConnectionFactoryInstance(context); this.createConnectionFactoryInstance(context);
@ -177,7 +177,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
this.setProperty(propertyName, entry.getValue()); this.setProperty(propertyName, entry.getValue());
} else { } else {
if (propertyName.equals(BROKER)) { if (propertyName.equals(BROKER)) {
if (context.getProperty(CONNECTION_FACTORY_IMPL).getValue().startsWith("org.apache.activemq")) { if (context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue().startsWith("org.apache.activemq")) {
this.setProperty("brokerURL", entry.getValue()); this.setProperty("brokerURL", entry.getValue());
} else { } else {
String[] hostPort = entry.getValue().split(":"); String[] hostPort = entry.getValue().split(":");
@ -248,7 +248,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
* 'CONNECTION_FACTORY_IMPL'. * 'CONNECTION_FACTORY_IMPL'.
*/ */
private void createConnectionFactoryInstance(ConfigurationContext context) { private void createConnectionFactoryInstance(ConfigurationContext context) {
String connectionFactoryImplName = context.getProperty(CONNECTION_FACTORY_IMPL).getValue(); String connectionFactoryImplName = context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
this.connectionFactory = Utils.newDefaultInstance(connectionFactoryImplName); this.connectionFactory = Utils.newDefaultInstance(connectionFactoryImplName);
} }

View File

@ -44,6 +44,7 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic
+ "class (i.e., org.apache.activemq.ActiveMQConnectionFactory)") + "class (i.e., org.apache.activemq.ActiveMQConnectionFactory)")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true) .required(true)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder() public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder()
.name(CF_LIB) .name(CF_LIB)
@ -53,6 +54,7 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic
+ "ConnectionFactory implementation.") + "ConnectionFactory implementation.")
.addValidator(new ClientLibValidator()) .addValidator(new ClientLibValidator())
.required(true) .required(true)
.expressionLanguageSupported(true)
.build(); .build();
// ConnectionFactory specific properties // ConnectionFactory specific properties
@ -63,6 +65,7 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic
+ "'tcp://myhost:61616' for ActiveMQ or 'myhost:1414' for IBM MQ") + "'tcp://myhost:61616' for ActiveMQ or 'myhost:1414' for IBM MQ")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true) .required(true)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()

View File

@ -16,14 +16,13 @@
*/ */
package org.apache.nifi.jms.cf; package org.apache.nifi.jms.cf;
import static org.junit.Assert.assertEquals; import org.apache.nifi.controller.ControllerService;
import static org.junit.Assert.assertNotNull; import org.apache.nifi.processor.Processor;
import static org.junit.Assert.assertTrue; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import java.io.File; import org.junit.Test;
import java.lang.reflect.Method; import org.slf4j.Logger;
import java.util.Iterator; import org.slf4j.LoggerFactory;
import java.util.ServiceLoader;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -32,15 +31,15 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.io.File;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.ServiceLoader;
import org.apache.nifi.controller.ControllerService; import static org.junit.Assert.assertEquals;
import org.apache.nifi.processor.Processor; import static org.junit.Assert.assertNotNull;
import org.apache.nifi.util.TestRunner; import static org.junit.Assert.assertTrue;
import org.apache.nifi.util.TestRunners; import static org.mockito.Mockito.mock;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* *
@ -51,7 +50,7 @@ public class JMSConnectionFactoryProviderTest {
@Test @Test
public void validateFullConfigWithUserLib() throws Exception { public void validateFullConfigWithUserLib() throws Exception {
TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService("cfProvider", cfProvider); runner.addControllerService("cfProvider", cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234");
@ -76,7 +75,7 @@ public class JMSConnectionFactoryProviderTest {
@Test(expected = AssertionError.class) @Test(expected = AssertionError.class)
public void validateOnConfigureFailsIfCNFonConnectionFactory() throws Exception { public void validateOnConfigureFailsIfCNFonConnectionFactory() throws Exception {
TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService("cfProvider", cfProvider); runner.addControllerService("cfProvider", cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234");
@ -89,7 +88,7 @@ public class JMSConnectionFactoryProviderTest {
@Test @Test
public void validateNotValidForNonExistingLibPath() throws Exception { public void validateNotValidForNonExistingLibPath() throws Exception {
TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService("cfProvider", cfProvider); runner.addControllerService("cfProvider", cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234");
@ -102,7 +101,7 @@ public class JMSConnectionFactoryProviderTest {
@Test(expected = AssertionError.class) @Test(expected = AssertionError.class)
public void validateFailsIfURINotHostPortAndNotActiveMQ() throws Exception { public void validateFailsIfURINotHostPortAndNotActiveMQ() throws Exception {
TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService("cfProvider", cfProvider); runner.addControllerService("cfProvider", cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost"); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost");
@ -116,7 +115,7 @@ public class JMSConnectionFactoryProviderTest {
@Test @Test
public void validateNotValidForNonDirectoryPath() throws Exception { public void validateNotValidForNonDirectoryPath() throws Exception {
TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService("cfProvider", cfProvider); runner.addControllerService("cfProvider", cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234");
@ -146,7 +145,7 @@ public class JMSConnectionFactoryProviderTest {
try { try {
String libPath = TestUtils.setupActiveMqLibForTesting(true); String libPath = TestUtils.setupActiveMqLibForTesting(true);
TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService("cfProvider", cfProvider); runner.addControllerService("cfProvider", cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI,

View File

@ -69,6 +69,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic').") .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic').")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder() static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder()
.name("Destination Type") .name("Destination Type")
@ -198,7 +199,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
JmsTemplate jmsTemplate = new JmsTemplate(); JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).getValue()); jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
// set of properties that may be good candidates for exposure via configuration // set of properties that may be good candidates for exposure via configuration

View File

@ -87,7 +87,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
}); });
Map<String, Object> jmsHeaders = response.getMessageHeaders(); Map<String, Object> jmsHeaders = response.getMessageHeaders();
flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession); flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).getValue()); processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
processSession.transfer(flowFile, REL_SUCCESS); processSession.transfer(flowFile, REL_SUCCESS);
} else { } else {
context.yield(); context.yield();

View File

@ -101,7 +101,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
this.targetResource.publish(this.extractMessageBody(flowFile, processSession), this.targetResource.publish(this.extractMessageBody(flowFile, processSession),
flowFile.getAttributes()); flowFile.getAttributes());
processSession.transfer(flowFile, REL_SUCCESS); processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).getValue()); processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
} catch (Exception e) { } catch (Exception e) {
processSession.transfer(flowFile, REL_FAILURE); processSession.transfer(flowFile, REL_FAILURE);
this.getLogger().error("Failed while sending message to JMS via " + this.targetResource, e); this.getLogger().error("Failed while sending message to JMS via " + this.targetResource, e);

View File

@ -16,11 +16,6 @@
*/ */
package org.apache.nifi.jms.processors; package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -31,6 +26,11 @@ import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders; import org.springframework.jms.support.JmsHeaders;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConsumeJMSTest { public class ConsumeJMSTest {
@Test @Test

View File

@ -16,19 +16,6 @@
*/ */
package org.apache.nifi.jms.processors; package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -38,6 +25,18 @@ import org.junit.Test;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders; import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PublishJMSTest { public class PublishJMSTest {
@Test @Test

View File

@ -16,18 +16,6 @@
*/ */
package org.apache.nifi.dbcp; package org.apache.nifi.dbcp;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.dbcp.BasicDataSource;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -40,6 +28,18 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality. * Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
* *
@ -55,6 +55,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.defaultValue(null) .defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true) .required(true)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
@ -63,6 +64,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.defaultValue(null) .defaultValue(null)
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor DB_DRIVER_JAR_URL = new PropertyDescriptor.Builder() public static final PropertyDescriptor DB_DRIVER_JAR_URL = new PropertyDescriptor.Builder()
@ -71,6 +73,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.defaultValue(null) .defaultValue(null)
.required(false) .required(false)
.addValidator(StandardValidators.URL_VALIDATOR) .addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
@ -78,6 +81,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.description("Database user name") .description("Database user name")
.defaultValue(null) .defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
@ -87,6 +91,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.required(false) .required(false)
.sensitive(true) .sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
@ -148,9 +153,9 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
@OnEnabled @OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException { public void onConfigured(final ConfigurationContext context) throws InitializationException {
final String drv = context.getProperty(DB_DRIVERNAME).getValue(); final String drv = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).getValue(); final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).getValue(); final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
@ -158,10 +163,10 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
dataSource.setDriverClassName(drv); dataSource.setDriverClassName(drv);
// Optional driver URL, when exist, this URL will be used to locate driver jar file location // Optional driver URL, when exist, this URL will be used to locate driver jar file location
final String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue(); final String urlString = context.getProperty(DB_DRIVER_JAR_URL).evaluateAttributeExpressions().getValue();
dataSource.setDriverClassLoader(getDriverClassLoader(urlString, drv)); dataSource.setDriverClassLoader(getDriverClassLoader(urlString, drv));
final String dburl = context.getProperty(DATABASE_URL).getValue(); final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
dataSource.setMaxWait(maxWaitMillis); dataSource.setMaxWait(maxWaitMillis);
dataSource.setMaxActive(maxTotal); dataSource.setMaxActive(maxTotal);

View File

@ -16,8 +16,16 @@
*/ */
package org.apache.nifi.dbcp; package org.apache.nifi.dbcp;
import static org.junit.Assert.assertEquals; import org.apache.nifi.processor.exception.ProcessException;
import static org.junit.Assert.assertNotNull; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File; import java.io.File;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -32,16 +40,8 @@ import java.sql.Statement;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.nifi.processor.exception.ProcessException; import static org.junit.Assert.assertEquals;
import org.apache.nifi.reporting.InitializationException; import static org.junit.Assert.assertNotNull;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class DBCPServiceTest { public class DBCPServiceTest {