mirror of https://github.com/apache/activemq.git
Fixed failing ChangeSessionDeliveryModeTest test in the assembly module.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@376766 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ffd4756740
commit
d86f77f462
|
@ -528,10 +528,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void checkMessageListener() throws IllegalStateException {
|
protected void checkMessageListener() throws JMSException {
|
||||||
if (messageListener != null) {
|
session.checkMessageListener();
|
||||||
throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void beforeMessageIsConsumed(MessageDispatch md) {
|
private void beforeMessageIsConsumed(MessageDispatch md) {
|
||||||
|
@ -713,8 +711,17 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
if (listener != null && started.get()) {
|
if (listener != null && started.get()) {
|
||||||
ActiveMQMessage message = createActiveMQMessage(md);
|
ActiveMQMessage message = createActiveMQMessage(md);
|
||||||
beforeMessageIsConsumed(md);
|
beforeMessageIsConsumed(md);
|
||||||
listener.onMessage(message);
|
try {
|
||||||
afterMessageIsConsumed(md, false);
|
listener.onMessage(message);
|
||||||
|
afterMessageIsConsumed(md, false);
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
|
||||||
|
// Redeliver the message
|
||||||
|
} else {
|
||||||
|
// Transacted or Client ack: Deliver the next message.
|
||||||
|
afterMessageIsConsumed(md, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
unconsumedMessages.enqueue(md);
|
unconsumedMessages.enqueue(md);
|
||||||
if (availableListener != null) {
|
if (availableListener != null) {
|
||||||
|
|
|
@ -1666,5 +1666,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}";
|
return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void checkMessageListener() throws JMSException {
|
||||||
|
if (messageListener != null) {
|
||||||
|
throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
|
||||||
|
}
|
||||||
|
for (Iterator i = consumers.iterator(); i.hasNext();) {
|
||||||
|
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
|
||||||
|
if( consumer.getMessageListener()!=null ) {
|
||||||
|
throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue