diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index faef62d6fe..eff550f8e5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -27,6 +27,7 @@ import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -309,14 +310,19 @@ public class AdvisoryBroker extends BrokerFilter { } @Override - public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { - super.messageDiscarded(context, messageReference); + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { + super.messageDiscarded(context, sub, messageReference); try { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); payload.clearBody(); - fireAdvisory(context, topic,payload); + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + if (sub instanceof TopicSubscription) { + advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded()); + } + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString()); + fireAdvisory(context, topic, payload, null, advisoryMessage); } } catch (Exception e) { LOG.warn("Failed to fire message discarded advisory"); @@ -403,7 +409,7 @@ public class AdvisoryBroker extends BrokerFilter { count += dest.getDestinationStatistics().getConsumers().getCount(); } } - advisoryMessage.setIntProperty("consumerCount", count); + advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count); fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index df56019283..4ad39b0014 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -55,6 +55,9 @@ public final class AdvisorySupport { public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId"; public static final String MSG_PROPERTY_PRODUCER_ID = "producerId"; public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId"; + public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount"; + public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount"; + public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic( TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC); private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index e7513e5876..068ce41491 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -339,9 +339,10 @@ public interface Broker extends Region, Service { * Called when a message is discarded - e.g. running low on memory * This will happen only if the policy is enabled - e.g. non durable topics * @param context + * @param sub * @param messageReference */ - void messageDiscarded(ConnectionContext context, MessageReference messageReference); + void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference); /** * Called when there is a slow consumer diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 941d3cafd5..50f2f765dd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -284,8 +284,8 @@ public class BrokerFilter implements Broker { next.messageDelivered(context, messageReference); } - public void messageDiscarded(ConnectionContext context,MessageReference messageReference) { - next.messageDiscarded(context, messageReference); + public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) { + next.messageDiscarded(context, sub, messageReference); } public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 44db511ba1..9ce2c72381 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -271,7 +271,7 @@ public class EmptyBroker implements Broker { public void messageDelivered(ConnectionContext context,MessageReference messageReference) { } - public void messageDiscarded(ConnectionContext context,MessageReference messageReference) { + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { } public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 5b2333e0cd..db88ce90bb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -286,7 +286,7 @@ public class ErrorBroker implements Broker { throw new BrokerStoppedException(this.message); } - public void messageDiscarded(ConnectionContext context,MessageReference messageReference) { + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { throw new BrokerStoppedException(this.message); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index f1ce429963..d6f1670109 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -295,8 +295,8 @@ public class MutableBrokerFilter implements Broker { getNext().messageDelivered(context, messageReference); } - public void messageDiscarded(ConnectionContext context,MessageReference messageReference) { - getNext().messageDiscarded(context, messageReference); + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { + getNext().messageDiscarded(context, sub, messageReference); } public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index d104fff849..34eb754013 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -429,9 +429,9 @@ public abstract class BaseDestination implements Destination { * @param context * @param messageReference */ - public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { if (advisoryForDiscardingMessages) { - broker.messageDiscarded(context, messageReference); + broker.messageDiscarded(context, sub, messageReference); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index 18cf83c140..13953419fc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -177,8 +177,9 @@ public interface Destination extends Service, Task { * * @param context * @param messageReference + * @param sub */ - void messageDiscarded(ConnectionContext context, MessageReference messageReference); + void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference); /** * Called when there is a slow consumer diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 446356172f..c9856dad5a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -239,8 +239,8 @@ public class DestinationFilter implements Destination { next.messageDelivered(context, messageReference); } - public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { - next.messageDiscarded(context, messageReference); + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { + next.messageDiscarded(context, sub, messageReference); } public void slowConsumer(ConnectionContext context, Subscription subs) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index cb8f082f68..86772c19dd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -463,7 +463,7 @@ public class TopicSubscription extends AbstractSubscription { } Destination dest = message.getRegionDestination(); if (dest != null) { - dest.messageDiscarded(getContext(), message); + dest.messageDiscarded(getContext(), this, message); } broker.getRoot().sendToDeadLetterQueue(getContext(), message); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java index 5775fd275b..d0fe8e1dcc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java @@ -579,7 +579,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void messageDiscarded(ConnectionContext context, + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { if (isLogAll() || isLogInternalEvents()) { String msg = "Unable to display message."; @@ -589,7 +589,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } LOG.info("Message discarded : " + msg); } - super.messageDiscarded(context, messageReference); + super.messageDiscarded(context, sub, messageReference); } @Override diff --git a/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java b/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java index 2698bb8fc5..4b875baa45 100644 --- a/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java @@ -18,6 +18,7 @@ package org.apache.activemq; import static junit.framework.Assert.fail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -37,6 +39,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; @@ -47,6 +50,8 @@ import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.ThreadTracker; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; @@ -61,7 +66,8 @@ public class MessageEvictionTest { Connection connection; private Session session; private Topic destination; - protected int numMessages = 4000; + private final String destinationName = "verifyEvection"; + protected int numMessages = 2000; protected String payload = new String(new byte[1024*2]); public void setUp(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception { @@ -71,7 +77,7 @@ public class MessageEvictionTest { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - destination = session.createTopic("verifyEvection"); + destination = session.createTopic(destinationName); } @After @@ -83,16 +89,69 @@ public class MessageEvictionTest { @Test public void testMessageEvictionMemoryUsageFileCursor() throws Exception { - doTestMessageEvictionMemoryUsage(new FilePendingSubscriberMessageStoragePolicy()); + setUp(new FilePendingSubscriberMessageStoragePolicy()); + doTestMessageEvictionMemoryUsage(); } @Test public void testMessageEvictionMemoryUsageVmCursor() throws Exception { - doTestMessageEvictionMemoryUsage(new VMPendingSubscriberMessageStoragePolicy()); + setUp(new VMPendingSubscriberMessageStoragePolicy()); + doTestMessageEvictionMemoryUsage(); } - public void doTestMessageEvictionMemoryUsage(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception { - setUp(pendingSubscriberPolicy); + @Test + public void testMessageEvictionDiscardedAdvisory() throws Exception { + setUp(new VMPendingSubscriberMessageStoragePolicy()); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + final CountDownLatch consumerRegistered = new CountDownLatch(1); + final CountDownLatch gotAdvisory = new CountDownLatch(1); + final CountDownLatch advisoryIsGood = new CountDownLatch(1); + + executor.execute(new Runnable() { + public void run() { + try { + ActiveMQTopic discardedAdvisoryDestination = + AdvisorySupport.getMessageDiscardedAdvisoryTopic(destination); + // use separate session rather than asyncDispatch on consumer session + // as we want consumer session to block + Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = advisorySession.createConsumer(discardedAdvisoryDestination); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + try { + LOG.info("advisory:" + message); + ActiveMQMessage activeMQMessage = (ActiveMQMessage) message; + assertNotNull(activeMQMessage.getStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID)); + assertEquals(1, activeMQMessage.getIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT)); + message.acknowledge(); + advisoryIsGood.countDown(); + } catch (JMSException e) { + e.printStackTrace(); + fail(e.toString()); + } finally { + gotAdvisory.countDown(); + } + } + }); + consumerRegistered.countDown(); + gotAdvisory.await(120, TimeUnit.SECONDS); + consumer.close(); + advisorySession.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } + }); + assertTrue("we have an advisory consumer", consumerRegistered.await(60, TimeUnit.SECONDS)); + doTestMessageEvictionMemoryUsage(); + assertTrue("got an advisory for discarded", gotAdvisory.await(0, TimeUnit.SECONDS)); + assertTrue("advisory is good",advisoryIsGood.await(0, TimeUnit.SECONDS)); + } + + public void doTestMessageEvictionMemoryUsage() throws Exception { + ExecutorService executor = Executors.newCachedThreadPool(); final CountDownLatch doAck = new CountDownLatch(1); final CountDownLatch consumerRegistered = new CountDownLatch(1); @@ -147,7 +206,7 @@ public class MessageEvictionTest { } }); - assertTrue("messages sending done", sendDone.await(90, TimeUnit.SECONDS)); + assertTrue("messages sending done", sendDone.await(180, TimeUnit.SECONDS)); assertEquals("all message were sent", numMessages, sent.get()); doAck.countDown(); @@ -175,6 +234,8 @@ public class MessageEvictionTest { final PolicyEntry entry = new PolicyEntry(); entry.setTopic(">"); + entry.setAdvisoryForDiscardingMessages(true); + // so consumer does not get over run while blocked limit the prefetch entry.setTopicPrefetch(50); @@ -204,8 +265,6 @@ public class MessageEvictionTest { policyMap.setPolicyEntries(policyEntries); brokerService.setDestinationPolicy(policyMap); - brokerService.setAdvisorySupport(false); - return brokerService; }