mirror of https://github.com/apache/activemq.git
added test case and patch that shows adding producers & sending messages didn't trigger advisories
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@641548 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cdfb3c0691
commit
3cbee72a9c
|
@ -333,7 +333,9 @@ public class RegionBroker implements Broker {
|
|||
throws Exception {
|
||||
ActiveMQDestination destination = info.getDestination();
|
||||
if (destination != null) {
|
||||
addDestination(context, destination);
|
||||
|
||||
// This seems to cause the destination to be added but without advisories firing...
|
||||
context.getBroker().addDestination(context, destination);
|
||||
switch (destination.getDestinationType()) {
|
||||
case ActiveMQDestination.QUEUE_TYPE:
|
||||
queueRegion.addProducer(context, info);
|
||||
|
@ -422,7 +424,7 @@ public class RegionBroker implements Broker {
|
|||
if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
|
||||
ActiveMQDestination destination = message.getDestination();
|
||||
// ensure the destination is registered with the RegionBroker
|
||||
addDestination(producerExchange.getConnectionContext(), destination);
|
||||
producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination);
|
||||
Region region = null;
|
||||
switch (destination.getDestinationType()) {
|
||||
case ActiveMQDestination.QUEUE_TYPE:
|
||||
|
|
|
@ -16,8 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.advisory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.Session;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
@ -26,18 +32,20 @@ import org.apache.activemq.command.ActiveMQQueue;
|
|||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class DestinationListenerTest extends EmbeddedBrokerTestSupport implements DestinationListener {
|
||||
private static final transient Log LOG = LogFactory.getLog(DestinationListenerTest.class);
|
||||
|
||||
protected ActiveMQConnection connection;
|
||||
protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar");
|
||||
protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese");
|
||||
protected List<ActiveMQDestination> newDestinations = new ArrayList<ActiveMQDestination>();
|
||||
|
||||
public void testDestiationSource() throws Exception {
|
||||
public void testDestiationSourceHasInitialDestinations() throws Exception {
|
||||
Thread.sleep(1000);
|
||||
|
||||
DestinationSource destinationSource = connection.getDestinationSource();
|
||||
|
@ -54,13 +62,43 @@ public class DestinationListenerTest extends EmbeddedBrokerTestSupport implement
|
|||
assertTrue("topics contains initial topic: " + queues, topics.contains(sampleTopic));
|
||||
}
|
||||
|
||||
public void testConsumerForcesNotificationOfNewDestination() throws Exception {
|
||||
// now lets cause a destination to be created
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
ActiveMQQueue newQueue = new ActiveMQQueue("Test.Cheese");
|
||||
session.createConsumer(newQueue);
|
||||
|
||||
Thread.sleep(3000);
|
||||
|
||||
assertThat(newQueue, isIn(newDestinations));
|
||||
|
||||
LOG.info("New destinations are: " + newDestinations);
|
||||
}
|
||||
|
||||
public void testProducerForcesNotificationOfNewDestination() throws Exception {
|
||||
// now lets cause a destination to be created
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
ActiveMQQueue newQueue = new ActiveMQQueue("Test.Beer");
|
||||
MessageProducer producer = session.createProducer(newQueue);
|
||||
TextMessage message = session.createTextMessage("<hello>world</hello>");
|
||||
producer.send(message);
|
||||
|
||||
Thread.sleep(3000);
|
||||
|
||||
assertThat(newQueue, isIn(newDestinations));
|
||||
|
||||
LOG.info("New destinations are: " + newDestinations);
|
||||
}
|
||||
|
||||
public void onDestinationEvent(DestinationEvent event) {
|
||||
ActiveMQDestination destination = event.getDestination();
|
||||
if (event.isAddOperation()) {
|
||||
LOG.info("Added: " + destination);
|
||||
newDestinations.add(destination);
|
||||
}
|
||||
else {
|
||||
LOG.info("Removed: " + destination);
|
||||
newDestinations.remove(destination);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue