James Strachan 2006-07-24 10:27:32 +00:00
parent 7d0d25ba32
commit 8f8fddc70c
4 changed files with 180 additions and 2 deletions

View File

@ -62,6 +62,7 @@ import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usecases.VirtualTopicPubSubTest;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
@ -122,6 +123,7 @@ public class BrokerService implements Service, Serializable {
private AtomicBoolean started = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive=true;
private boolean useVirtualTopics=true;
private BrokerId brokerId;
/**
@ -820,6 +822,20 @@ public class BrokerService implements Service, Serializable {
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
public boolean isUseVirtualTopics() {
return useVirtualTopics;
}
/**
* Sets whether or not
* <a href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual Topics</a>
* should be supported.
*/
public void setUseVirtualTopics(boolean useVirtualTopics) {
this.useVirtualTopics = useVirtualTopics;
}
// Implementation methods
// -------------------------------------------------------------------------
/**
@ -1013,6 +1029,9 @@ public class BrokerService implements Service, Serializable {
if (isAdvisorySupport()) {
broker = new AdvisoryBroker(broker);
}
if (isUseVirtualTopics()) {
broker = new VirtualTopicBroker(broker);
}
broker = new CompositeDestinationBroker(broker);
if (isPopulateJMSXUserID()) {
broker = new UserIDBroker(broker);

View File

@ -0,0 +1,55 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import java.util.Iterator;
import java.util.Set;
/**
* Implements <a href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual Topics</a>.
*
* @version $Revision: $
*/
public class VirtualTopicBroker extends BrokerPluginSupport {
public static final String VIRTUAL_WILDCARD = "ActiveMQ.Virtual.*.";
public VirtualTopicBroker() {
}
public VirtualTopicBroker(Broker broker) {
setNext(broker);
}
public void send(ConnectionContext ctx, Message message) throws Exception {
String name = message.getDestination().getPhysicalName();
String virtualName = VIRTUAL_WILDCARD + name;
Set destinations = getDestinations(new ActiveMQQueue(virtualName));
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
dest.send(ctx, message);
}
getNext().send(ctx, message);
}
}

View File

@ -21,12 +21,15 @@ import javax.jms.MessageListener;
import java.util.ArrayList;
import java.util.List;
public class ConsumerBean implements MessageListener {
import junit.framework.Assert;
public class ConsumerBean extends Assert implements MessageListener {
private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
.getLog(ConsumerBean.class);
private List messages = new ArrayList();
private Object semaphore;
private boolean verbose;
/**
* Constructor.
@ -37,6 +40,7 @@ public class ConsumerBean implements MessageListener {
/**
* Constructor, initialized semaphore object.
*
* @param semaphore
*/
public ConsumerBean(Object semaphore) {
@ -54,10 +58,14 @@ public class ConsumerBean implements MessageListener {
/**
* Method implemented from MessageListener interface.
*
* @param message
*/
public synchronized void onMessage(Message message) {
messages.add(message);
if (verbose) {
log.info("Received: " + message);
}
synchronized (semaphore) {
semaphore.notifyAll();
}
@ -88,6 +96,7 @@ public class ConsumerBean implements MessageListener {
/**
* Used to wait for a message to arrive given a particular message count.
*
* @param messageCount
*/
public void waitForMessagesToArrive(int messageCount) {
@ -113,8 +122,26 @@ public class ConsumerBean implements MessageListener {
log.info("End of wait for " + end + " millis");
}
public void assertMessagesArrived(int total) {
waitForMessagesToArrive(total);
synchronized (this) {
int count = messages.size();
assertEquals("Messages received", total, count);
}
}
public boolean isVerbose() {
return verbose;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
/**
* Identifies if the message is empty.
*
* @return
*/
protected boolean hasReceivedMessage() {
@ -123,6 +150,7 @@ public class ConsumerBean implements MessageListener {
/**
* Identifies if the message count has reached the total size of message.
*
* @param messageCount
* @return
*/

View File

@ -0,0 +1,76 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
/**
*
* @version $Revision: $
*/
public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
private Connection connection;
public void testVirtualTopicCreation() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList = new ConsumerBean();
messageList.setVerbose(true);
String queueAName = "ActiveMQ.Virtual.A.TEST";
// create consumer 'cluster'
ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c1 = session.createConsumer(queue1);
MessageConsumer c2 = session.createConsumer(queue2);
c1.setMessageListener(messageList);
c2.setMessageListener(messageList);
// create topic producer
MessageProducer producer = session.createProducer(new ActiveMQTopic("TEST"));
assertNotNull(producer);
int total = 10;
for (int i = 0; i < total; i++) {
producer.send(session.createTextMessage("message: " + i));
}
messageList.assertMessagesArrived(total);
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
super.tearDown();
}
}