From d86f77f462111a74673cc67e1a2bb799d0bcfd67 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 10 Feb 2006 17:06:46 +0000 Subject: [PATCH] Fixed failing ChangeSessionDeliveryModeTest test in the assembly module. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@376766 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 19 +++++++++++++------ .../org/apache/activemq/ActiveMQSession.java | 11 +++++++++++ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 16da77d8ef..a95b34b90c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -528,10 +528,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } - protected void checkMessageListener() throws IllegalStateException { - if (messageListener != null) { - throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); - } + protected void checkMessageListener() throws JMSException { + session.checkMessageListener(); } private void beforeMessageIsConsumed(MessageDispatch md) { @@ -713,8 +711,17 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (listener != null && started.get()) { ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md); - listener.onMessage(message); - afterMessageIsConsumed(md, false); + try { + listener.onMessage(message); + afterMessageIsConsumed(md, false); + } catch (RuntimeException e) { + if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) { + // Redeliver the message + } else { + // Transacted or Client ack: Deliver the next message. + afterMessageIsConsumed(md, false); + } + } } else { unconsumedMessages.enqueue(md); if (availableListener != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index ffa718d17b..c77027a88f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1666,5 +1666,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}"; } + public void checkMessageListener() throws JMSException { + if (messageListener != null) { + throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); + } + for (Iterator i = consumers.iterator(); i.hasNext();) { + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); + if( consumer.getMessageListener()!=null ) { + throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); + } + } + } }