mirror of https://github.com/apache/nifi.git
NIFI-1378: fixed JMS URI validation simplified JmsFactory check for SSL and scheme-less URIs ensured URI validation is handled by ActiveMqConnectionFactory ensured informative error messages are shown to the userm removed unneeded test, cleaned up unused imports
Reviewed by Tony Kurc (tkurc@apache.org). This closes #167
This commit is contained in:
parent
53322c99ac
commit
3db6fffa68
|
@ -358,12 +358,12 @@ public class JmsFactory {
|
||||||
try {
|
try {
|
||||||
uri = new URI(context.getProperty(URL).getValue());
|
uri = new URI(context.getProperty(URL).getValue());
|
||||||
} catch (URISyntaxException e) {
|
} catch (URISyntaxException e) {
|
||||||
// Should not happen - URL was validated
|
// Should not happen - URI was validated
|
||||||
throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e);
|
throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e);
|
||||||
}
|
}
|
||||||
final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||||
final String provider = context.getProperty(JMS_PROVIDER).getValue();
|
final String provider = context.getProperty(JMS_PROVIDER).getValue();
|
||||||
if (uri.getScheme().equals("ssl") || (URISupport.isCompositeURI(uri) && compositeURIHasSSL(uri))) {
|
if (isSSL(uri)) {
|
||||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||||
if (sslContextService == null) {
|
if (sslContextService == null) {
|
||||||
throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set.");
|
throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set.");
|
||||||
|
@ -375,11 +375,11 @@ public class JmsFactory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean compositeURIHasSSL(URI uri) {
|
private static boolean isSSL(URI uri) {
|
||||||
try {
|
try {
|
||||||
CompositeData compositeData = URISupport.parseComposite(uri);
|
CompositeData compositeData = URISupport.parseComposite(uri);
|
||||||
for(URI component : compositeData.getComponents()){
|
for(URI component : compositeData.getComponents()){
|
||||||
if(component.getScheme().equals("ssl")){
|
if ("ssl".equals(component.getScheme())) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,11 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard.util;
|
package org.apache.nifi.processors.standard.util;
|
||||||
|
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
|
||||||
import org.apache.nifi.components.ValidationResult;
|
|
||||||
import org.apache.nifi.components.Validator;
|
import org.apache.nifi.components.Validator;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
|
@ -53,27 +49,6 @@ public class JmsProperties {
|
||||||
.name("URL")
|
.name("URL")
|
||||||
.description("The URL of the JMS Server")
|
.description("The URL of the JMS Server")
|
||||||
.addValidator(StandardValidators.URI_VALIDATOR)
|
.addValidator(StandardValidators.URI_VALIDATOR)
|
||||||
.addValidator(new Validator() {
|
|
||||||
@Override
|
|
||||||
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
|
||||||
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
|
|
||||||
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
|
|
||||||
}
|
|
||||||
final ValidationResult.Builder builder = new ValidationResult.Builder();
|
|
||||||
builder.subject(subject).input(input).explanation("Valid URL").valid(true);
|
|
||||||
try {
|
|
||||||
final String evaluatedInput = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
|
|
||||||
final URI uri = new URI(evaluatedInput);
|
|
||||||
if (uri.getScheme() == null) {
|
|
||||||
builder.explanation("JMS URI must have a scheme set such as 'jms','ssl','tcp','vm',etc..").valid(false);
|
|
||||||
}
|
|
||||||
} catch (final URISyntaxException urie) {
|
|
||||||
builder.explanation("JMS URI not valid").valid(false);
|
|
||||||
}
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
.required(true)
|
.required(true)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
|
||||||
|
|
|
@ -16,8 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
|
@ -37,18 +41,76 @@ import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.apache.nifi.web.Revision;
|
import org.apache.nifi.web.Revision;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.slf4j.impl.SimpleLogger;
|
||||||
|
|
||||||
public class TestGetJMSQueue {
|
public class TestGetJMSQueue {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInvalidURL() throws Exception {
|
public void testSchemelessURI() throws Exception {
|
||||||
|
String expectedErrMsg = "Failed to connect to JMS Server due to javax.jms.JMSException: "
|
||||||
|
+ "Could not create Transport. Reason: java.io.IOException: Transport not scheme specified: [localhost]";
|
||||||
|
|
||||||
|
ByteArrayOutputStream bos = this.prepLogOutputStream();
|
||||||
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||||
|
|
||||||
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||||
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||||
runner.setProperty(JmsProperties.URL, "localhost");
|
runner.setProperty(JmsProperties.URL, "localhost");
|
||||||
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
||||||
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
||||||
runner.assertNotValid();
|
|
||||||
|
runner.run();
|
||||||
|
assertEquals(0, runner.getFlowFilesForRelationship("success").size());
|
||||||
|
assertTrue(bos.toString("ASCII").contains(expectedErrMsg));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPortlessURI() throws Exception {
|
||||||
|
String expectedErrMsg = "Failed to connect to JMS Server due to javax.jms.JMSException: "
|
||||||
|
+ "Could not connect to broker URL: tcp://localhost. Reason: java.lang.IllegalArgumentException: port out of range:-1";
|
||||||
|
|
||||||
|
ByteArrayOutputStream bos = this.prepLogOutputStream();
|
||||||
|
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||||
|
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||||
|
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||||
|
runner.setProperty(JmsProperties.URL, "tcp://localhost");
|
||||||
|
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
||||||
|
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
assertEquals(0, runner.getFlowFilesForRelationship("success").size());
|
||||||
|
assertTrue(bos.toString("ASCII").contains(expectedErrMsg));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompositeSchemelessPortlessURI() throws Exception {
|
||||||
|
String expectedErrMsg1 = "Failed to connect to [tcp://localhost] after: 2 attempt(s)";
|
||||||
|
String expectedErrMsg2 = "Failed to connect to JMS Server due to javax.jms.JMSException: port out of range:-1";
|
||||||
|
|
||||||
|
ByteArrayOutputStream bos = this.prepLogOutputStream();
|
||||||
|
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||||
|
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||||
|
runner.setProperty(JmsProperties.URL,
|
||||||
|
"failover:(tcp://localhost,remotehost)?initialReconnectDelay=1&startupMaxReconnectAttempts=2");
|
||||||
|
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
||||||
|
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
assertEquals(0, runner.getFlowFilesForRelationship("success").size());
|
||||||
|
assertTrue(bos.toString("ASCII").contains(expectedErrMsg1));
|
||||||
|
assertTrue(bos.toString("ASCII").contains(expectedErrMsg2));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ByteArrayOutputStream prepLogOutputStream() throws Exception {
|
||||||
|
LoggerFactory.getLogger(GetJMSQueue.class);
|
||||||
|
Field field = SimpleLogger.class.getDeclaredField("TARGET_STREAM");
|
||||||
|
field.setAccessible(true);
|
||||||
|
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
field.set(null, new PrintStream(bos));
|
||||||
|
return bos;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -78,17 +78,6 @@ public class TestPutJMS {
|
||||||
assertTrue(relationships.contains(PutJMS.REL_SUCCESS));
|
assertTrue(relationships.contains(PutJMS.REL_SUCCESS));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInvalidURL() throws Exception {
|
|
||||||
PutJMS putJms = new PutJMS();
|
|
||||||
TestRunner runner = TestRunners.newTestRunner(putJms);
|
|
||||||
runner.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
|
|
||||||
runner.setProperty(JmsProperties.URL, "localhost");
|
|
||||||
runner.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_TYPE);
|
|
||||||
runner.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix());
|
|
||||||
runner.assertNotValid();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCleanupResources() throws JMSException, NoSuchFieldException, IllegalAccessException {
|
public void testCleanupResources() throws JMSException, NoSuchFieldException, IllegalAccessException {
|
||||||
final PutJMS putJMS = new PutJMS();
|
final PutJMS putJMS = new PutJMS();
|
||||||
|
|
Loading…
Reference in New Issue