NIFI-5921 - Added property to allow a user to define a timeout on the ConsumeJMS processor

Co-Authored-By: rwhittington <ryan.whittington@gmail.com>
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3240.
This commit is contained in:
Ryan Whittington 2019-01-02 16:17:06 +00:00 committed by Pierre Villard
parent 6fa5deafc2
commit cd91197a45
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
3 changed files with 21 additions and 10 deletions

View File

@ -48,6 +48,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Consuming JMS processor which upon each invocation of
@ -128,6 +129,14 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Timeout")
.description("How long to wait to consume a message from the remote broker before giving up.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -154,6 +163,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
_propertyDescriptors.add(DURABLE_SUBSCRIBER);
_propertyDescriptors.add(SHARED_SUBSCRIBER);
_propertyDescriptors.add(SUBSCRIPTION_NAME);
_propertyDescriptors.add(TIMEOUT);
thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
@ -177,8 +187,9 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
final boolean shared = sharedBoolean == null ? false : sharedBoolean;
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
final long timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
consumer.consume(destinationName, durable, shared, subscriptionName, charset, timeout, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response == null) {

View File

@ -80,7 +80,7 @@ final class JMSConsumer extends JMSWorker {
}
public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset, final long timeout,
final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
@ -88,7 +88,7 @@ final class JMSConsumer extends JMSWorker {
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
try {
final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
final Message message = msgConsumer.receive(timeout);
JMSResponse response = null;
if (message != null) {

View File

@ -113,7 +113,7 @@ public class JMSPublisherConsumerIT {
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
@ -143,7 +143,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@ -190,7 +190,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
consumer.consume(destinationName, false, false, null, "UTF-8", callback);
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, callback);
}
} finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@ -229,7 +229,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@ -246,7 +246,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -265,7 +265,7 @@ public class JMSPublisherConsumerIT {
// receiving next message and fail again
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -287,7 +287,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {