ARTEMIS-4520 JMSContext.acknowledge() doesn't work if last message received is null

This commit is contained in:
Justin Bertram 2023-11-30 11:39:22 -06:00 committed by clebertsuconic
parent 3bdef0e8e1
commit c858323f07
3 changed files with 53 additions and 9 deletions

View File

@ -63,7 +63,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer {
@Override
public Message receive() {
try {
return context.setLastMessage(this, consumer.receive());
return context.setLastMessage(consumer.receive());
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
@ -72,7 +72,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer {
@Override
public Message receive(long timeout) {
try {
return context.setLastMessage(this, consumer.receive(timeout));
return context.setLastMessage(consumer.receive(timeout));
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
@ -81,7 +81,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer {
@Override
public Message receiveNoWait() {
try {
return context.setLastMessage(this, consumer.receiveNoWait());
return context.setLastMessage(consumer.receiveNoWait());
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
@ -100,7 +100,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer {
public <T> T receiveBody(Class<T> c) {
try {
Message message = consumer.receive();
context.setLastMessage(ActiveMQJMSConsumer.this, message);
context.setLastMessage(message);
return message == null ? null : message.getBody(c);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
@ -111,7 +111,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer {
public <T> T receiveBody(Class<T> c, long timeout) {
try {
Message message = consumer.receive(timeout);
context.setLastMessage(ActiveMQJMSConsumer.this, message);
context.setLastMessage(message);
return message == null ? null : message.getBody(c);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
@ -122,7 +122,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer {
public <T> T receiveBodyNoWait(Class<T> c) {
try {
Message message = consumer.receiveNoWait();
context.setLastMessage(ActiveMQJMSConsumer.this, message);
context.setLastMessage(message);
return message == null ? null : message.getBody(c);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
@ -139,7 +139,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer {
@Override
public void onMessage(Message message) {
context.setLastMessage(ActiveMQJMSConsumer.this, message);
context.setLastMessage(message);
context.getThreadAwareContext().setCurrentThread(false);
try {

View File

@ -582,8 +582,8 @@ public class ActiveMQJMSContext implements JMSContext {
/**
* this is to ensure Context.acknowledge would work on ClientACK
*/
Message setLastMessage(final JMSConsumer consumer, final Message lastMessageReceived) {
if (sessionMode == CLIENT_ACKNOWLEDGE) {
Message setLastMessage(final Message lastMessageReceived) {
if (sessionMode == CLIENT_ACKNOWLEDGE && lastMessageReceived != null) {
lastMessagesWaitingAck = lastMessageReceived;
}
return lastMessageReceived;

View File

@ -511,6 +511,50 @@ public class JmsContextTest extends JMSTestBase {
}
}
@Test
public void receiveNullAckTest() throws Exception {
// Create JMSContext with CLIENT_ACKNOWLEDGE
try (JMSContext context = cf.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) {
int numMessages = 10;
TextMessage textMessage = null;
// Create JMSConsumer from JMSContext
JMSConsumer consumer = context.createConsumer(queue1);
// Create JMSProducer from JMSContext
JMSProducer producer = context.createProducer();
// send messages
for (int i = 0; i < numMessages; i++) {
String message = "text message " + i;
textMessage = context.createTextMessage(message);
textMessage.setStringProperty("COM_SUN_JMS_TESTNAME", "recoverAckTest" + i);
producer.send(queue1, textMessage);
}
// receive messages but do not acknowledge
for (int i = 0; i < numMessages; i++) {
textMessage = (TextMessage) consumer.receive(5000);
assertNotNull(textMessage);
}
assertNull(consumer.receiveNoWait());
// Acknowledge all messages
context.acknowledge();
}
// doing this check with another context / consumer to make sure it was acked.
try (JMSContext context = cf.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) {
// Create JMSConsumer from JMSContext
JMSConsumer consumer = context.createConsumer(queue1);
assertNull(consumer.receiveNoWait());
}
}
@Test
public void bytesMessage() throws Exception {
context = cf.createContext();