NIFI-1254 remove Destination Type property from GetJMSQueue and GetJMSTopic

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Mike Moser 2015-12-04 14:26:14 -05:00 committed by Mark Payne
parent dce039b54f
commit 14b3349e8d
2 changed files with 45 additions and 26 deletions

View File

@ -21,7 +21,6 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CL
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
@ -90,7 +89,6 @@ public abstract class JmsConsumer extends AbstractProcessor {
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(DESTINATION_TYPE);
descriptors.add(ACKNOWLEDGEMENT_MODE);
descriptors.add(MESSAGE_SELECTOR);
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);

View File

@ -42,15 +42,15 @@ public class TestGetJMSQueue {
@Test
public void testSendTextToQueue() throws Exception {
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
PutJMS putJms = new PutJMS();
TestRunner putRunner = TestRunners.newTestRunner(putJms);
putRunner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
putRunner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
putRunner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
putRunner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
putRunner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(putRunner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
final Message message = jmsSession.createTextMessage("Hello World");
@ -58,6 +58,13 @@ public class TestGetJMSQueue {
producer.send(message);
jmsSession.commit();
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
runner.run();
List<MockFlowFile> flowFiles = runner
@ -73,14 +80,14 @@ public class TestGetJMSQueue {
@Test
public void testSendBytesToQueue() throws Exception {
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
PutJMS putJms = new PutJMS();
TestRunner putRunner = TestRunners.newTestRunner(putJms);
putRunner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
putRunner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
putRunner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
putRunner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
putRunner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(putRunner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
final BytesMessage message = jmsSession.createBytesMessage();
@ -89,6 +96,13 @@ public class TestGetJMSQueue {
producer.send(message);
jmsSession.commit();
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
runner.run();
List<MockFlowFile> flowFiles = runner
@ -104,14 +118,14 @@ public class TestGetJMSQueue {
@Test
public void testSendStreamToQueue() throws Exception {
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
PutJMS putJms = new PutJMS();
TestRunner putRunner = TestRunners.newTestRunner(putJms);
putRunner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
putRunner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
putRunner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
putRunner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
putRunner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(putRunner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
@ -121,6 +135,13 @@ public class TestGetJMSQueue {
producer.send(message);
jmsSession.commit();
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
runner.run();
List<MockFlowFile> flowFiles = runner