NIFI-2774 added configurable QoS options to ConsumeJMS

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #1036.
This commit is contained in:
Oleg Zhurakousky 2016-09-20 10:49:22 -04:00 committed by Mike Moser
parent 4a4c87fa15
commit ee14ae8af0
7 changed files with 238 additions and 70 deletions

View File

@ -102,20 +102,18 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
* all other lifecycle methods are invoked multiple times.
*/
static {
propertyDescriptors.add(USER);
propertyDescriptors.add(PASSWORD);
propertyDescriptors.add(CF_SERVICE);
propertyDescriptors.add(DESTINATION);
propertyDescriptors.add(DESTINATION_TYPE);
propertyDescriptors.add(USER);
propertyDescriptors.add(PASSWORD);
propertyDescriptors.add(SESSION_CACHE_SIZE);
propertyDescriptors.add(CF_SERVICE);
}
protected volatile T targetResource;
private volatile CachingConnectionFactory cachingConnectionFactory;
protected volatile String destinationName;
/**
*
*/
@ -176,7 +174,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
* @see JMSPublisher
* @see JMSConsumer
*/
protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate);
protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext);
/**
* This method essentially performs initialization of this Processor by
@ -201,13 +199,12 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
// set of properties that may be good candidates for exposure via configuration
jmsTemplate.setReceiveTimeout(10000);
jmsTemplate.setReceiveTimeout(1000);
this.targetResource = this.finishBuildingTargetResource(jmsTemplate);
this.targetResource = this.finishBuildingTargetResource(jmsTemplate, context);
}
}
}

View File

@ -18,20 +18,27 @@ package org.apache.nifi.jms.processors;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.jms.Session;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -54,8 +61,31 @@ import org.springframework.jms.core.JmsTemplate;
@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE),
"AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + ")",
"Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. "
+ "Can result in data loss in the event where NiFi abruptly stopped before session was commited.");
static final AllowableValue CLIENT_ACK = new AllowableValue(String.valueOf(Session.CLIENT_ACKNOWLEDGE),
"CLIENT_ACKNOWLEDGE (" + String.valueOf(Session.CLIENT_ACKNOWLEDGE) + ")",
"(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss");
static final AllowableValue DUPS_OK = new AllowableValue(String.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
"DUPS_OK_ACKNOWLEDGE (" + String.valueOf(Session.DUPS_OK_ACKNOWLEDGE) + ")",
"This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data "
+ "duplication and data loss while achieving the best throughput.");
public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination";
static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder()
.name("Acknowledgement Mode")
.description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide "
+ "better performance than Client Acknowledge.")
.required(true)
.allowableValues(AUTO_ACK, CLIENT_ACK, DUPS_OK)
.defaultValue(CLIENT_ACK.getValue())
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received from the JMS Destination are routed to this relationship")
@ -63,7 +93,14 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> thisPropertyDescriptors;
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(propertyDescriptors);
_propertyDescriptors.add(ACKNOWLEDGEMENT_MODE);
thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(_relationships);
@ -77,33 +114,41 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
* 'success' {@link Relationship}.
*/
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException {
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
final JMSResponse response = this.targetResource.consume(destinationName);
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(response.getMessageBody());
this.targetResource.consume(destinationName, new ConsumerCallback(){
@Override
public void accept(final JMSResponse response) {
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(response.getMessageBody());
}
});
Map<String, Object> jmsHeaders = response.getMessageHeaders();
Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
} else {
context.yield();
}
});
Map<String, Object> jmsHeaders = response.getMessageHeaders();
Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
flowFile = this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
processSession.transfer(flowFile, REL_SUCCESS);
} else {
context.yield();
}
}
});
}
/**
* Will create an instance of {@link JMSConsumer}
*/
@Override
protected JMSConsumer finishBuildingTargetResource(JmsTemplate jmsTemplate) {
protected JMSConsumer finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) {
int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
jmsTemplate.setSessionAcknowledgeMode(ackMode);
return new JMSConsumer(jmsTemplate, this.getLogger());
}
@ -115,19 +160,25 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
return relationships;
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return thisPropertyDescriptors;
}
/**
* Copies JMS attributes (i.e., headers and properties) as FF attributes.
* Given that FF attributes mandate that values are of type String, the
* copied values of JMS attributes will be "stringified" via
* String.valueOf(attribute).
*/
private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, Object> jmsAttributes, FlowFile flowFile,
ProcessSession processSession) {
private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, Object> jmsAttributes, FlowFile flowFile, ProcessSession processSession) {
Map<String, String> attributes = new HashMap<String, String>();
for (Entry<String, Object> entry : jmsAttributes.entrySet()) {
attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
}
attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName);
flowFile = processSession.putAllAttributes(flowFile, attributes);
return flowFile;
}

View File

@ -25,23 +25,23 @@ import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.nifi.logging.ComponentLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsHeaders;
import org.springframework.jms.support.JmsUtils;
/**
* Generic consumer of messages from JMS compliant messaging system.
*/
final class JMSConsumer extends JMSWorker {
private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
/**
* Creates an instance of this consumer
*
@ -52,8 +52,8 @@ final class JMSConsumer extends JMSWorker {
*/
JMSConsumer(JmsTemplate jmsTemplate, ComponentLog processLog) {
super(jmsTemplate, processLog);
if (logger.isInfoEnabled()) {
logger.info("Created Message Consumer for '" + jmsTemplate.toString() + "'.");
if (this.processLog.isInfoEnabled()) {
this.processLog.info("Created Message Consumer for '" + jmsTemplate.toString() + "'.");
}
}
@ -61,27 +61,51 @@ final class JMSConsumer extends JMSWorker {
/**
*
*/
public JMSResponse consume(String destinationName) {
Message message = this.jmsTemplate.receive(destinationName);
if (message != null) {
byte[] messageBody = null;
try {
if (message instanceof TextMessage) {
messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message);
} else if (message instanceof BytesMessage) {
messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
} else {
throw new UnsupportedOperationException("Message type other then TextMessage and BytesMessage are "
+ "not supported at the moment");
public void consume(final String destinationName, final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
public Void doInJms(Session session) throws JMSException {
/*
* We need to call recover to ensure that in in the event of
* abrupt end or exception the current session will stop message
* delivery and restarts with the oldest unacknowledged message
*/
session.recover();
Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(
session, destinationName, JMSConsumer.this.jmsTemplate.isPubSubDomain());
MessageConsumer msgConsumer = session.createConsumer(destination, null,
JMSConsumer.this.jmsTemplate.isPubSubDomain());
Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
JMSResponse response = null;
try {
if (message != null) {
byte[] messageBody = null;
if (message instanceof TextMessage) {
messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message);
} else if (message instanceof BytesMessage) {
messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
} else {
throw new IllegalStateException("Message type other then TextMessage and BytesMessage are "
+ "not supported at the moment");
}
Map<String, Object> messageHeaders = extractMessageHeaders(message);
Map<String, String> messageProperties = extractMessageProperties(message);
response = new JMSResponse(messageBody, messageHeaders, messageProperties);
}
// invoke the processor callback (regardless if it's null,
// so the processor can yield) as part of this inJMS call
// and ACK message *only* after its successful invocation
// and if CLIENT_ACKNOWLEDGE is set.
consumerCallback.accept(response);
if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
message.acknowledge();
}
} finally {
JmsUtils.closeMessageConsumer(msgConsumer);
}
Map<String, Object> messageHeaders = this.extractMessageHeaders(message);
Map<String, String> messageProperties = this.extractMessageProperties(message);
return new JMSResponse(messageBody, messageHeaders, messageProperties);
} catch (Exception e) {
throw new IllegalStateException(e);
return null;
}
}
return null;
}, true);
}
/**
@ -97,7 +121,6 @@ final class JMSConsumer extends JMSWorker {
properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName)));
}
} catch (JMSException e) {
logger.warn("Failed to extract message properties", e);
this.processLog.warn("Failed to extract message properties", e);
}
return properties;
@ -145,7 +168,6 @@ final class JMSConsumer extends JMSWorker {
destinationName = (destination instanceof Queue) ? ((Queue) destination).getQueueName()
: ((Topic) destination).getTopicName();
} catch (JMSException e) {
logger.warn("Failed to retrieve Destination name for '" + headerName + "' header", e);
this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", e);
}
}
@ -180,4 +202,12 @@ final class JMSConsumer extends JMSWorker {
return messageProperties;
}
}
/**
* Callback to be invoked while executing inJMS call (the call within the
* live JMS session)
*/
static interface ConsumerCallback {
void accept(JMSResponse response);
}
}

View File

@ -74,7 +74,7 @@ final class JMSPublisher extends JMSWorker {
* @param flowFileAttributes
* Map representing {@link FlowFile} attributes.
*/
void publish(final String destinationName, final byte[] messageBytes, Map<String, String> flowFileAttributes) {
void publish(final String destinationName, final byte[] messageBytes, final Map<String, String> flowFileAttributes) {
this.jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {

View File

@ -122,7 +122,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
* Will create an instance of {@link JMSPublisher}
*/
@Override
protected JMSPublisher finishBuildingTargetResource(JmsTemplate jmsTemplate) {
protected JMSPublisher finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) {
return new JMSPublisher(jmsTemplate, this.getLogger());
}

View File

@ -21,6 +21,9 @@ import static org.junit.Assert.assertTrue;
import java.util.Iterator;
import java.util.ServiceLoader;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.nifi.processor.Processor;
import org.junit.Test;
@ -49,12 +52,13 @@ public class CommonTest {
}
static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"vm://localhost?broker.persistent=false");
CachingConnectionFactory cf = new CachingConnectionFactory(connectionFactory);
connectionFactory = new CachingConnectionFactory(connectionFactory);
JmsTemplate jmsTemplate = new JmsTemplate(cf);
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setPubSubDomain(pubSub);
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTemplate;
}
}

View File

@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@ -30,6 +32,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
import org.apache.nifi.logging.ComponentLog;
import org.junit.Test;
@ -41,7 +44,7 @@ import org.springframework.jms.support.JmsHeaders;
public class JMSPublisherConsumerTest {
@Test
public void validateByesConvertedToBytesMessageOnSend() throws Exception {
public void validateBytesConvertedToBytesMessageOnSend() throws Exception {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
@ -97,7 +100,12 @@ public class JMSPublisherConsumerTest {
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
try {
consumer.consume(destinationName);
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
}
});
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
@ -120,12 +128,90 @@ public class JMSPublisherConsumerTest {
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
JMSResponse response = consumer.consume(destinationName);
assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo"));
assertEquals("false", response.getMessageProperties().get("bar"));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo"));
assertEquals("false", response.getMessageProperties().get("bar"));
}
});
assertTrue(callbackInvoked.get());
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
@Test
public void validateMessageRedeliveryWhenNotAcked() throws Exception {
String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
publisher.publish(destinationName, "1".getBytes(StandardCharsets.UTF_8));
publisher.publish(destinationName, "2".getBytes(StandardCharsets.UTF_8));
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody()));
throw new RuntimeException("intentional to avoid explicit ack");
}
});
} catch (Exception e) {
// ignore
}
assertTrue(callbackInvoked.get());
callbackInvoked.set(false);
// should receive the same message, but will process it successfully
try {
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody()));
}
});
} catch (Exception e) {
// ignore
}
assertTrue(callbackInvoked.get());
callbackInvoked.set(false);
// receiving next message and fail again
try {
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody()));
throw new RuntimeException("intentional to avoid explicit ack");
}
});
} catch (Exception e) {
// ignore
}
assertTrue(callbackInvoked.get());
callbackInvoked.set(false);
// should receive the same message, but will process it successfully
try {
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody()));
}
});
} catch (Exception e) {
// ignore
}
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}