NIFI-5921 - Timeout property for ConsumeJMS

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3245.
This commit is contained in:
Pierre Villard 2019-01-04 10:51:19 +01:00
parent cd91197a45
commit 432ba8787f
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
4 changed files with 15 additions and 15 deletions

View File

@ -221,9 +221,6 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
jmsTemplate.setConnectionFactory(cachingFactory);
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
// set of properties that may be good candidates for exposure via configuration
jmsTemplate.setReceiveTimeout(1000);
return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
}
}

View File

@ -134,7 +134,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.description("How long to wait to consume a message from the remote broker before giving up.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
.defaultValue("1 sec")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@ -187,9 +187,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
final boolean shared = sharedBoolean == null ? false : sharedBoolean;
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
final long timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
consumer.consume(destinationName, durable, shared, subscriptionName, charset, timeout, new ConsumerCallback() {
consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response == null) {
@ -220,6 +219,10 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
jmsTemplate.setSessionAcknowledgeMode(ackMode);
long timeout = processContext.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
jmsTemplate.setReceiveTimeout(timeout);
return new JMSConsumer(connectionFactory, jmsTemplate, this.getLogger());
}

View File

@ -80,7 +80,7 @@ final class JMSConsumer extends JMSWorker {
}
public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset, final long timeout,
public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
@ -88,7 +88,7 @@ final class JMSConsumer extends JMSWorker {
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
try {
final Message message = msgConsumer.receive(timeout);
final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
JMSResponse response = null;
if (message != null) {

View File

@ -113,7 +113,7 @@ public class JMSPublisherConsumerIT {
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
@ -143,7 +143,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@ -190,7 +190,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, callback);
consumer.consume(destinationName, false, false, null, "UTF-8", callback);
}
} finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@ -229,7 +229,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@ -246,7 +246,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -265,7 +265,7 @@ public class JMSPublisherConsumerIT {
// receiving next message and fail again
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -287,7 +287,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {