diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index d5b704b6c6..39118a75cb 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -79,6 +79,15 @@ abstract class AbstractJMSProcessor extends AbstractProcess .defaultValue(QUEUE) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder() + .name("Connection Client ID") + .description("The client id to be set on the connection, if set. For durable non shared consumer this is mandatory, " + + "for all others it is optional, typically with shared consumers it is undesirable to be set. " + + "Please see JMS spec for further details") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder() .name("Session Cache size") .description("The maximum limit for the number of cached Sessions.") @@ -87,6 +96,7 @@ abstract class AbstractJMSProcessor extends AbstractProcess .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); + // ConnectionFactoryProvider ControllerService static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder() .name("Connection Factory Service") @@ -107,6 +117,7 @@ abstract class AbstractJMSProcessor extends AbstractProcess propertyDescriptors.add(DESTINATION_TYPE); propertyDescriptors.add(USER); propertyDescriptors.add(PASSWORD); + propertyDescriptors.add(CLIENT_ID); propertyDescriptors.add(SESSION_CACHE_SIZE); } @@ -196,7 +207,10 @@ abstract class AbstractJMSProcessor extends AbstractProcess this.cachingConnectionFactory = new CachingConnectionFactory(cfCredentialsAdapter); this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue())); - + String clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue(); + if (clientId != null) { + this.cachingConnectionFactory.setClientId(clientId); + } JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index d85d26f726..87743974c7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.springframework.jms.core.JmsTemplate; /** @@ -86,6 +87,34 @@ public class ConsumeJMS extends AbstractJMSProcessor { .defaultValue(CLIENT_ACK.getValue()) .build(); + static final PropertyDescriptor DURABLE_SUBSCRIBER = new PropertyDescriptor.Builder() + .name("Durable subscription") + .description("If destination is Topic if present then make it the consumer durable. " + + "@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-") + .required(false) + .expressionLanguageSupported(true) + .defaultValue("false") + .allowableValues("true", "false") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor SHARED_SUBSCRIBER = new PropertyDescriptor.Builder() + .name("Shared subscription") + .description("If destination is Topic if present then make it the consumer shared. " + + "@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createSharedConsumer-javax.jms.Topic-java.lang.String-") + .required(false) + .expressionLanguageSupported(true) + .defaultValue("false") + .allowableValues("true", "false") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor SUBSCRIPTION_NAME = new PropertyDescriptor.Builder() + .name("Subscription Name") + .description("The name of the subscription to use if destination is Topic and is shared or durable.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles that are received from the JMS Destination are routed to this relationship") @@ -99,6 +128,9 @@ public class ConsumeJMS extends AbstractJMSProcessor { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(propertyDescriptors); _propertyDescriptors.add(ACKNOWLEDGEMENT_MODE); + _propertyDescriptors.add(DURABLE_SUBSCRIBER); + _propertyDescriptors.add(SHARED_SUBSCRIBER); + _propertyDescriptors.add(SUBSCRIPTION_NAME); thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set _relationships = new HashSet<>(); @@ -116,7 +148,12 @@ public class ConsumeJMS extends AbstractJMSProcessor { @Override protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException { final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); - this.targetResource.consume(destinationName, new ConsumerCallback(){ + final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean(); + final boolean durable = durableBoolean == null ? false : durableBoolean; + final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean(); + final boolean shared = sharedBoolean == null ? false : sharedBoolean; + final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue(); + this.targetResource.consume(destinationName, durable, shared, subscriptionName, new ConsumerCallback(){ @Override public void accept(final JMSResponse response) { if (response != null){ diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java index e955236d31..a4fc47a1c3 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java @@ -32,6 +32,7 @@ import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.SessionCallback; import org.springframework.jms.support.JmsHeaders; @@ -61,7 +62,7 @@ final class JMSConsumer extends JMSWorker { /** * */ - public void consume(final String destinationName, final ConsumerCallback consumerCallback) { + public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final ConsumerCallback consumerCallback) { this.jmsTemplate.execute(new SessionCallback() { @Override public Void doInJms(Session session) throws JMSException { @@ -71,10 +72,31 @@ final class JMSConsumer extends JMSWorker { * delivery and restarts with the oldest unacknowledged message */ session.recover(); + boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain(); Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName( - session, destinationName, JMSConsumer.this.jmsTemplate.isPubSubDomain()); - MessageConsumer msgConsumer = session.createConsumer(destination, null, - JMSConsumer.this.jmsTemplate.isPubSubDomain()); + session, destinationName, isPubSub); + MessageConsumer msgConsumer; + if (isPubSub) { + if (shared) { + try { + if (durable) { + msgConsumer = session.createSharedDurableConsumer((Topic)destination, subscriberName); + } else { + msgConsumer = session.createSharedConsumer((Topic)destination, subscriberName); + } + } catch (AbstractMethodError e) { + throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", e); + } + } else { + if (durable) { + msgConsumer = session.createDurableConsumer((Topic)destination, subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); + } else { + msgConsumer = session.createConsumer((Topic)destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); + } + } + } else { + msgConsumer = session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); + } Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); JMSResponse response = null; try { diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java index 3168443240..0f8dafbe33 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java @@ -100,7 +100,7 @@ public class JMSPublisherConsumerTest { JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); try { - consumer.consume(destinationName, new ConsumerCallback() { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { @Override public void accept(JMSResponse response) { // noop @@ -129,7 +129,7 @@ public class JMSPublisherConsumerTest { JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); final AtomicBoolean callbackInvoked = new AtomicBoolean(); - consumer.consume(destinationName, new ConsumerCallback() { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { @Override public void accept(JMSResponse response) { callbackInvoked.set(true); @@ -155,7 +155,7 @@ public class JMSPublisherConsumerTest { JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); final AtomicBoolean callbackInvoked = new AtomicBoolean(); try { - consumer.consume(destinationName, new ConsumerCallback() { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { @Override public void accept(JMSResponse response) { callbackInvoked.set(true); @@ -171,7 +171,7 @@ public class JMSPublisherConsumerTest { // should receive the same message, but will process it successfully try { - consumer.consume(destinationName, new ConsumerCallback() { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { @Override public void accept(JMSResponse response) { callbackInvoked.set(true); @@ -186,7 +186,7 @@ public class JMSPublisherConsumerTest { // receiving next message and fail again try { - consumer.consume(destinationName, new ConsumerCallback() { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { @Override public void accept(JMSResponse response) { callbackInvoked.set(true); @@ -202,7 +202,7 @@ public class JMSPublisherConsumerTest { // should receive the same message, but will process it successfully try { - consumer.consume(destinationName, new ConsumerCallback() { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { @Override public void accept(JMSResponse response) { callbackInvoked.set(true);