refactored the previous implementation of virtual topics to use DestinationInterceptors instead so that there is no cost to virtual topics unless you are using them. Also added support for CompositeQueue/CompositeTopics too

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@426060 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-07-27 13:12:46 +00:00
parent f2a80acac5
commit da67a38696
18 changed files with 746 additions and 68 deletions

View File

@ -33,8 +33,11 @@ import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.*;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.ConnectionFilter;
@ -124,6 +127,7 @@ public class BrokerService implements Service, Serializable {
private boolean keepDurableSubsActive=true;
private boolean useVirtualTopics=true;
private BrokerId brokerId;
private DestinationInterceptor[] destinationInterceptors;
/**
* Adds a new transport connector for the given bind address
@ -1004,20 +1008,44 @@ public class BrokerService implements Service, Serializable {
// broker
getPersistenceAdapter().setUsageManager(getMemoryManager());
getPersistenceAdapter().start();
DestinationInterceptor destinationInterceptor = null;
if (destinationInterceptors != null) {
destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
}
else {
destinationInterceptor = createDefaultDestinationInterceptor();
}
RegionBroker regionBroker = null;
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getMemoryManager(),
getPersistenceAdapter());
getPersistenceAdapter(), destinationInterceptor);
}
else {
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter());
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(), destinationInterceptor);
}
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName());
return regionBroker;
}
/**
* Create the default destination interceptor
*/
protected DestinationInterceptor createDefaultDestinationInterceptor() {
if (! isUseVirtualTopics()) {
return null;
}
VirtualDestinationInterceptor answer = new VirtualDestinationInterceptor();
VirtualTopic virtualTopic = new VirtualTopic();
virtualTopic.setName("VirtualTopic.>");
VirtualDestination[] virtualDestinations = { virtualTopic };
answer.setVirtualDestinations(virtualDestinations);
return answer;
}
/**
* Strategy method to add interceptors to the broker
*
@ -1028,9 +1056,6 @@ public class BrokerService implements Service, Serializable {
if (isAdvisorySupport()) {
broker = new AdvisoryBroker(broker);
}
if (isUseVirtualTopics()) {
broker = new VirtualTopicBroker(broker);
}
broker = new CompositeDestinationBroker(broker);
if (isPopulateJMSXUserID()) {
broker = new UserIDBroker(broker);

View File

@ -1,55 +0,0 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import java.util.Iterator;
import java.util.Set;
/**
* Implements <a href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual Topics</a>.
*
* @version $Revision: $
*/
public class VirtualTopicBroker extends BrokerPluginSupport {
public static final String VIRTUAL_WILDCARD = "ActiveMQ.Virtual.*.";
public VirtualTopicBroker() {
}
public VirtualTopicBroker(Broker broker) {
setNext(broker);
}
public void send(ConnectionContext ctx, Message message) throws Exception {
String name = message.getDestination().getPhysicalName();
String virtualName = VIRTUAL_WILDCARD + name;
Set destinations = getDestinations(new ActiveMQQueue(virtualName));
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
dest.send(ctx, message);
}
getNext().send(ctx, message);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
@ -84,9 +85,9 @@ public class ManagedRegionBroker extends RegionBroker {
private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter)
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter, DestinationInterceptor destinationInterceptor)
throws IOException{
super(brokerService,taskRunnerFactory,memoryManager,adapter);
super(brokerService,taskRunnerFactory,memoryManager,adapter, destinationInterceptor);
this.mbeanServer=mbeanServer;
this.brokerObjectName=brokerObjectName;
}

View File

@ -75,6 +75,13 @@ abstract public class AbstractRegion implements Region {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
log.debug("Adding destination: "+destination);
Destination dest = createDestination(context, destination);
// intercept if there is a valid interceptor defined
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) {
dest = destinationInterceptor.intercept(dest);
}
dest.start();
synchronized(destinationsMutex){
destinations.put(destination,dest);
@ -293,7 +300,7 @@ abstract public class AbstractRegion implements Region {
protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
abstract protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
public boolean isAutoCreateDestinations() {
public boolean isAutoCreateDestinations() {
return autoCreateDestinations;
}

View File

@ -0,0 +1,39 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
/**
* Represents a Composite Pattern of a {@link DestinationInterceptor}
*
* @version $Revision$
*/
public class CompositeDestinationInterceptor implements DestinationInterceptor {
private final DestinationInterceptor[] interceptors;
public CompositeDestinationInterceptor(final DestinationInterceptor[] interceptors) {
this.interceptors = interceptors;
}
public Destination intercept(Destination destination) {
for (int i = 0; i < interceptors.length; i++) {
destination = interceptors[i].intercept(destination);
}
return destination;
}
}

View File

@ -0,0 +1,161 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
/**
*
* @version $Revision$
*/
public class DestinationFilter implements Destination {
private Destination next;
public DestinationFilter(Destination next) {
this.next = next;
}
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
next.acknowledge(context, sub, ack, node);
}
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
next.addSubscription(context, sub);
}
public Message[] browse() {
return next.browse();
}
public void dispose(ConnectionContext context) throws IOException {
next.dispose(context);
}
public void gc() {
next.gc();
}
public ActiveMQDestination getActiveMQDestination() {
return next.getActiveMQDestination();
}
public long getConsumerCount() {
return next.getConsumerCount();
}
public DeadLetterStrategy getDeadLetterStrategy() {
return next.getDeadLetterStrategy();
}
public long getDequeueCount() {
return next.getDequeueCount();
}
public DestinationStatistics getDestinationStatistics() {
return next.getDestinationStatistics();
}
public long getEnqueueCount() {
return next.getEnqueueCount();
}
public long getMemoryLimit() {
return next.getMemoryLimit();
}
public int getMemoryPercentageUsed() {
return next.getMemoryPercentageUsed();
}
public long getMessagesCached() {
return next.getMessagesCached();
}
public MessageStore getMessageStore() {
return next.getMessageStore();
}
public String getName() {
return next.getName();
}
public long getQueueSize() {
return next.getQueueSize();
}
public UsageManager getUsageManager() {
return next.getUsageManager();
}
public Message loadMessage(MessageId messageId) throws IOException {
return next.loadMessage(messageId);
}
public boolean lock(MessageReference node, LockOwner lockOwner) {
return next.lock(node, lockOwner);
}
public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
next.removeSubscription(context, sub);
}
public void resetStatistics() {
next.resetStatistics();
}
public void send(ConnectionContext context, Message messageSend) throws Exception {
next.send(context, messageSend);
}
public void setMemoryLimit(long limit) {
next.setMemoryLimit(limit);
}
public void start() throws Exception {
next.start();
}
public void stop() throws Exception {
next.stop();
}
/**
* Sends a message to the given destination which may be a wildcard
*/
protected void send(ConnectionContext context, Message message, ActiveMQDestination destination) throws Exception {
Broker broker = context.getBroker();
Set destinations = broker.getDestinations(destination);
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
dest.send(context, message);
}
}
}

View File

@ -0,0 +1,29 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
/**
* Represents an interceptor on destination instances.
*
* @version $Revision$
*/
public interface DestinationInterceptor {
Destination intercept(Destination destination);
}

View File

@ -84,10 +84,11 @@ public class RegionBroker implements Broker {
private String brokerName;
private Map clientIdSet = new HashMap(); // we will synchronize access
protected PersistenceAdapter adaptor;
private final DestinationInterceptor destinationInterceptor;
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
this.destinationInterceptor = destinationInterceptor;
this.sequenceGenerator.setLastSequenceId( adapter.getLastMessageBrokerSequenceId() );
this.adaptor = adapter;//weird - both are valid spellings ...
queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, adapter);
@ -528,7 +529,9 @@ public class RegionBroker implements Broker {
this.keepDurableSubsActive = keepDurableSubsActive;
}
public DestinationInterceptor getDestinationInterceptor() {
return destinationInterceptor;
}
}

View File

@ -0,0 +1,71 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.region.Destination;
import java.util.Collection;
/**
*
* @version $Revision$
*/
public abstract class CompositeDestination implements VirtualDestination {
private String name;
private Collection forwardDestinations;
private boolean forwardOnly = true;
public Destination intercept(Destination destination) {
return new CompositeDestinationInterceptor(destination, getForwardDestinations(), isForwardOnly());
}
public String getName() {
return name;
}
/**
* Sets the name of this composite destination
*/
public void setName(String name) {
this.name = name;
}
public Collection getForwardDestinations() {
return forwardDestinations;
}
/**
* Sets the list of destinations to forward to
*/
public void setForwardDestinations(Collection forwardDestinations) {
this.forwardDestinations = forwardDestinations;
}
public boolean isForwardOnly() {
return forwardOnly;
}
/**
* Sets if the virtual destination is forward only (and so there is no
* physical queue to match the virtual queue) or if there is also a physical
* queue with the same name).
*/
public void setForwardOnly(boolean forwardOnly) {
this.forwardOnly = forwardOnly;
}
}

View File

@ -0,0 +1,54 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import java.util.Collection;
import java.util.Iterator;
/**
* Represents a composite {@link Destination} where send()s are replicated to
* each Destination instance.
*
* @version $Revision$
*/
public class CompositeDestinationInterceptor extends DestinationFilter {
private Collection forwardDestinations;
private boolean forwardOnly;
public CompositeDestinationInterceptor(Destination next, Collection forwardDestinations, boolean forwardOnly) {
super(next);
this.forwardDestinations = forwardDestinations;
this.forwardOnly = forwardOnly;
}
public void send(ConnectionContext context, Message message) throws Exception {
for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
ActiveMQDestination destination = (ActiveMQDestination) iter.next();
send(context, message, destination);
}
if (!forwardOnly) {
super.send(context, message);
}
}
}

View File

@ -0,0 +1,34 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
/**
* Represents a virtual queue which forwards to a number of other destinations.
*
* @org.apache.xbean.XBean
*
* @version $Revision$
*/
public class CompositeQueue extends CompositeDestination {
public ActiveMQDestination getVirtualDestination() {
return new ActiveMQQueue(getName());
}
}

View File

@ -0,0 +1,34 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
/**
* Represents a virtual topic which forwards to a number of other destinations.
*
* @org.apache.xbean.XBean
*
* @version $Revision$
*/
public class CompositeTopic extends CompositeDestination {
public ActiveMQDestination getVirtualDestination() {
return new ActiveMQTopic(getName());
}
}

View File

@ -0,0 +1,39 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
/**
* Represents some kind of virtual destination.
*
* @version $Revision$
*/
public interface VirtualDestination extends DestinationInterceptor {
/**
* Returns the virtual destination
*/
public ActiveMQDestination getVirtualDestination();
/**
* Creates a virtual destination from the physical destination
*/
public Destination intercept(Destination destination);
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.DestinationMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Implements <a
* href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual
* Topics</a>.
*
* @version $Revision$
*/
public class VirtualDestinationInterceptor implements DestinationInterceptor {
private DestinationMap destinationMap = new DestinationMap();
private VirtualDestination[] virtualDestinations;
public Destination intercept(Destination destination) {
Set virtualDestinations = destinationMap.get(destination.getActiveMQDestination());
List destinations = new ArrayList();
for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) {
VirtualDestination virtualDestination = (VirtualDestination) iter.next();
Destination newNestination = virtualDestination.intercept(destination);
destinations.add(newNestination);
}
if (!destinations.isEmpty()) {
if (destinations.size() == 1) {
return (Destination) destinations.get(0);
}
else {
// should rarely be used but here just in case
return createCompositeDestination(destination, destinations);
}
}
return destination;
}
public VirtualDestination[] getVirtualDestinations() {
return virtualDestinations;
}
public void setVirtualDestinations(VirtualDestination[] virtualDestinations) {
destinationMap = new DestinationMap();
this.virtualDestinations = virtualDestinations;
for (int i = 0; i < virtualDestinations.length; i++) {
VirtualDestination virtualDestination = virtualDestinations[i];
destinationMap.put(virtualDestination.getVirtualDestination(), virtualDestination);
}
}
protected Destination createCompositeDestination(Destination destination, final List destinations) {
return new DestinationFilter(destination) {
public void send(ConnectionContext context, Message messageSend) throws Exception {
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Destination destination = (Destination) iter.next();
destination.send(context, messageSend);
}
}
};
}
}

View File

@ -0,0 +1,90 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
/**
* Creates <a href="http://activemq.org/site/virtual-destinations.html">Virtual
* Topics</a> using a prefix and postfix. The virtual destination creates a
* wildcard that is then used to look up all active queue subscriptions which
* match.
*
* @org.apache.xbean.XBean
*
* @version $Revision$
*/
public class VirtualTopic implements VirtualDestination {
private String prefix = "Consumer.*.";
private String postfix = "";
private String name = ">";
public ActiveMQDestination getVirtualDestination() {
return new ActiveMQTopic(getName());
}
public Destination intercept(Destination destination) {
return new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
}
// Properties
// -------------------------------------------------------------------------
public String getPostfix() {
return postfix;
}
/**
* Sets any postix used to identify the queue consumers
*/
public void setPostfix(String postfix) {
this.postfix = postfix;
}
public String getPrefix() {
return prefix;
}
/**
* Sets the prefix wildcard used to identify the queue consumers for a given
* topic
*/
public void setPrefix(String prefix) {
this.prefix = prefix;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
// Implementation methods
// -------------------------------------------------------------------------
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
return new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
}
}

View File

@ -0,0 +1,51 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
/**
* A Destination which implements <a
* href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
*
* @version $Revision$
*/
public class VirtualTopicInterceptor extends DestinationFilter {
private String prefix;
private String postfix;
public VirtualTopicInterceptor(Destination next, String prefix, String postfix) {
super(next);
this.prefix = prefix;
this.postfix = postfix;
}
public void send(ConnectionContext context, Message message) throws Exception {
ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
send(context, message, queueConsumers);
}
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
return new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
}
}

View File

@ -0,0 +1,9 @@
<html>
<head>
</head>
<body>
Implementation classes for <a href="http://activemq.org/site/virtual-destinations.html">Virtual Destinations</a>
</body>
</html>

View File

@ -42,7 +42,7 @@ public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
ConsumerBean messageList = new ConsumerBean();
messageList.setVerbose(true);
String queueAName = "ActiveMQ.Virtual.A.TEST";
String queueAName = "Consumer.A.VirtualTopic.TEST";
// create consumer 'cluster'
ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
@ -55,7 +55,7 @@ public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
c2.setMessageListener(messageList);
// create topic producer
MessageProducer producer = session.createProducer(new ActiveMQTopic("TEST"));
MessageProducer producer = session.createProducer(new ActiveMQTopic("VirtualTopic.TEST"));
assertNotNull(producer);
int total = 10;