From eb6c0826311a876769e56029dc1e261ee7519db5 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 22 Apr 2015 16:32:17 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5735 - fix up semantics around lastDeliveredSequenceId --- .../activemq/broker/TransportConnection.java | 3 +- .../apache/activemq/broker/region/Queue.java | 17 +++--- .../apache/activemq/ActiveMQConnection.java | 4 +- .../org/apache/activemq/ActiveMQSession.java | 5 +- .../apache/activemq/command/ConsumerInfo.java | 2 +- .../apache/activemq/command/RemoveInfo.java | 5 +- .../java/org/apache/activemq/ra/MDBTest.java | 5 +- .../activemq/ra/ServerSessionImplTest.java | 20 +++++-- .../apache/activemq/JmsRedeliveredTest.java | 53 +++++++++++++++- .../apache/activemq/RedeliveryPolicyTest.java | 7 ++- .../apache/activemq/broker/BrokerTest.java | 60 ++----------------- 11 files changed, 101 insertions(+), 80 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 5c6307af9d..b5e1c5542e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -67,6 +67,7 @@ import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; @@ -1187,7 +1188,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { cs.getContext().getStopping().set(true); try { LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); - processRemoveConnection(cs.getInfo().getConnectionId(), -1); + processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); } catch (Throwable ignore) { ignore.printStackTrace(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 3438dccd16..8d12a8b641 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -66,6 +66,7 @@ import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; @@ -482,9 +483,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } @Override - public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) + public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { - super.removeSubscription(context, sub, lastDeiveredSequenceId); + super.removeSubscription(context, sub, lastDeliveredSequenceId); // synchronize with dispatch method so that no new messages are sent // while removing up a subscription. pagedInPendingDispatchLock.writeLock().lock(); @@ -492,7 +493,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), sub, - lastDeiveredSequenceId, + lastDeliveredSequenceId, getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount(), @@ -536,12 +537,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index List unAckedMessages = sub.remove(context, this); // locate last redelivered in unconsumed list (list in delivery rather than seq order) - if (lastDeiveredSequenceId > 0) { + if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) { for (MessageReference ref : unAckedMessages) { - if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) { + if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) { lastDeliveredRef = ref; markAsRedelivered = true; - LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId()); + LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId()); break; } } @@ -557,7 +558,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index qmr.unlock(); // have no delivery information - if (lastDeiveredSequenceId == 0) { + if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) { qmr.incrementRedeliveryCounter(); } else { if (markAsRedelivered) { @@ -821,9 +822,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index checkUsage(context, producerExchange, message); sendLock.lockInterruptibly(); try { + message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); if (store != null && message.isPersistent()) { try { - message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); if (messages.isCacheEnabled()) { result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); result.addListener(new PendingMarshalUsageTracker(message)); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 87e4c91150..4d425a2900 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -664,7 +664,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } - long lastDeliveredSequenceId = 0; + long lastDeliveredSequenceId = -1; for (Iterator i = this.sessions.iterator(); i.hasNext();) { ActiveMQSession s = i.next(); s.dispose(); @@ -683,7 +683,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon RemoveInfo removeCommand = info.createRemoveCommand(); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); try { - doSyncSendPacket(info.createRemoveCommand(), closeTimeout); + doSyncSendPacket(removeCommand, closeTimeout); } catch (JMSException e) { if (e.getCause() instanceof RequestTimedOutIOException) { // expected diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 14c2869dbb..1d2ae836fb 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -228,7 +228,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta private DeliveryListener deliveryListener; private MessageTransformer transformer; private BlobTransferPolicy blobTransferPolicy; - private long lastDeliveredSequenceId; + private long lastDeliveredSequenceId = -2; /** * Construct the Session @@ -878,7 +878,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta MessageDispatch messageDispatch; while ((messageDispatch = executor.dequeueNoWait()) != null) { final MessageDispatch md = messageDispatch; - ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); + final ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); MessageAck earlyAck = null; if (message.isExpired()) { @@ -913,6 +913,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } md.setDeliverySequenceId(getNextDeliveryId()); + lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId(); final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); try { diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java index 09b6be56c4..0c1e6913e6 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java @@ -62,7 +62,7 @@ public class ConsumerInfo extends BaseCommand { // not marshalled, populated from RemoveInfo, the last message delivered, used // to suppress redelivery on prefetched messages after close - private transient long lastDeliveredSequenceId; + private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET; private transient long assignedGroupCount; // originated from a // network connection diff --git a/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java index 3452104c32..118940f787 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java @@ -29,9 +29,10 @@ import org.apache.activemq.state.CommandVisitor; public class RemoveInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO; - + public static final int LAST_DELIVERED_UNSET = -1; + public static final int LAST_DELIVERED_UNKNOWN = -2; protected DataStructure objectId; - protected long lastDeliveredSequenceId; + protected long lastDeliveredSequenceId = LAST_DELIVERED_UNKNOWN; public RemoveInfo() { } diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java index 904dd188d0..af36389fcf 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java @@ -310,6 +310,7 @@ public class MDBTest extends TestCase { @Override public void doAppend(LoggingEvent event) { if (event.getLevel().isGreaterOrEqual(Level.ERROR)) { + System.err.println("Event :" + event.getRenderedMessage()); errorMessage.set(event.getRenderedMessage()); } } @@ -389,7 +390,7 @@ public class MDBTest extends TestCase { // Activate an Endpoint adapter.endpointActivation(messageEndpointFactory, activationSpec); - ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000); + ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(4000); if (msg != null) { assertEquals("Prefetch size hasn't been set", 0, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize()); } else { @@ -410,7 +411,7 @@ public class MDBTest extends TestCase { adapter.stop(); assertNotNull("We got an error message", errorMessage.get()); - assertTrue("correct message", errorMessage.get().contains("zero")); + assertTrue("correct message: " + errorMessage.get(), errorMessage.get().contains("zero")); LogManager.getRootLogger().removeAppender(testAppender); brokerService.stop(); diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java index fb99330fb8..9323d7b94d 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java @@ -34,8 +34,12 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.util.Wait; import org.hamcrest.Description; import org.jmock.Expectations; import org.jmock.Mockery; @@ -181,7 +185,11 @@ public class ServerSessionImplTest extends TestCase { ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession(); for (int i=0; i maxMessages - 10) { - TimeUnit.MILLISECONDS.sleep(100); - } + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return messageCount.getCount() < maxMessages - 10; + } + }); + assertTrue("some messages consumed", messageCount.getCount() < maxMessages); LOG.info("Closing pool on {}", messageCount.getCount()); pool.close(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java index e5d90d681e..49069636b1 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java @@ -32,6 +32,7 @@ import javax.jms.Topic; import junit.framework.Test; import junit.framework.TestCase; import junit.framework.TestSuite; +import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.transport.vm.VMTransport; import org.apache.activemq.util.Wait; @@ -403,7 +404,7 @@ public class JmsRedeliveredTest extends TestCase { session.close(); } - public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() throws Exception { + public void testNoReceiveConsumerDisconnectDoesIncrementRedelivery() throws Exception { connection.setClientID(getName()); connection.start(); @@ -425,7 +426,9 @@ public class JmsRedeliveredTest extends TestCase { } }); - // whack the connection - like a rebalance or tcp drop + // whack the connection - like a rebalance or tcp drop - consumer does not get to communicate + // a close and delivered sequence info to broker. So broker is in the dark and must increment + // redelivery to be safe ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop(); session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE); @@ -434,6 +437,52 @@ public class JmsRedeliveredTest extends TestCase { assertNotNull(msg); msg.acknowledge(); + assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); + session.close(); + keepBrokerAliveConnection.close(); + } + + public void testNoReceiveConsumerAbortDoesNotIncrementRedelivery() throws Exception { + connection.setClientID(getName()); + connection.start(); + + Connection keepBrokerAliveConnection = createConnection(); + keepBrokerAliveConnection.start(); + + Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue("queue-" + getName()); + final MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = createProducer(session, queue); + producer.send(createTextMessage(session)); + session.commit(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 1; + } + }); + + // on abort via something like slowConsumerPolicy + ConsumerControl consumerControl = new ConsumerControl(); + consumerControl.setConsumerId(((ActiveMQMessageConsumer)consumer).getConsumerId()); + consumerControl.setClose(true); + ((ActiveMQConnection) connection).getTransport().narrow(VMTransport.class).getTransportListener().onCommand(consumerControl); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 0; + } + }); + + session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(queue); + Message msg = messageConsumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); session.close(); keepBrokerAliveConnection.close(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index b7a870aa45..659e9827e3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -37,8 +37,11 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RedeliveryPolicyTest extends JmsTestSupport { + static final Logger LOG = LoggerFactory.getLogger(RedeliveryPolicyTest.class); public static Test suite() { return suite(RedeliveryPolicyTest.class); @@ -535,7 +538,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport { final AtomicInteger receivedCount = new AtomicInteger(0); for (int i=0;i<=maxRedeliveries+1;i++) { - connection = (ActiveMQConnection)factory.createConnection(userName, password); connections.add(connection); @@ -553,6 +555,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport { public void onMessage(Message message) { try { ActiveMQTextMessage m = (ActiveMQTextMessage) message; + LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId()); assertEquals("1st", m.getText()); assertEquals(receivedCount.get(), m.getRedeliveryCounter()); receivedCount.incrementAndGet(); @@ -590,7 +593,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport { session.run(); return done.await(10, TimeUnit.MILLISECONDS); } - }); + }, 5000); if (i<=maxRedeliveries) { assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS)); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java index 2facb98f6f..769cdbf781 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java @@ -33,6 +33,7 @@ import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.SessionInfo; public class BrokerTest extends BrokerTestSupport { @@ -486,59 +487,6 @@ public class BrokerTest extends BrokerTestSupport { assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination)); } - public void initCombosForTestConsumerCloseCausesRedelivery() { - addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), - Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")}); - } - - public void testConsumerCloseCausesRedelivery() throws Exception { - - // Setup a first connection - StubConnection connection1 = createConnection(); - ConnectionInfo connectionInfo1 = createConnectionInfo(); - SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); - ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); - connection1.send(connectionInfo1); - connection1.send(sessionInfo1); - connection1.send(producerInfo1); - - ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); - consumerInfo1.setPrefetchSize(100); - connection1.request(consumerInfo1); - - // Send the messages - connection1.send(createMessage(producerInfo1, destination, deliveryMode)); - connection1.send(createMessage(producerInfo1, destination, deliveryMode)); - connection1.send(createMessage(producerInfo1, destination, deliveryMode)); - connection1.send(createMessage(producerInfo1, destination, deliveryMode)); - - // Receive the messages. - for (int i = 0; i < 4; i++) { - Message m1 = receiveMessage(connection1); - assertNotNull("m1 is null for index: " + i, m1); - assertFalse(m1.isRedelivered()); - } - - // Close the consumer without acking.. this should cause re-delivery of - // the messages. - connection1.send(consumerInfo1.createRemoveCommand()); - - // Create another consumer that should get the messages again. - ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination); - consumerInfo2.setPrefetchSize(100); - connection1.request(consumerInfo2); - - // Receive the messages. - for (int i = 0; i < 4; i++) { - Message m1 = receiveMessage(connection1); - assertNotNull("m1 is null for index: " + i, m1); - assertTrue(m1.isRedelivered()); - } - assertNoMessagesLeft(connection1); - - } - public void testTopicDurableSubscriptionCanBeRestored() throws Exception { ActiveMQDestination destination = new ActiveMQTopic("TEST"); @@ -1396,14 +1344,18 @@ public class BrokerTest extends BrokerTestSupport { connection1.send(createMessage(producerInfo, destination, deliveryMode)); connection1.send(createMessage(producerInfo, destination, deliveryMode)); + long lastDeliveredSeq = -1; // Get the messages for (int i = 0; i < 4; i++) { Message m1 = receiveMessage(connection1); assertNotNull(m1); assertFalse(m1.isRedelivered()); + lastDeliveredSeq = m1.getMessageId().getBrokerSequenceId(); } // Close the consumer without sending any ACKS. - connection1.send(closeConsumerInfo(consumerInfo1)); + RemoveInfo removeInfo = closeConsumerInfo(consumerInfo1); + removeInfo.setLastDeliveredSequenceId(lastDeliveredSeq); + connection1.send(removeInfo); // Drain any in flight messages.. while (connection1.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS) != null) {