diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 16da77d8ef..a95b34b90c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -528,10 +528,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } - protected void checkMessageListener() throws IllegalStateException { - if (messageListener != null) { - throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); - } + protected void checkMessageListener() throws JMSException { + session.checkMessageListener(); } private void beforeMessageIsConsumed(MessageDispatch md) { @@ -713,8 +711,17 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (listener != null && started.get()) { ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md); - listener.onMessage(message); - afterMessageIsConsumed(md, false); + try { + listener.onMessage(message); + afterMessageIsConsumed(md, false); + } catch (RuntimeException e) { + if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) { + // Redeliver the message + } else { + // Transacted or Client ack: Deliver the next message. + afterMessageIsConsumed(md, false); + } + } } else { unconsumedMessages.enqueue(md); if (availableListener != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index ffa718d17b..c77027a88f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1666,5 +1666,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}"; } + public void checkMessageListener() throws JMSException { + if (messageListener != null) { + throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); + } + for (Iterator i = consumers.iterator(); i.hasNext();) { + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); + if( consumer.getMessageListener()!=null ) { + throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); + } + } + } }