https://issues.apache.org/jira/browse/AMQ-5735 - fix up semantics around lastDeliveredSequenceId

This commit is contained in:
gtully 2015-04-22 16:32:17 +01:00
parent 3a5f127d52
commit eb6c082631
11 changed files with 101 additions and 80 deletions

View File

@ -67,6 +67,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionId;
@ -1187,7 +1188,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
cs.getContext().getStopping().set(true); cs.getContext().getStopping().set(true);
try { try {
LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
processRemoveConnection(cs.getInfo().getConnectionId(), -1); processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
} catch (Throwable ignore) { } catch (Throwable ignore) {
ignore.printStackTrace(); ignore.printStackTrace();
} }

View File

@ -66,6 +66,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
@ -482,9 +483,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
@Override @Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
throws Exception { throws Exception {
super.removeSubscription(context, sub, lastDeiveredSequenceId); super.removeSubscription(context, sub, lastDeliveredSequenceId);
// synchronize with dispatch method so that no new messages are sent // synchronize with dispatch method so that no new messages are sent
// while removing up a subscription. // while removing up a subscription.
pagedInPendingDispatchLock.writeLock().lock(); pagedInPendingDispatchLock.writeLock().lock();
@ -492,7 +493,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{ LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{
getActiveMQDestination().getQualifiedName(), getActiveMQDestination().getQualifiedName(),
sub, sub,
lastDeiveredSequenceId, lastDeliveredSequenceId,
getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDequeues().getCount(),
getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getDispatched().getCount(),
getDestinationStatistics().getInflight().getCount(), getDestinationStatistics().getInflight().getCount(),
@ -536,12 +537,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
List<MessageReference> unAckedMessages = sub.remove(context, this); List<MessageReference> unAckedMessages = sub.remove(context, this);
// locate last redelivered in unconsumed list (list in delivery rather than seq order) // locate last redelivered in unconsumed list (list in delivery rather than seq order)
if (lastDeiveredSequenceId > 0) { if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
for (MessageReference ref : unAckedMessages) { for (MessageReference ref : unAckedMessages) {
if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) { if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
lastDeliveredRef = ref; lastDeliveredRef = ref;
markAsRedelivered = true; markAsRedelivered = true;
LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId()); LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
break; break;
} }
} }
@ -557,7 +558,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
qmr.unlock(); qmr.unlock();
// have no delivery information // have no delivery information
if (lastDeiveredSequenceId == 0) { if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
qmr.incrementRedeliveryCounter(); qmr.incrementRedeliveryCounter();
} else { } else {
if (markAsRedelivered) { if (markAsRedelivered) {
@ -821,9 +822,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
checkUsage(context, producerExchange, message); checkUsage(context, producerExchange, message);
sendLock.lockInterruptibly(); sendLock.lockInterruptibly();
try { try {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (store != null && message.isPersistent()) { if (store != null && message.isPersistent()) {
try { try {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (messages.isCacheEnabled()) { if (messages.isCacheEnabled()) {
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
result.addListener(new PendingMarshalUsageTracker(message)); result.addListener(new PendingMarshalUsageTracker(message));

View File

@ -664,7 +664,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
} }
long lastDeliveredSequenceId = 0; long lastDeliveredSequenceId = -1;
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next(); ActiveMQSession s = i.next();
s.dispose(); s.dispose();
@ -683,7 +683,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
RemoveInfo removeCommand = info.createRemoveCommand(); RemoveInfo removeCommand = info.createRemoveCommand();
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
try { try {
doSyncSendPacket(info.createRemoveCommand(), closeTimeout); doSyncSendPacket(removeCommand, closeTimeout);
} catch (JMSException e) { } catch (JMSException e) {
if (e.getCause() instanceof RequestTimedOutIOException) { if (e.getCause() instanceof RequestTimedOutIOException) {
// expected // expected

View File

@ -228,7 +228,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
private DeliveryListener deliveryListener; private DeliveryListener deliveryListener;
private MessageTransformer transformer; private MessageTransformer transformer;
private BlobTransferPolicy blobTransferPolicy; private BlobTransferPolicy blobTransferPolicy;
private long lastDeliveredSequenceId; private long lastDeliveredSequenceId = -2;
/** /**
* Construct the Session * Construct the Session
@ -878,7 +878,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
MessageDispatch messageDispatch; MessageDispatch messageDispatch;
while ((messageDispatch = executor.dequeueNoWait()) != null) { while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch; final MessageDispatch md = messageDispatch;
ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
MessageAck earlyAck = null; MessageAck earlyAck = null;
if (message.isExpired()) { if (message.isExpired()) {
@ -913,6 +913,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} }
md.setDeliverySequenceId(getNextDeliveryId()); md.setDeliverySequenceId(getNextDeliveryId());
lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
try { try {

View File

@ -62,7 +62,7 @@ public class ConsumerInfo extends BaseCommand {
// not marshalled, populated from RemoveInfo, the last message delivered, used // not marshalled, populated from RemoveInfo, the last message delivered, used
// to suppress redelivery on prefetched messages after close // to suppress redelivery on prefetched messages after close
private transient long lastDeliveredSequenceId; private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
private transient long assignedGroupCount; private transient long assignedGroupCount;
// originated from a // originated from a
// network connection // network connection

View File

@ -29,9 +29,10 @@ import org.apache.activemq.state.CommandVisitor;
public class RemoveInfo extends BaseCommand { public class RemoveInfo extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO;
public static final int LAST_DELIVERED_UNSET = -1;
public static final int LAST_DELIVERED_UNKNOWN = -2;
protected DataStructure objectId; protected DataStructure objectId;
protected long lastDeliveredSequenceId; protected long lastDeliveredSequenceId = LAST_DELIVERED_UNKNOWN;
public RemoveInfo() { public RemoveInfo() {
} }

View File

@ -310,6 +310,7 @@ public class MDBTest extends TestCase {
@Override @Override
public void doAppend(LoggingEvent event) { public void doAppend(LoggingEvent event) {
if (event.getLevel().isGreaterOrEqual(Level.ERROR)) { if (event.getLevel().isGreaterOrEqual(Level.ERROR)) {
System.err.println("Event :" + event.getRenderedMessage());
errorMessage.set(event.getRenderedMessage()); errorMessage.set(event.getRenderedMessage());
} }
} }
@ -389,7 +390,7 @@ public class MDBTest extends TestCase {
// Activate an Endpoint // Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec); adapter.endpointActivation(messageEndpointFactory, activationSpec);
ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000); ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(4000);
if (msg != null) { if (msg != null) {
assertEquals("Prefetch size hasn't been set", 0, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize()); assertEquals("Prefetch size hasn't been set", 0, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
} else { } else {
@ -410,7 +411,7 @@ public class MDBTest extends TestCase {
adapter.stop(); adapter.stop();
assertNotNull("We got an error message", errorMessage.get()); assertNotNull("We got an error message", errorMessage.get());
assertTrue("correct message", errorMessage.get().contains("zero")); assertTrue("correct message: " + errorMessage.get(), errorMessage.get().contains("zero"));
LogManager.getRootLogger().removeAppender(testAppender); LogManager.getRootLogger().removeAppender(testAppender);
brokerService.stop(); brokerService.stop();

View File

@ -34,8 +34,12 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.util.Wait;
import org.hamcrest.Description; import org.hamcrest.Description;
import org.jmock.Expectations; import org.jmock.Expectations;
import org.jmock.Mockery; import org.jmock.Mockery;
@ -181,7 +185,11 @@ public class ServerSessionImplTest extends TestCase {
ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession(); ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession();
for (int i=0; i<maxMessages; i++) { for (int i=0; i<maxMessages; i++) {
MessageDispatch messageDispatch = new MessageDispatch(); MessageDispatch messageDispatch = new MessageDispatch();
messageDispatch.setMessage(new ActiveMQTextMessage()); ActiveMQMessage message = new ActiveMQTextMessage();
message.setMessageId(new MessageId("0:0:0:" + i));
message.getMessageId().setBrokerSequenceId(i);
messageDispatch.setMessage(message);
messageDispatch.setConsumerId(new ConsumerId("0:0:0"));
session1.dispatch(messageDispatch); session1.dispatch(messageDispatch);
} }
@ -199,9 +207,13 @@ public class ServerSessionImplTest extends TestCase {
} }
}); });
while (messageCount.getCount() > maxMessages - 10) { Wait.waitFor(new Wait.Condition() {
TimeUnit.MILLISECONDS.sleep(100); @Override
} public boolean isSatisified() throws Exception {
return messageCount.getCount() < maxMessages - 10;
}
});
assertTrue("some messages consumed", messageCount.getCount() < maxMessages);
LOG.info("Closing pool on {}", messageCount.getCount()); LOG.info("Closing pool on {}", messageCount.getCount());
pool.close(); pool.close();

View File

@ -32,6 +32,7 @@ import javax.jms.Topic;
import junit.framework.Test; import junit.framework.Test;
import junit.framework.TestCase; import junit.framework.TestCase;
import junit.framework.TestSuite; import junit.framework.TestSuite;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.transport.vm.VMTransport; import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
@ -403,7 +404,7 @@ public class JmsRedeliveredTest extends TestCase {
session.close(); session.close();
} }
public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() throws Exception { public void testNoReceiveConsumerDisconnectDoesIncrementRedelivery() throws Exception {
connection.setClientID(getName()); connection.setClientID(getName());
connection.start(); connection.start();
@ -425,7 +426,9 @@ public class JmsRedeliveredTest extends TestCase {
} }
}); });
// whack the connection - like a rebalance or tcp drop // whack the connection - like a rebalance or tcp drop - consumer does not get to communicate
// a close and delivered sequence info to broker. So broker is in the dark and must increment
// redelivery to be safe
((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop(); ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop();
session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE); session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@ -434,6 +437,52 @@ public class JmsRedeliveredTest extends TestCase {
assertNotNull(msg); assertNotNull(msg);
msg.acknowledge(); msg.acknowledge();
assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
session.close();
keepBrokerAliveConnection.close();
}
public void testNoReceiveConsumerAbortDoesNotIncrementRedelivery() throws Exception {
connection.setClientID(getName());
connection.start();
Connection keepBrokerAliveConnection = createConnection();
keepBrokerAliveConnection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName());
final MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = createProducer(session, queue);
producer.send(createTextMessage(session));
session.commit();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 1;
}
});
// on abort via something like slowConsumerPolicy
ConsumerControl consumerControl = new ConsumerControl();
consumerControl.setConsumerId(((ActiveMQMessageConsumer)consumer).getConsumerId());
consumerControl.setClose(true);
((ActiveMQConnection) connection).getTransport().narrow(VMTransport.class).getTransportListener().onCommand(consumerControl);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 0;
}
});
session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);
Message msg = messageConsumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.close(); session.close();
keepBrokerAliveConnection.close(); keepBrokerAliveConnection.close();

View File

@ -37,8 +37,11 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedeliveryPolicyTest extends JmsTestSupport { public class RedeliveryPolicyTest extends JmsTestSupport {
static final Logger LOG = LoggerFactory.getLogger(RedeliveryPolicyTest.class);
public static Test suite() { public static Test suite() {
return suite(RedeliveryPolicyTest.class); return suite(RedeliveryPolicyTest.class);
@ -535,7 +538,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
final AtomicInteger receivedCount = new AtomicInteger(0); final AtomicInteger receivedCount = new AtomicInteger(0);
for (int i=0;i<=maxRedeliveries+1;i++) { for (int i=0;i<=maxRedeliveries+1;i++) {
connection = (ActiveMQConnection)factory.createConnection(userName, password); connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection); connections.add(connection);
@ -553,6 +555,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
public void onMessage(Message message) { public void onMessage(Message message) {
try { try {
ActiveMQTextMessage m = (ActiveMQTextMessage) message; ActiveMQTextMessage m = (ActiveMQTextMessage) message;
LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId());
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
assertEquals(receivedCount.get(), m.getRedeliveryCounter()); assertEquals(receivedCount.get(), m.getRedeliveryCounter());
receivedCount.incrementAndGet(); receivedCount.incrementAndGet();
@ -590,7 +593,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
session.run(); session.run();
return done.await(10, TimeUnit.MILLISECONDS); return done.await(10, TimeUnit.MILLISECONDS);
} }
}); }, 5000);
if (i<=maxRedeliveries) { if (i<=maxRedeliveries) {
assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS)); assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
public class BrokerTest extends BrokerTestSupport { public class BrokerTest extends BrokerTestSupport {
@ -486,59 +487,6 @@ public class BrokerTest extends BrokerTestSupport {
assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination)); assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
} }
public void initCombosForTestConsumerCloseCausesRedelivery() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
}
public void testConsumerCloseCausesRedelivery() throws Exception {
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo1);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(100);
connection1.request(consumerInfo1);
// Send the messages
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
// Receive the messages.
for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection1);
assertNotNull("m1 is null for index: " + i, m1);
assertFalse(m1.isRedelivered());
}
// Close the consumer without acking.. this should cause re-delivery of
// the messages.
connection1.send(consumerInfo1.createRemoveCommand());
// Create another consumer that should get the messages again.
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination);
consumerInfo2.setPrefetchSize(100);
connection1.request(consumerInfo2);
// Receive the messages.
for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection1);
assertNotNull("m1 is null for index: " + i, m1);
assertTrue(m1.isRedelivered());
}
assertNoMessagesLeft(connection1);
}
public void testTopicDurableSubscriptionCanBeRestored() throws Exception { public void testTopicDurableSubscriptionCanBeRestored() throws Exception {
ActiveMQDestination destination = new ActiveMQTopic("TEST"); ActiveMQDestination destination = new ActiveMQTopic("TEST");
@ -1396,14 +1344,18 @@ public class BrokerTest extends BrokerTestSupport {
connection1.send(createMessage(producerInfo, destination, deliveryMode)); connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode)); connection1.send(createMessage(producerInfo, destination, deliveryMode));
long lastDeliveredSeq = -1;
// Get the messages // Get the messages
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection1); Message m1 = receiveMessage(connection1);
assertNotNull(m1); assertNotNull(m1);
assertFalse(m1.isRedelivered()); assertFalse(m1.isRedelivered());
lastDeliveredSeq = m1.getMessageId().getBrokerSequenceId();
} }
// Close the consumer without sending any ACKS. // Close the consumer without sending any ACKS.
connection1.send(closeConsumerInfo(consumerInfo1)); RemoveInfo removeInfo = closeConsumerInfo(consumerInfo1);
removeInfo.setLastDeliveredSequenceId(lastDeliveredSeq);
connection1.send(removeInfo);
// Drain any in flight messages.. // Drain any in flight messages..
while (connection1.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS) != null) { while (connection1.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS) != null) {