https://issues.apache.org/jira/browse/AMQ-3903 - Failed to fire fast producer advisory, reason: java.lang.NullPointerException. A generic producer does not contain a destination, so it must be obtained from the exchange. Modified the boker interface to reflect that. fixed up typo in the policy entry, advisoryForFastProducers now correctly spelled in favour of advisdoryForFastProducers

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1360642 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-07-12 11:58:05 +00:00
parent d4742f597b
commit db1f6fe2b4
15 changed files with 174 additions and 40 deletions

View File

@ -362,11 +362,11 @@ public class AdvisoryBroker extends BrokerFilter {
} }
@Override @Override
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
super.fastProducer(context, producerInfo); super.fastProducer(context, producerInfo, destination);
try { try {
if (!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) { if (!AdvisorySupport.isAdvisoryTopic(destination)) {
ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination()); ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination);
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString()); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
fireAdvisory(context, topic, producerInfo, null, advisoryMessage); fireAdvisory(context, topic, producerInfo, null, advisoryMessage);

View File

@ -361,8 +361,9 @@ public interface Broker extends Region, Service {
* Called to notify a producer is too fast * Called to notify a producer is too fast
* @param context * @param context
* @param producerInfo * @param producerInfo
* @param destination
*/ */
void fastProducer(ConnectionContext context,ProducerInfo producerInfo); void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination);
/** /**
* Called when a Usage reaches a limit * Called when a Usage reaches a limit

View File

@ -271,8 +271,8 @@ public class BrokerFilter implements Broker {
} }
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
next.fastProducer(context, producerInfo); next.fastProducer(context, producerInfo, destination);
} }
public void isFull(ConnectionContext context,Destination destination, Usage usage) { public void isFull(ConnectionContext context,Destination destination, Usage usage) {

View File

@ -263,7 +263,7 @@ public class EmptyBroker implements Broker {
return -1l; return -1l;
} }
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
} }
public void isFull(ConnectionContext context, Destination destination,Usage usage) { public void isFull(ConnectionContext context, Destination destination,Usage usage) {

View File

@ -273,7 +273,7 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }

View File

@ -282,8 +282,8 @@ public class MutableBrokerFilter implements Broker {
return getNext().getBrokerSequenceId(); return getNext().getBrokerSequenceId();
} }
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
getNext().fastProducer(context, producerInfo); getNext().fastProducer(context, producerInfo, destination);
} }
public void isFull(ConnectionContext context,Destination destination, Usage usage) { public void isFull(ConnectionContext context,Destination destination, Usage usage) {

View File

@ -76,7 +76,7 @@ public abstract class BaseDestination implements Destination {
private int minimumMessageSize = 1024; private int minimumMessageSize = 1024;
private boolean lazyDispatch = false; private boolean lazyDispatch = false;
private boolean advisoryForSlowConsumers; private boolean advisoryForSlowConsumers;
private boolean advisdoryForFastProducers; private boolean advisoryForFastProducers;
private boolean advisoryForDiscardingMessages; private boolean advisoryForDiscardingMessages;
private boolean advisoryWhenFull; private boolean advisoryWhenFull;
private boolean advisoryForDelivery; private boolean advisoryForDelivery;
@ -407,15 +407,15 @@ public abstract class BaseDestination implements Destination {
/** /**
* @return the advisdoryForFastProducers * @return the advisdoryForFastProducers
*/ */
public boolean isAdvisdoryForFastProducers() { public boolean isAdvisoryForFastProducers() {
return advisdoryForFastProducers; return advisoryForFastProducers;
} }
/** /**
* @param advisdoryForFastProducers the advisdoryForFastProducers to set * @param advisoryForFastProducers the advisdoryForFastProducers to set
*/ */
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers; this.advisoryForFastProducers = advisoryForFastProducers;
} }
public boolean isSendAdvisoryIfNoConsumers() { public boolean isSendAdvisoryIfNoConsumers() {
@ -509,8 +509,8 @@ public abstract class BaseDestination implements Destination {
* @param producerInfo * @param producerInfo
*/ */
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
if (advisdoryForFastProducers) { if (advisoryForFastProducers) {
broker.fastProducer(context, producerInfo); broker.fastProducer(context, producerInfo, getActiveMQDestination());
} }
} }

View File

@ -74,7 +74,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int timeBeforeDispatchStarts = 0; private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0;
private boolean advisoryForSlowConsumers; private boolean advisoryForSlowConsumers;
private boolean advisdoryForFastProducers; private boolean advisoryForFastProducers;
private boolean advisoryForDiscardingMessages; private boolean advisoryForDiscardingMessages;
private boolean advisoryWhenFull; private boolean advisoryWhenFull;
private boolean advisoryForDelivery; private boolean advisoryForDelivery;
@ -159,7 +159,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers()); destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
@ -661,15 +661,15 @@ public class PolicyEntry extends DestinationMapEntry {
/** /**
* @return the advisdoryForFastProducers * @return the advisdoryForFastProducers
*/ */
public boolean isAdvisdoryForFastProducers() { public boolean isAdvisoryForFastProducers() {
return advisdoryForFastProducers; return advisoryForFastProducers;
} }
/** /**
* @param advisdoryForFastProducers the advisdoryForFastProducers to set * @param advisoryForFastProducers the advisdoryForFastProducers to set
*/ */
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers; this.advisoryForFastProducers = advisoryForFastProducers;
} }
public void setMaxExpirePageSize(int maxExpirePageSize) { public void setMaxExpirePageSize(int maxExpirePageSize) {

View File

@ -510,11 +510,11 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
} }
@Override @Override
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination destination) {
if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
LOG.info("Fast Producer : " + producerInfo); LOG.info("Fast Producer : " + producerInfo);
} }
super.fastProducer(context, producerInfo); super.fastProducer(context, producerInfo, destination);
} }
@Override @Override

View File

@ -200,7 +200,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) { private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) {
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true); policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true); policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForDiscardingMessages(true);

View File

@ -210,7 +210,7 @@ public class AdvisoryTests extends TestCase {
protected void configureBroker(BrokerService answer) throws Exception { protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false); answer.setPersistent(false);
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true); policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true); policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForDiscardingMessages(true);

View File

@ -130,7 +130,7 @@ public class AMQ3324Test {
entry.setInactiveTimoutBeforeGC(2000); entry.setInactiveTimoutBeforeGC(2000);
entry.setProducerFlowControl(true); entry.setProducerFlowControl(true);
entry.setAdvisoryForConsumed(true); entry.setAdvisoryForConsumed(true);
entry.setAdvisdoryForFastProducers(true); entry.setAdvisoryForFastProducers(true);
entry.setAdvisoryForDelivery(true); entry.setAdvisoryForDelivery(true);
PolicyMap map = new PolicyMap(); PolicyMap map = new PolicyMap();
map.setDefaultEntry(entry); map.setDefaultEntry(entry);

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class AMQ3903Test {
private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3903Test.class);
private static final String bindAddress = "tcp://0.0.0.0:0";
private BrokerService broker;
private ActiveMQConnectionFactory cf;
private static final int MESSAGE_COUNT = 100;
@Before
public void setUp() throws Exception {
broker = this.createBroker();
String address = broker.getTransportConnectors().get(0).getPublishableConnectString();
broker.start();
broker.waitUntilStarted();
cf = new ActiveMQConnectionFactory(address);
}
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
@Test
public void testAdvisoryForFastGenericProducer() throws Exception {
doTestAdvisoryForFastProducer(true);
}
@Test
public void testAdvisoryForFastDedicatedProducer() throws Exception {
doTestAdvisoryForFastProducer(false);
}
public void doTestAdvisoryForFastProducer(boolean genericProducer) throws Exception {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = session.createTemporaryQueue();
final Topic advisoryTopic = AdvisorySupport.getFastProducerAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
MessageProducer producer = session.createProducer(genericProducer ? null : queue);
try {
// send lots of messages to the tempQueue
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = session.createBytesMessage();
m.writeBytes(new byte[1024]);
if (genericProducer) {
producer.send(queue, m, DeliveryMode.PERSISTENT, 4, 0);
} else {
producer.send(m);
}
}
} catch (ResourceAllocationException expectedOnLimitReachedAfterFastAdvisory) {}
// check one advisory message has produced on the advisoryTopic
Message advCmsg = advisoryConsumer.receive(4000);
assertNotNull(advCmsg);
connection.close();
LOG.debug("Connection closed, destinations should now become inactive.");
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
answer.setUseJmx(false);
PolicyEntry entry = new PolicyEntry();
entry.setAdvisoryForFastProducers(true);
entry.setMemoryLimit(10000);
PolicyMap map = new PolicyMap();
map.setDefaultEntry(entry);
answer.setDestinationPolicy(map);
answer.addConnector(bindAddress);
answer.getSystemUsage().setSendFailIfNoSpace(true);
return answer;
}
}

View File

@ -20,27 +20,21 @@ package org.apache.activemq.transport.stomp;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.jms.*; import javax.jms.*;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -59,7 +53,7 @@ public class StompAdvisoryTest extends TestCase {
private PolicyEntry createPolicyEntry() { private PolicyEntry createPolicyEntry() {
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true); policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true); policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForDiscardingMessages(true);
@ -78,7 +72,7 @@ public class StompAdvisoryTest extends TestCase {
broker.setPersistent(false); broker.setPersistent(false);
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true); policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true); policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForDiscardingMessages(true);

View File

@ -62,7 +62,7 @@ public class AdvisoryTopicCleanUpTest {
connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true); policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true); policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForDiscardingMessages(true);