NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in ConsumeJMS

Remove unused assertEquals import

Move destination from default to send/receive to support EL better
This commit is contained in:
Joey Frazee 2016-09-19 08:18:30 -05:00 committed by Oleg Zhurakousky
parent feaa4c9db8
commit c238676058
9 changed files with 89 additions and 38 deletions

View File

@ -202,7 +202,6 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
jmsTemplate.setDefaultDestinationName(this.destinationName);
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
// set of properties that may be good candidates for exposure via configuration

View File

@ -78,7 +78,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
*/
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException {
final JMSResponse response = this.targetResource.consume();
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
final JMSResponse response = this.targetResource.consume(destinationName);
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@ -88,7 +89,9 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
}
});
Map<String, Object> jmsHeaders = response.getMessageHeaders();
flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, flowFile, processSession);
flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
processSession.transfer(flowFile, REL_SUCCESS);
} else {
@ -115,10 +118,10 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
/**
*
*/
private FlowFile updateFlowFileAttributesWithJmsHeaders(Map<String, Object> jmsHeaders, FlowFile flowFile, ProcessSession processSession) {
private FlowFile updateFlowFileAttributesWithMap(Map<String, Object> map, FlowFile flowFile, ProcessSession processSession) {
Map<String, String> attributes = new HashMap<String, String>();
for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) {
attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue()));
for (Entry<String, Object> entry : map.entrySet()) {
attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
}
attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName);
flowFile = processSession.putAllAttributes(flowFile, attributes);

View File

@ -61,8 +61,8 @@ final class JMSConsumer extends JMSWorker {
/**
*
*/
public JMSResponse consume() {
Message message = this.jmsTemplate.receive();
public JMSResponse consume(final String destinationName) {
Message message = this.jmsTemplate.receive(destinationName);
if (message != null) {
byte[] messageBody = null;
try {

View File

@ -63,8 +63,8 @@ final class JMSPublisher extends JMSWorker {
*
* @param messageBytes byte array representing contents of the message
*/
void publish(byte[] messageBytes) {
this.publish(messageBytes, null);
void publish(final String destinationName, byte[] messageBytes) {
this.publish(destinationName, messageBytes, null);
}
/**
@ -74,8 +74,8 @@ final class JMSPublisher extends JMSWorker {
* @param flowFileAttributes
* Map representing {@link FlowFile} attributes.
*/
void publish(final byte[] messageBytes, final Map<String, String> flowFileAttributes) {
this.jmsTemplate.send(new MessageCreator() {
void publish(final String destinationName, final byte[] messageBytes, final Map<String, String> flowFileAttributes) {
this.jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
BytesMessage message = session.createBytesMessage();
@ -83,7 +83,7 @@ final class JMSPublisher extends JMSWorker {
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
// set message headers and properties
for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-")) {// '-' is illegal char in JMS prop names
if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' are illegal char in JMS prop names
message.setStringProperty(entry.getKey(), entry.getValue());
} else if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));

View File

@ -98,7 +98,8 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
FlowFile flowFile = processSession.get();
if (flowFile != null) {
try {
this.targetResource.publish(this.extractMessageBody(flowFile, processSession),
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession),
flowFile.getAttributes());
processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());

View File

@ -48,13 +48,12 @@ public class CommonTest {
assertTrue(consumeJmsPresent);
}
static JmsTemplate buildJmsTemplateForDestination(String destinationName, boolean pubSub) {
static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"vm://localhost?broker.persistent=false");
CachingConnectionFactory cf = new CachingConnectionFactory(connectionFactory);
JmsTemplate jmsTemplate = new JmsTemplate(cf);
jmsTemplate.setDefaultDestinationName(destinationName);
jmsTemplate.setPubSubDomain(pubSub);
return jmsTemplate;
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.nifi.jms.processors;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.MockFlowFile;
@ -26,7 +29,6 @@ import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -35,9 +37,13 @@ public class ConsumeJMSTest {
@Test
public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("cooQueue", false);
final String destinationName = "cooQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher sender = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
sender.publish("Hey dude!".getBytes());
final Map<String, String> senderAttributes = new HashMap<>();
senderAttributes.put("filename", "message.txt");
senderAttributes.put("attribute_from_sender", "some value");
sender.publish(destinationName, "Hey dude!".getBytes(), senderAttributes);
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
@ -46,13 +52,18 @@ public class ConsumeJMSTest {
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(ConsumeJMS.DESTINATION, "cooQueue");
runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
runner.run(1, false);
//
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);
assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION));
successFF.assertAttributeExists(JmsHeaders.DESTINATION);
successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName);
successFF.assertAttributeExists("filename");
successFF.assertAttributeEquals("filename", "message.txt");
successFF.assertAttributeExists("attribute_from_sender");
successFF.assertAttributeEquals("attribute_from_sender", "some value");
successFF.assertContentEquals("Hey dude!".getBytes());
String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
assertNotNull(sourceDestination);

View File

@ -42,12 +42,13 @@ public class JMSPublisherConsumerTest {
@Test
public void validateByesConvertedToBytesMessageOnSend() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
publisher.publish("hellomq".getBytes());
publisher.publish(destinationName, "hellomq".getBytes());
Message receivedMessage = jmsTemplate.receive();
Message receivedMessage = jmsTemplate.receive(destinationName);
assertTrue(receivedMessage instanceof BytesMessage);
byte[] bytes = new byte[7];
((BytesMessage) receivedMessage).readBytes(bytes);
@ -58,15 +59,16 @@ public class JMSPublisherConsumerTest {
@Test
public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("foo", "foo");
flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
publisher.publish("hellomq".getBytes(), flowFileAttributes);
publisher.publish(destinationName, "hellomq".getBytes(), flowFileAttributes);
Message receivedMessage = jmsTemplate.receive();
Message receivedMessage = jmsTemplate.receive(destinationName);
assertTrue(receivedMessage instanceof BytesMessage);
assertEquals("foo", receivedMessage.getStringProperty("foo"));
assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
@ -83,9 +85,10 @@ public class JMSPublisherConsumerTest {
*/
@Test(expected = IllegalStateException.class)
public void validateFailOnUnsupportedMessageType() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
jmsTemplate.send(new MessageCreator() {
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage();
@ -94,7 +97,7 @@ public class JMSPublisherConsumerTest {
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
try {
consumer.consume();
consumer.consume(destinationName);
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
@ -102,9 +105,10 @@ public class JMSPublisherConsumerTest {
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
jmsTemplate.send(new MessageCreator() {
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("hello from the other side");
@ -116,9 +120,7 @@ public class JMSPublisherConsumerTest {
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
assertEquals("JMSConsumer[destination:testQueue; pub-sub:false;]", consumer.toString());
JMSResponse response = consumer.consume();
JMSResponse response = consumer.consume(destinationName);
assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo"));

View File

@ -43,6 +43,7 @@ public class PublishJMSTest {
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final String destinationName = "fooQueue";
PublishJMS pubProc = new PublishJMS();
TestRunner runner = TestRunners.newTestRunner(pubProc);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
@ -53,7 +54,7 @@ public class PublishJMSTest {
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, "fooQueue");
runner.setProperty(PublishJMS.DESTINATION, destinationName);
Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
@ -65,8 +66,43 @@ public class PublishJMSTest {
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
jmst.setDefaultDestinationName("fooQueue");
BytesMessage message = (BytesMessage) jmst.receive();
BytesMessage message = (BytesMessage) jmst.receive(destinationName);
byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
assertEquals("Hey dude!", new String(messageBytes));
assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName());
assertEquals("foo", message.getStringProperty("foo"));
}
@Test
public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final String destinationNameExpression = "${foo}Queue";
final String destinationName = "fooQueue";
PublishJMS pubProc = new PublishJMS();
TestRunner runner = TestRunners.newTestRunner(pubProc);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression);
Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
BytesMessage message = (BytesMessage) jmst.receive(destinationName);
byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
assertEquals("Hey dude!", new String(messageBytes));