diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java index 5f6bea514c..45758a4a35 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java @@ -358,12 +358,12 @@ public class JmsFactory { try { uri = new URI(context.getProperty(URL).getValue()); } catch (URISyntaxException e) { - // Should not happen - URL was validated + // Should not happen - URI was validated throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e); } final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final String provider = context.getProperty(JMS_PROVIDER).getValue(); - if (uri.getScheme().equals("ssl") || (URISupport.isCompositeURI(uri) && compositeURIHasSSL(uri))) { + if (isSSL(uri)) { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); if (sslContextService == null) { throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set."); @@ -375,11 +375,11 @@ public class JmsFactory { } } - private static boolean compositeURIHasSSL(URI uri) { + private static boolean isSSL(URI uri) { try { CompositeData compositeData = URISupport.parseComposite(uri); for(URI component : compositeData.getComponents()){ - if(component.getScheme().equals("ssl")){ + if ("ssl".equals(component.getScheme())) { return true; } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java index 522493b2bc..f538624243 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java @@ -16,11 +16,7 @@ */ package org.apache.nifi.processors.standard.util; -import java.net.URI; -import java.net.URISyntaxException; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; @@ -53,27 +49,6 @@ public class JmsProperties { .name("URL") .description("The URL of the JMS Server") .addValidator(StandardValidators.URI_VALIDATOR) - .addValidator(new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { - return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); - } - final ValidationResult.Builder builder = new ValidationResult.Builder(); - builder.subject(subject).input(input).explanation("Valid URL").valid(true); - try { - final String evaluatedInput = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); - final URI uri = new URI(evaluatedInput); - if (uri.getScheme() == null) { - builder.explanation("JMS URI must have a scheme set such as 'jms','ssl','tcp','vm',etc..").valid(false); - } - } catch (final URISyntaxException urie) { - builder.explanation("JMS URI not valid").valid(false); - } - return builder.build(); - } - } - ) .required(true) .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java index 4a729dcb5c..2e891bc16e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java @@ -16,8 +16,12 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; import java.util.List; import javax.jms.BytesMessage; @@ -37,18 +41,76 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.Revision; import org.junit.Test; +import org.slf4j.LoggerFactory; +import org.slf4j.impl.SimpleLogger; public class TestGetJMSQueue { @Test - public void testInvalidURL() throws Exception { + public void testSchemelessURI() throws Exception { + String expectedErrMsg = "Failed to connect to JMS Server due to javax.jms.JMSException: " + + "Could not create Transport. Reason: java.io.IOException: Transport not scheme specified: [localhost]"; + + ByteArrayOutputStream bos = this.prepLogOutputStream(); GetJMSQueue getJmsQueue = new GetJMSQueue(); + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); runner.setProperty(JmsProperties.URL, "localhost"); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); - runner.assertNotValid(); + + runner.run(); + assertEquals(0, runner.getFlowFilesForRelationship("success").size()); + assertTrue(bos.toString("ASCII").contains(expectedErrMsg)); + } + + @Test + public void testPortlessURI() throws Exception { + String expectedErrMsg = "Failed to connect to JMS Server due to javax.jms.JMSException: " + + "Could not connect to broker URL: tcp://localhost. Reason: java.lang.IllegalArgumentException: port out of range:-1"; + + ByteArrayOutputStream bos = this.prepLogOutputStream(); + GetJMSQueue getJmsQueue = new GetJMSQueue(); + + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); + runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); + runner.setProperty(JmsProperties.URL, "tcp://localhost"); + runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); + runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + + runner.run(); + assertEquals(0, runner.getFlowFilesForRelationship("success").size()); + assertTrue(bos.toString("ASCII").contains(expectedErrMsg)); + } + + @Test + public void testCompositeSchemelessPortlessURI() throws Exception { + String expectedErrMsg1 = "Failed to connect to [tcp://localhost] after: 2 attempt(s)"; + String expectedErrMsg2 = "Failed to connect to JMS Server due to javax.jms.JMSException: port out of range:-1"; + + ByteArrayOutputStream bos = this.prepLogOutputStream(); + GetJMSQueue getJmsQueue = new GetJMSQueue(); + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); + runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); + runner.setProperty(JmsProperties.URL, + "failover:(tcp://localhost,remotehost)?initialReconnectDelay=1&startupMaxReconnectAttempts=2"); + runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); + runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + + runner.run(); + assertEquals(0, runner.getFlowFilesForRelationship("success").size()); + assertTrue(bos.toString("ASCII").contains(expectedErrMsg1)); + assertTrue(bos.toString("ASCII").contains(expectedErrMsg2)); + } + + private ByteArrayOutputStream prepLogOutputStream() throws Exception { + LoggerFactory.getLogger(GetJMSQueue.class); + Field field = SimpleLogger.class.getDeclaredField("TARGET_STREAM"); + field.setAccessible(true); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + field.set(null, new PrintStream(bos)); + return bos; } @Test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java index b6c4bed661..df1e4a4042 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java @@ -78,17 +78,6 @@ public class TestPutJMS { assertTrue(relationships.contains(PutJMS.REL_SUCCESS)); } - @Test - public void testInvalidURL() throws Exception { - PutJMS putJms = new PutJMS(); - TestRunner runner = TestRunners.newTestRunner(putJms); - runner.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); - runner.setProperty(JmsProperties.URL, "localhost"); - runner.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_TYPE); - runner.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); - runner.assertNotValid(); - } - @Test public void testCleanupResources() throws JMSException, NoSuchFieldException, IllegalAccessException { final PutJMS putJMS = new PutJMS();