diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSConsumer.java index e359f5b575..1e50bef053 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSConsumer.java @@ -63,7 +63,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer { @Override public Message receive() { try { - return context.setLastMessage(this, consumer.receive()); + return context.setLastMessage(consumer.receive()); } catch (JMSException e) { throw JmsExceptionUtils.convertToRuntimeException(e); } @@ -72,7 +72,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer { @Override public Message receive(long timeout) { try { - return context.setLastMessage(this, consumer.receive(timeout)); + return context.setLastMessage(consumer.receive(timeout)); } catch (JMSException e) { throw JmsExceptionUtils.convertToRuntimeException(e); } @@ -81,7 +81,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer { @Override public Message receiveNoWait() { try { - return context.setLastMessage(this, consumer.receiveNoWait()); + return context.setLastMessage(consumer.receiveNoWait()); } catch (JMSException e) { throw JmsExceptionUtils.convertToRuntimeException(e); } @@ -100,7 +100,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer { public T receiveBody(Class c) { try { Message message = consumer.receive(); - context.setLastMessage(ActiveMQJMSConsumer.this, message); + context.setLastMessage(message); return message == null ? null : message.getBody(c); } catch (JMSException e) { throw JmsExceptionUtils.convertToRuntimeException(e); @@ -111,7 +111,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer { public T receiveBody(Class c, long timeout) { try { Message message = consumer.receive(timeout); - context.setLastMessage(ActiveMQJMSConsumer.this, message); + context.setLastMessage(message); return message == null ? null : message.getBody(c); } catch (JMSException e) { throw JmsExceptionUtils.convertToRuntimeException(e); @@ -122,7 +122,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer { public T receiveBodyNoWait(Class c) { try { Message message = consumer.receiveNoWait(); - context.setLastMessage(ActiveMQJMSConsumer.this, message); + context.setLastMessage(message); return message == null ? null : message.getBody(c); } catch (JMSException e) { throw JmsExceptionUtils.convertToRuntimeException(e); @@ -139,7 +139,7 @@ public class ActiveMQJMSConsumer implements JMSConsumer { @Override public void onMessage(Message message) { - context.setLastMessage(ActiveMQJMSConsumer.this, message); + context.setLastMessage(message); context.getThreadAwareContext().setCurrentThread(false); try { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSContext.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSContext.java index ae3ab5d75e..9f13e077ae 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSContext.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSContext.java @@ -582,8 +582,8 @@ public class ActiveMQJMSContext implements JMSContext { /** * this is to ensure Context.acknowledge would work on ClientACK */ - Message setLastMessage(final JMSConsumer consumer, final Message lastMessageReceived) { - if (sessionMode == CLIENT_ACKNOWLEDGE) { + Message setLastMessage(final Message lastMessageReceived) { + if (sessionMode == CLIENT_ACKNOWLEDGE && lastMessageReceived != null) { lastMessagesWaitingAck = lastMessageReceived; } return lastMessageReceived; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsContextTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsContextTest.java index 0e77f07e99..c557b33699 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsContextTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsContextTest.java @@ -511,6 +511,50 @@ public class JmsContextTest extends JMSTestBase { } } + @Test + public void receiveNullAckTest() throws Exception { + // Create JMSContext with CLIENT_ACKNOWLEDGE + + try (JMSContext context = cf.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) { + int numMessages = 10; + + TextMessage textMessage = null; + + // Create JMSConsumer from JMSContext + JMSConsumer consumer = context.createConsumer(queue1); + + // Create JMSProducer from JMSContext + JMSProducer producer = context.createProducer(); + + // send messages + for (int i = 0; i < numMessages; i++) { + String message = "text message " + i; + textMessage = context.createTextMessage(message); + textMessage.setStringProperty("COM_SUN_JMS_TESTNAME", "recoverAckTest" + i); + producer.send(queue1, textMessage); + } + + // receive messages but do not acknowledge + for (int i = 0; i < numMessages; i++) { + textMessage = (TextMessage) consumer.receive(5000); + assertNotNull(textMessage); + } + + assertNull(consumer.receiveNoWait()); + + // Acknowledge all messages + context.acknowledge(); + } + + // doing this check with another context / consumer to make sure it was acked. + try (JMSContext context = cf.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) { + // Create JMSConsumer from JMSContext + JMSConsumer consumer = context.createConsumer(queue1); + + assertNull(consumer.receiveNoWait()); + } + } + @Test public void bytesMessage() throws Exception { context = cf.createContext();