NIFI-7671: Support Message Selector in ConsumeJMS processor

Also fixed some display names and variable names.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4432.
This commit is contained in:
Peter Turcsanyi 2020-07-27 01:44:21 +02:00 committed by Pierre Villard
parent 7d20c03f89
commit cdd766d649
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 69 additions and 18 deletions

View File

@ -109,6 +109,7 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
.build();
static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Session Cache size")
.displayName("Session Cache Size")
.description("This property is deprecated and no longer has any effect on the Processor. It will be removed in a later version.")
.required(false)
.defaultValue("1")

View File

@ -106,6 +106,15 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination";
static final PropertyDescriptor MESSAGE_SELECTOR = new PropertyDescriptor.Builder()
.name("Message Selector")
.displayName("Message Selector")
.description("The JMS Message Selector to filter the messages that the processor will receive")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder()
.name("Acknowledgement Mode")
.description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide "
@ -117,6 +126,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
static final PropertyDescriptor DURABLE_SUBSCRIBER = new PropertyDescriptor.Builder()
.name("Durable subscription")
.displayName("Durable Subscription")
.description("If destination is Topic if present then make it the consumer durable. " +
"@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-")
.required(false)
@ -127,6 +137,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.build();
static final PropertyDescriptor SHARED_SUBSCRIBER = new PropertyDescriptor.Builder()
.name("Shared subscription")
.displayName("Shared Subscription")
.description("If destination is Topic if present then make it the consumer shared. " +
"@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createSharedConsumer-javax.jms.Topic-java.lang.String-")
.required(false)
@ -174,6 +185,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
_propertyDescriptors.add(CF_SERVICE);
_propertyDescriptors.add(DESTINATION);
_propertyDescriptors.add(DESTINATION_TYPE);
_propertyDescriptors.add(MESSAGE_SELECTOR);
_propertyDescriptors.add(USER);
_propertyDescriptors.add(PASSWORD);
_propertyDescriptors.add(CLIENT_ID);
@ -252,10 +264,11 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
final boolean durable = isDurableSubscriber(context);
final boolean shared = isShared(context);
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String messageSelector = context.getProperty(MESSAGE_SELECTOR).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
try {
consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response == null) {

View File

@ -55,7 +55,8 @@ final class JMSConsumer extends JMSWorker {
}
private MessageConsumer createMessageConsumer(final Session session, final String destinationName, final boolean durable, final boolean shared, final String subscriberName) throws JMSException {
private MessageConsumer createMessageConsumer(final Session session, final String destinationName, final boolean durable, final boolean shared, final String subscriptionName,
final String messageSelector) throws JMSException {
final boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain();
final Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, destinationName, isPubSub);
@ -63,33 +64,33 @@ final class JMSConsumer extends JMSWorker {
if (shared) {
try {
if (durable) {
return session.createSharedDurableConsumer((Topic) destination, subscriberName);
return session.createSharedDurableConsumer((Topic) destination, subscriptionName, messageSelector);
} else {
return session.createSharedConsumer((Topic) destination, subscriberName);
return session.createSharedConsumer((Topic) destination, subscriptionName, messageSelector);
}
} catch (AbstractMethodError e) {
throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", e);
}
} else {
if (durable) {
return session.createDurableConsumer((Topic) destination, subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
return session.createDurableConsumer((Topic) destination, subscriptionName, messageSelector, JMSConsumer.this.jmsTemplate.isPubSubDomain());
} else {
return session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
return session.createConsumer(destination, messageSelector, JMSConsumer.this.jmsTemplate.isPubSubDomain());
}
}
} else {
return session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
return session.createConsumer(destination, messageSelector, JMSConsumer.this.jmsTemplate.isPubSubDomain());
}
}
public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
final ConsumerCallback consumerCallback) {
public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
final String charset, final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
public Void doInJms(final Session session) throws JMSException {
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
try {
final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
JMSResponse response = null;

View File

@ -198,7 +198,7 @@ public class JMSPublisherConsumerIT {
jmsTemplate.send(destinationName, messageCreator);
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, null, false, false, null, "UTF-8", response -> {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", response -> {
callbackInvoked.set(true);
responseChecker.accept(response);
});
@ -280,7 +280,7 @@ public class JMSPublisherConsumerIT {
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
@ -310,7 +310,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@ -357,7 +357,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
consumer.consume(destinationName, null, false, false, null, "UTF-8", callback);
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", callback);
}
} finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@ -396,7 +396,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@ -413,7 +413,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
while (!callbackInvoked.get()) {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -432,7 +432,7 @@ public class JMSPublisherConsumerIT {
// receiving next message and fail again
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -454,7 +454,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -473,4 +473,40 @@ public class JMSPublisherConsumerIT {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
public void testMessageSelector() {
String destinationName = "testMessageSelector";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
String messageSelector = "prop = '1'";
try {
jmsTemplate.send(destinationName, session -> session.createTextMessage("msg0"));
jmsTemplate.send(destinationName, session -> {
TextMessage message = session.createTextMessage("msg1");
message.setStringProperty("prop", "1");
return message;
});
jmsTemplate.send(destinationName, session -> {
TextMessage message = session.createTextMessage("msg2");
message.setStringProperty("prop", "2");
return message;
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, null, false, false, null, messageSelector, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("msg1", new String(response.getMessageBody()));
}
});
assertTrue(callbackInvoked.get());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
}