NIFI-3531 Moved session.recover in JMSConsumer to exceptional situations

This commit is contained in:
Mike Moser 2018-08-01 20:11:35 +00:00
parent b4894c5572
commit 8309747889

View File

@ -85,10 +85,6 @@ final class JMSConsumer extends JMSWorker {
this.jmsTemplate.execute(new SessionCallback<Void>() { this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override @Override
public Void doInJms(final Session session) throws JMSException { public Void doInJms(final Session session) throws JMSException {
// We need to call recover to ensure that in in the event of
// abrupt end or exception the current session will stop message
// delivery and restarts with the oldest unacknowledged message
session.recover();
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName); final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
try { try {
@ -126,6 +122,12 @@ final class JMSConsumer extends JMSWorker {
// and if CLIENT_ACKNOWLEDGE is set. // and if CLIENT_ACKNOWLEDGE is set.
consumerCallback.accept(response); consumerCallback.accept(response);
acknowledge(message, session); acknowledge(message, session);
} catch (JMSException e) {
// We need to call recover to ensure that in the event of
// abrupt end or exception the current session will stop message
// delivery and restart with the oldest unacknowledged message
session.recover();
throw e;
} finally { } finally {
JmsUtils.closeMessageConsumer(msgConsumer); JmsUtils.closeMessageConsumer(msgConsumer);
} }