From db1f6fe2b42d6d1d505ae59618b0cbc36532cef5 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 12 Jul 2012 11:58:05 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3903 - Failed to fire fast producer advisory, reason: java.lang.NullPointerException. A generic producer does not contain a destination, so it must be obtained from the exchange. Modified the boker interface to reflect that. fixed up typo in the policy entry, advisoryForFastProducers now correctly spelled in favour of advisdoryForFastProducers git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1360642 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/advisory/AdvisoryBroker.java | 8 +- .../org/apache/activemq/broker/Broker.java | 3 +- .../apache/activemq/broker/BrokerFilter.java | 4 +- .../apache/activemq/broker/EmptyBroker.java | 2 +- .../apache/activemq/broker/ErrorBroker.java | 2 +- .../activemq/broker/MutableBrokerFilter.java | 4 +- .../broker/region/BaseDestination.java | 16 +- .../broker/region/policy/PolicyEntry.java | 14 +- .../broker/util/LoggingBrokerPlugin.java | 4 +- .../AdvisoryTempDestinationTests.java | 2 +- .../activemq/advisory/AdvisoryTests.java | 2 +- .../org/apache/activemq/bugs/AMQ3324Test.java | 2 +- .../org/apache/activemq/bugs/AMQ3903Test.java | 139 ++++++++++++++++++ .../transport/stomp/StompAdvisoryTest.java | 10 +- .../usecases/AdvisoryTopicCleanUpTest.java | 2 +- 15 files changed, 174 insertions(+), 40 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 2b2fa3e943..48612a9852 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -362,11 +362,11 @@ public class AdvisoryBroker extends BrokerFilter { } @Override - public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { - super.fastProducer(context, producerInfo); + public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { + super.fastProducer(context, producerInfo, destination); try { - if (!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) { - ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination()); + if (!AdvisorySupport.isAdvisoryTopic(destination)) { + ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination); ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString()); fireAdvisory(context, topic, producerInfo, null, advisoryMessage); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index 46861cc7a1..3d108e9500 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -361,8 +361,9 @@ public interface Broker extends Region, Service { * Called to notify a producer is too fast * @param context * @param producerInfo + * @param destination */ - void fastProducer(ConnectionContext context,ProducerInfo producerInfo); + void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination); /** * Called when a Usage reaches a limit diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index ec02a936a0..9438e7bf04 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -271,8 +271,8 @@ public class BrokerFilter implements Broker { } - public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { - next.fastProducer(context, producerInfo); + public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { + next.fastProducer(context, producerInfo, destination); } public void isFull(ConnectionContext context,Destination destination, Usage usage) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 9b41a7f8bd..065a1bb889 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -263,7 +263,7 @@ public class EmptyBroker implements Broker { return -1l; } - public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { + public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { } public void isFull(ConnectionContext context, Destination destination,Usage usage) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index c6c2b2153d..109d3abb45 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -273,7 +273,7 @@ public class ErrorBroker implements Broker { throw new BrokerStoppedException(this.message); } - public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { + public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { throw new BrokerStoppedException(this.message); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 183d120ed5..20a2e8f01f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -282,8 +282,8 @@ public class MutableBrokerFilter implements Broker { return getNext().getBrokerSequenceId(); } - public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { - getNext().fastProducer(context, producerInfo); + public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { + getNext().fastProducer(context, producerInfo, destination); } public void isFull(ConnectionContext context,Destination destination, Usage usage) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index e2911a81b1..0a2daecd6a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -76,7 +76,7 @@ public abstract class BaseDestination implements Destination { private int minimumMessageSize = 1024; private boolean lazyDispatch = false; private boolean advisoryForSlowConsumers; - private boolean advisdoryForFastProducers; + private boolean advisoryForFastProducers; private boolean advisoryForDiscardingMessages; private boolean advisoryWhenFull; private boolean advisoryForDelivery; @@ -407,15 +407,15 @@ public abstract class BaseDestination implements Destination { /** * @return the advisdoryForFastProducers */ - public boolean isAdvisdoryForFastProducers() { - return advisdoryForFastProducers; + public boolean isAdvisoryForFastProducers() { + return advisoryForFastProducers; } /** - * @param advisdoryForFastProducers the advisdoryForFastProducers to set + * @param advisoryForFastProducers the advisdoryForFastProducers to set */ - public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { - this.advisdoryForFastProducers = advisdoryForFastProducers; + public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { + this.advisoryForFastProducers = advisoryForFastProducers; } public boolean isSendAdvisoryIfNoConsumers() { @@ -509,8 +509,8 @@ public abstract class BaseDestination implements Destination { * @param producerInfo */ public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { - if (advisdoryForFastProducers) { - broker.fastProducer(context, producerInfo); + if (advisoryForFastProducers) { + broker.fastProducer(context, producerInfo, getActiveMQDestination()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 554f1819a1..4025914027 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -74,7 +74,7 @@ public class PolicyEntry extends DestinationMapEntry { private int timeBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0; private boolean advisoryForSlowConsumers; - private boolean advisdoryForFastProducers; + private boolean advisoryForFastProducers; private boolean advisoryForDiscardingMessages; private boolean advisoryWhenFull; private boolean advisoryForDelivery; @@ -159,7 +159,7 @@ public class PolicyEntry extends DestinationMapEntry { destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); - destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers()); + destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); @@ -661,15 +661,15 @@ public class PolicyEntry extends DestinationMapEntry { /** * @return the advisdoryForFastProducers */ - public boolean isAdvisdoryForFastProducers() { - return advisdoryForFastProducers; + public boolean isAdvisoryForFastProducers() { + return advisoryForFastProducers; } /** - * @param advisdoryForFastProducers the advisdoryForFastProducers to set + * @param advisoryForFastProducers the advisdoryForFastProducers to set */ - public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { - this.advisdoryForFastProducers = advisdoryForFastProducers; + public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { + this.advisoryForFastProducers = advisoryForFastProducers; } public void setMaxExpirePageSize(int maxExpirePageSize) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java index b97b24ee48..4f9b6d676b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java @@ -510,11 +510,11 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { + public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination destination) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { LOG.info("Fast Producer : " + producerInfo); } - super.fastProducer(context, producerInfo); + super.fastProducer(context, producerInfo, destination); } @Override diff --git a/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java b/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java index e48815d44b..76af309d60 100644 --- a/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java +++ b/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java @@ -200,7 +200,7 @@ public class AdvisoryTempDestinationTests extends TestCase { private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) { PolicyEntry policy = new PolicyEntry(); - policy.setAdvisdoryForFastProducers(true); + policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDiscardingMessages(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 2dee188e42..b04871fc96 100644 --- a/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -210,7 +210,7 @@ public class AdvisoryTests extends TestCase { protected void configureBroker(BrokerService answer) throws Exception { answer.setPersistent(false); PolicyEntry policy = new PolicyEntry(); - policy.setAdvisdoryForFastProducers(true); + policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDiscardingMessages(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java index d4026da772..a1e9b93141 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java @@ -130,7 +130,7 @@ public class AMQ3324Test { entry.setInactiveTimoutBeforeGC(2000); entry.setProducerFlowControl(true); entry.setAdvisoryForConsumed(true); - entry.setAdvisdoryForFastProducers(true); + entry.setAdvisoryForFastProducers(true); entry.setAdvisoryForDelivery(true); PolicyMap map = new PolicyMap(); map.setDefaultEntry(entry); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java new file mode 100644 index 0000000000..cdda378ed8 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.Topic; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.MirroredQueue; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class AMQ3903Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3903Test.class); + + private static final String bindAddress = "tcp://0.0.0.0:0"; + private BrokerService broker; + private ActiveMQConnectionFactory cf; + + private static final int MESSAGE_COUNT = 100; + + @Before + public void setUp() throws Exception { + broker = this.createBroker(); + String address = broker.getTransportConnectors().get(0).getPublishableConnectString(); + broker.start(); + broker.waitUntilStarted(); + + cf = new ActiveMQConnectionFactory(address); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testAdvisoryForFastGenericProducer() throws Exception { + doTestAdvisoryForFastProducer(true); + } + + @Test + public void testAdvisoryForFastDedicatedProducer() throws Exception { + doTestAdvisoryForFastProducer(false); + } + + public void doTestAdvisoryForFastProducer(boolean genericProducer) throws Exception { + + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final TemporaryQueue queue = session.createTemporaryQueue(); + + final Topic advisoryTopic = AdvisorySupport.getFastProducerAdvisoryTopic((ActiveMQDestination) queue); + + MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic); + MessageProducer producer = session.createProducer(genericProducer ? null : queue); + + try { + // send lots of messages to the tempQueue + for (int i = 0; i < MESSAGE_COUNT; i++) { + BytesMessage m = session.createBytesMessage(); + m.writeBytes(new byte[1024]); + if (genericProducer) { + producer.send(queue, m, DeliveryMode.PERSISTENT, 4, 0); + } else { + producer.send(m); + } + } + } catch (ResourceAllocationException expectedOnLimitReachedAfterFastAdvisory) {} + + // check one advisory message has produced on the advisoryTopic + Message advCmsg = advisoryConsumer.receive(4000); + assertNotNull(advCmsg); + + + connection.close(); + LOG.debug("Connection closed, destinations should now become inactive."); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.setUseJmx(false); + + PolicyEntry entry = new PolicyEntry(); + entry.setAdvisoryForFastProducers(true); + entry.setMemoryLimit(10000); + PolicyMap map = new PolicyMap(); + map.setDefaultEntry(entry); + + answer.setDestinationPolicy(map); + answer.addConnector(bindAddress); + + answer.getSystemUsage().setSendFailIfNoSpace(true); + + return answer; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java index 50dcbc325b..2d53e530b3 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java @@ -20,27 +20,21 @@ package org.apache.activemq.transport.stomp; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.JmsTestSupport; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; import java.io.File; -import java.io.IOException; import java.net.Socket; import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -59,7 +53,7 @@ public class StompAdvisoryTest extends TestCase { private PolicyEntry createPolicyEntry() { PolicyEntry policy = new PolicyEntry(); - policy.setAdvisdoryForFastProducers(true); + policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDiscardingMessages(true); @@ -78,7 +72,7 @@ public class StompAdvisoryTest extends TestCase { broker.setPersistent(false); PolicyEntry policy = new PolicyEntry(); - policy.setAdvisdoryForFastProducers(true); + policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDiscardingMessages(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java index abfbc55fb5..1fcb0d1a68 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java @@ -62,7 +62,7 @@ public class AdvisoryTopicCleanUpTest { connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); PolicyEntry policy = new PolicyEntry(); - policy.setAdvisdoryForFastProducers(true); + policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDiscardingMessages(true);