NIFI-1000 Fixed JmsFactory to properly obtain destiniation name

Re-enabled JMS Tests that were annotated with @Ignore
This commit is contained in:
Oleg Zhurakousky 2015-11-09 18:35:31 -05:00
parent 5f8fdae909
commit ef0be5a5d6
4 changed files with 71 additions and 16 deletions

View File

@ -152,7 +152,8 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<artifactId>activemq-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>

View File

@ -481,7 +481,13 @@ public class JmsFactory {
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
}
if (message.getJMSDestination() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, message.getJMSDestination().toString());
String destinationName;
if (message.getJMSDestination() instanceof Queue) {
destinationName = ((Queue) message.getJMSDestination()).getQueueName();
} else {
destinationName = ((Topic) message.getJMSDestination()).getTopicName();
}
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
}
if (message.getJMSMessageID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());

View File

@ -16,6 +16,11 @@
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
@ -24,73 +29,116 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.StandardProcessorTestRunner;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.Revision;
import org.junit.Test;
public class TestGetJMSQueue {
@org.junit.Ignore
@Test
public void testSendTextToQueue() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
GetJMSQueue getJmsQueue = new GetJMSQueue();
StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
final Message message = jmsSession.createTextMessage("Hello World");
producer.send(message);
jmsSession.commit();
getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
List<MockFlowFile> flowFiles = pSession
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
assertTrue(flowFiles.size() == 1);
MockFlowFile successFlowFile = flowFiles.get(0);
String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
assertEquals("Hello World", receivedMessage);
assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
producer.close();
jmsSession.close();
}
@org.junit.Ignore
@Test
public void testSendBytesToQueue() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
GetJMSQueue getJmsQueue = new GetJMSQueue();
StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
final BytesMessage message = jmsSession.createBytesMessage();
message.writeBytes("Hello Bytes".getBytes());
producer.send(message);
jmsSession.commit();
getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
List<MockFlowFile> flowFiles = pSession
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
assertTrue(flowFiles.size() == 1);
MockFlowFile successFlowFile = flowFiles.get(0);
String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
assertEquals("Hello Bytes", receivedMessage);
assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
producer.close();
jmsSession.close();
}
@org.junit.Ignore
@Test
public void testSendStreamToQueue() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
GetJMSQueue getJmsQueue = new GetJMSQueue();
StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
final StreamMessage message = jmsSession.createStreamMessage();
message.writeBytes("Hello Stream".getBytes());
producer.send(message);
jmsSession.commit();
getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
List<MockFlowFile> flowFiles = pSession
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
assertTrue(flowFiles.size() == 1);
MockFlowFile successFlowFile = flowFiles.get(0);
String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
assertEquals("Hello Stream", receivedMessage);
assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
producer.close();
jmsSession.close();
}

View File

@ -511,8 +511,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.12.0</version>
<artifactId>activemq-all</artifactId>
<version>5.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>