NIFI-3983 - Support ability to make JMS 2.0 durable subscriptions on Topic

Add logic in Consumer adding support for all topic consumer combinations, non-durable, durable, shared, durable-shared.
Add new optional config option to supply subscription name.
Add new optional config option to supply clientId.

This closes #1863.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Michael Andre Pearce 2017-06-28 13:08:58 +01:00 committed by Koji Kawamura
parent 9bfa7469cb
commit bdfd710692
4 changed files with 85 additions and 12 deletions

View File

@ -79,6 +79,15 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.defaultValue(QUEUE) .defaultValue(QUEUE)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
.name("Connection Client ID")
.description("The client id to be set on the connection, if set. For durable non shared consumer this is mandatory, " +
"for all others it is optional, typically with shared consumers it is undesirable to be set. " +
"Please see JMS spec for further details")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder() static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Session Cache size") .name("Session Cache size")
.description("The maximum limit for the number of cached Sessions.") .description("The maximum limit for the number of cached Sessions.")
@ -87,6 +96,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build(); .build();
// ConnectionFactoryProvider ControllerService // ConnectionFactoryProvider ControllerService
static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder() static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder()
.name("Connection Factory Service") .name("Connection Factory Service")
@ -107,6 +117,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
propertyDescriptors.add(DESTINATION_TYPE); propertyDescriptors.add(DESTINATION_TYPE);
propertyDescriptors.add(USER); propertyDescriptors.add(USER);
propertyDescriptors.add(PASSWORD); propertyDescriptors.add(PASSWORD);
propertyDescriptors.add(CLIENT_ID);
propertyDescriptors.add(SESSION_CACHE_SIZE); propertyDescriptors.add(SESSION_CACHE_SIZE);
} }
@ -196,7 +207,10 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
this.cachingConnectionFactory = new CachingConnectionFactory(cfCredentialsAdapter); this.cachingConnectionFactory = new CachingConnectionFactory(cfCredentialsAdapter);
this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue())); this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue()));
String clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
if (clientId != null) {
this.cachingConnectionFactory.setClientId(clientId);
}
JmsTemplate jmsTemplate = new JmsTemplate(); JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));

View File

@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
/** /**
@ -86,6 +87,34 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.defaultValue(CLIENT_ACK.getValue()) .defaultValue(CLIENT_ACK.getValue())
.build(); .build();
static final PropertyDescriptor DURABLE_SUBSCRIBER = new PropertyDescriptor.Builder()
.name("Durable subscription")
.description("If destination is Topic if present then make it the consumer durable. " +
"@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-")
.required(false)
.expressionLanguageSupported(true)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor SHARED_SUBSCRIBER = new PropertyDescriptor.Builder()
.name("Shared subscription")
.description("If destination is Topic if present then make it the consumer shared. " +
"@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createSharedConsumer-javax.jms.Topic-java.lang.String-")
.required(false)
.expressionLanguageSupported(true)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor SUBSCRIPTION_NAME = new PropertyDescriptor.Builder()
.name("Subscription Name")
.description("The name of the subscription to use if destination is Topic and is shared or durable.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("All FlowFiles that are received from the JMS Destination are routed to this relationship") .description("All FlowFiles that are received from the JMS Destination are routed to this relationship")
@ -99,6 +128,9 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(propertyDescriptors); _propertyDescriptors.addAll(propertyDescriptors);
_propertyDescriptors.add(ACKNOWLEDGEMENT_MODE); _propertyDescriptors.add(ACKNOWLEDGEMENT_MODE);
_propertyDescriptors.add(DURABLE_SUBSCRIBER);
_propertyDescriptors.add(SHARED_SUBSCRIBER);
_propertyDescriptors.add(SUBSCRIPTION_NAME);
thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>(); Set<Relationship> _relationships = new HashSet<>();
@ -116,7 +148,12 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
@Override @Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException { protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
this.targetResource.consume(destinationName, new ConsumerCallback(){ final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
final boolean durable = durableBoolean == null ? false : durableBoolean;
final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
final boolean shared = sharedBoolean == null ? false : sharedBoolean;
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
this.targetResource.consume(destinationName, durable, shared, subscriptionName, new ConsumerCallback(){
@Override @Override
public void accept(final JMSResponse response) { public void accept(final JMSResponse response) {
if (response != null){ if (response != null){

View File

@ -32,6 +32,7 @@ import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback; import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsHeaders; import org.springframework.jms.support.JmsHeaders;
@ -61,7 +62,7 @@ final class JMSConsumer extends JMSWorker {
/** /**
* *
*/ */
public void consume(final String destinationName, final ConsumerCallback consumerCallback) { public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() { this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override @Override
public Void doInJms(Session session) throws JMSException { public Void doInJms(Session session) throws JMSException {
@ -71,10 +72,31 @@ final class JMSConsumer extends JMSWorker {
* delivery and restarts with the oldest unacknowledged message * delivery and restarts with the oldest unacknowledged message
*/ */
session.recover(); session.recover();
boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain();
Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName( Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(
session, destinationName, JMSConsumer.this.jmsTemplate.isPubSubDomain()); session, destinationName, isPubSub);
MessageConsumer msgConsumer = session.createConsumer(destination, null, MessageConsumer msgConsumer;
JMSConsumer.this.jmsTemplate.isPubSubDomain()); if (isPubSub) {
if (shared) {
try {
if (durable) {
msgConsumer = session.createSharedDurableConsumer((Topic)destination, subscriberName);
} else {
msgConsumer = session.createSharedConsumer((Topic)destination, subscriberName);
}
} catch (AbstractMethodError e) {
throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", e);
}
} else {
if (durable) {
msgConsumer = session.createDurableConsumer((Topic)destination, subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
} else {
msgConsumer = session.createConsumer((Topic)destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
}
}
} else {
msgConsumer = session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
}
Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
JMSResponse response = null; JMSResponse response = null;
try { try {

View File

@ -100,7 +100,7 @@ public class JMSPublisherConsumerTest {
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
try { try {
consumer.consume(destinationName, new ConsumerCallback() { consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override @Override
public void accept(JMSResponse response) { public void accept(JMSResponse response) {
// noop // noop
@ -129,7 +129,7 @@ public class JMSPublisherConsumerTest {
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean(); final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, new ConsumerCallback() { consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override @Override
public void accept(JMSResponse response) { public void accept(JMSResponse response) {
callbackInvoked.set(true); callbackInvoked.set(true);
@ -155,7 +155,7 @@ public class JMSPublisherConsumerTest {
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean(); final AtomicBoolean callbackInvoked = new AtomicBoolean();
try { try {
consumer.consume(destinationName, new ConsumerCallback() { consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override @Override
public void accept(JMSResponse response) { public void accept(JMSResponse response) {
callbackInvoked.set(true); callbackInvoked.set(true);
@ -171,7 +171,7 @@ public class JMSPublisherConsumerTest {
// should receive the same message, but will process it successfully // should receive the same message, but will process it successfully
try { try {
consumer.consume(destinationName, new ConsumerCallback() { consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override @Override
public void accept(JMSResponse response) { public void accept(JMSResponse response) {
callbackInvoked.set(true); callbackInvoked.set(true);
@ -186,7 +186,7 @@ public class JMSPublisherConsumerTest {
// receiving next message and fail again // receiving next message and fail again
try { try {
consumer.consume(destinationName, new ConsumerCallback() { consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override @Override
public void accept(JMSResponse response) { public void accept(JMSResponse response) {
callbackInvoked.set(true); callbackInvoked.set(true);
@ -202,7 +202,7 @@ public class JMSPublisherConsumerTest {
// should receive the same message, but will process it successfully // should receive the same message, but will process it successfully
try { try {
consumer.consume(destinationName, new ConsumerCallback() { consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override @Override
public void accept(JMSResponse response) { public void accept(JMSResponse response) {
callbackInvoked.set(true); callbackInvoked.set(true);