diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 834cd1425b..2cca4c0bd2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -599,10 +599,13 @@ public class Topic extends BaseDestination implements Task { public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { if (topicStore != null && node.isPersistent()) { - DurableTopicSubscription dsub = (DurableTopicSubscription) sub; - SubscriptionKey key = dsub.getSubscriptionKey(); - topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), + if (sub instanceof DurableTopicSubscription) { + DurableTopicSubscription dsub = (DurableTopicSubscription) sub; + SubscriptionKey key = dsub.getSubscriptionKey(); + topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), + node.getMessageId(), convertToNonRangedAck(ack, node)); + } } messageConsumed(context, node); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 18404fa49c..fc87e1d650 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -187,7 +187,8 @@ public class TopicSubscription extends AbstractSubscription { messagesToEvict = oldMessages.length; for (int i = 0; i < messagesToEvict; i++) { MessageReference oldMessage = oldMessages[i]; - discard(oldMessage); + //Expired here is false as we are discarding due to the messageEvictingStrategy + discard(oldMessage, false); } } // lets avoid an infinite loop if we are given a bad eviction strategy @@ -233,8 +234,7 @@ public class TopicSubscription extends AbstractSubscription { matched.remove(); node.decrementReferenceCount(); if (broker.isExpired(node)) { - ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment(); - broker.messageExpired(getContext(), node, this); + ((Destination) node.getRegionDestination()).messageExpired(getContext(), this, node); } break; } @@ -654,7 +654,7 @@ public class TopicSubscription extends AbstractSubscription { // Message may have been sitting in the matched list a while // waiting for the consumer to ak the message. if (message.isExpired()) { - discard(message); + discard(message, true); continue; // just drop it. } dispatch(message); @@ -739,19 +739,25 @@ public class TopicSubscription extends AbstractSubscription { } } - private void discard(MessageReference message) { + private void discard(MessageReference message, boolean expired) { discarding = true; try { message.decrementReferenceCount(); matched.remove(message); - discarded.incrementAndGet(); if (destination != null) { destination.getDestinationStatistics().getDequeues().increment(); } - LOG.debug("{}, discarding message {}", this, message); Destination dest = (Destination) message.getRegionDestination(); if (dest != null) { - dest.messageDiscarded(getContext(), this, message); + //If discard is due to expiration then use the messageExpired() callback + if (expired) { + LOG.debug("{}, expiring message {}", this, message); + dest.messageExpired(getContext(), this, message); + } else { + LOG.debug("{}, discarding message {}", this, message); + discarded.incrementAndGet(); + dest.messageDiscarded(getContext(), this, message); + } } broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId())); } finally { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index b6866b4b40..82f50cc801 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Enumeration; import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -44,7 +45,11 @@ import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.NullMessageReference; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -52,6 +57,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.MessageDispatch; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -70,7 +76,7 @@ public class AdvisoryTests { protected Connection connection; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected final boolean includeBodyForAdvisory; - protected final int EXPIRE_MESSAGE_PERIOD = 10000; + protected final int EXPIRE_MESSAGE_PERIOD = 3000; @Parameters(name = "includeBodyForAdvisory={0}") public static Collection data() { @@ -338,24 +344,143 @@ public class AdvisoryTests { } @Test(timeout = 60000) - public void testMessageExpiredAdvisory() throws Exception { - Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = s.createQueue(getClass().getName()); - MessageConsumer consumer = s.createConsumer(queue); - assertNotNull(consumer); + public void testMessageExpiredAdvisoryQueueSubClient() throws Exception { + testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName() + "client.timeout"), + 300000, true, 500); + } - Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue); + @Test(timeout = 60000) + public void testMessageExpiredAdvisoryQueueSubServer() throws Exception { + testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName()), 1,true, 500); + } + + @Test(timeout = 60000) + public void testMessageExpiredAdvisoryQueueSubServerTask() throws Exception { + testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName()), 1000,false, + EXPIRE_MESSAGE_PERIOD * 2); + } + + private void testMessageExpiredAdvisoryQueue(ActiveMQQueue dest, int ttl, boolean createConsumer, int receiveTimeout) throws Exception { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);; + + Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); // start throwing messages at the consumer - MessageProducer producer = s.createProducer(queue); - producer.setTimeToLive(1); + MessageProducer producer = s.createProducer(dest); + producer.setTimeToLive(ttl); + for (int i = 0; i < MESSAGE_COUNT; i++) { BytesMessage m = s.createBytesMessage(); m.writeBytes(new byte[1024]); producer.send(m); } - Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD); + MessageConsumer consumer = null; + if (createConsumer) { + consumer = s.createConsumer(dest); + assertNotNull(consumer); + } + + Message msg = advisoryConsumer.receive(receiveTimeout); + assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + + //This should be set + assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)); + + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); + } + + @Test(timeout = 60000) + public void testMessageExpiredAdvisoryTopicSub() throws Exception { + ActiveMQTopic dest = new ActiveMQTopic(getClass().getName()); + //Set prefetch to 1 so acks will trigger expiration on dispatching more messages + broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(1); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);; + + MessageConsumer consumer = s.createConsumer(dest); + MessageConsumer expiredAdvisoryConsumer = s.createConsumer(AdvisorySupport.getExpiredMessageTopic(dest)); + MessageConsumer discardedAdvisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDiscardedAdvisoryTopic(dest)); + + // start throwing messages at the consumer + MessageProducer producer = s.createProducer(dest); + producer.setTimeToLive(10); + for (int i = 0; i < 10; i++) { + BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + } + Thread.sleep(500); + + //Receiving will trigger the server to discard on dispatch when acks are received + //Currently the advisory is only fired on dispatch from server or messages added to ta topic + //and not on expired acks from the client side as the original messages are not tracked in + //dispatch list so the advisory can't be fired + for (int i = 0; i < 10; i++) { + assertNull(consumer.receive(10)); + } + + //Should no longer receive discard advisories for expiration + assertNull(discardedAdvisoryConsumer.receive(1000)); + + Message msg = expiredAdvisoryConsumer.receive(1000); + assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + + //This should be set + assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)); + + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); + } + + @Test(timeout = 60000) + public void testMessageExpiredAdvisoryDurableClient() throws Exception { + testMessageExpiredDurableAdvisory(getClass().getName() + "client.timeout", + 300000, true, 500); + } + + @Test(timeout = 60000) + public void testMessageExpiredAdvisoryDurableServer() throws Exception { + testMessageExpiredDurableAdvisory(getClass().getName(), 1,true, 500); + } + + @Test(timeout = 60000) + public void testMessageExpiredAdvisoryDurableServerTask() throws Exception { + testMessageExpiredDurableAdvisory(getClass().getName(), 2000,false, EXPIRE_MESSAGE_PERIOD * 2); + } + + private void testMessageExpiredDurableAdvisory(String topic, int ttl, boolean bringDurableOnline, + int receiveTimeout) throws Exception { + ActiveMQTopic dest = new ActiveMQTopic(topic); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);; + + //create durable and send offline messages + MessageConsumer consumer = s.createDurableSubscriber(dest, "sub1"); + consumer.close(); + + Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest); + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + // start throwing messages at the consumer + MessageProducer producer = s.createProducer(dest); + producer.setTimeToLive(ttl); + for (int i = 0; i < MESSAGE_COUNT; i++) { + BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + } + + //if flag is true then bring online to trigger expiration on dispatch + if (bringDurableOnline) { + consumer = s.createDurableSubscriber(dest, "sub1"); + } + + Message msg = advisoryConsumer.receive(receiveTimeout); assertNotNull(msg); ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); @@ -529,6 +654,29 @@ public class AdvisoryTests { answer.addConnector("nio://localhost:0"); answer.addConnector("tcp://localhost:0").setName("OpenWire"); answer.setDeleteAllMessagesOnStartup(true); + + // add a plugin to ensure the expiration happens on the client side rather + // than broker side. + answer.setPlugins(new BrokerPlugin[] { new BrokerPlugin() { + + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new BrokerFilter(broker) { + + @Override + public void preProcessDispatch(MessageDispatch messageDispatch) { + ActiveMQDestination dest = messageDispatch.getDestination(); + if (dest != null && !AdvisorySupport.isAdvisoryTopic(dest) && messageDispatch.getDestination() + .getPhysicalName().contains("client.timeout")) { + // Set the expiration to now + messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000); + } + + super.preProcessDispatch(messageDispatch); + } + }; + } + } }); } protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) {