Adding support for consumers on virtual destinations to create network
demand. This behavior is turned off by default but can be enabled.

For example, if a consumer comes online for a queue that is part of a
VirtualTopic, this will cause a network of brokers to forward messages
because a demand subscription will be created. Same for if a consumer
comes online for a forwarded destination from a composite
destination.

There is also an option to enable flow based on the existence of a
virtual destination if the virtual destination is forwarding to a
Queue.

Full configuration instructions for this feature will be on the wiki page.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-10-20 18:15:30 +00:00
parent 480b3e7c36
commit cc81680e10
20 changed files with 2076 additions and 3 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.advisory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@ -40,6 +41,7 @@ import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
@ -55,6 +57,7 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.security.SecurityContext; import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
@ -78,6 +81,22 @@ public class AdvisoryBroker extends BrokerFilter {
private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>(); protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
/**
* This is a set to track all of the virtual destinations that have been added to the broker so
* they can be easily referenced later.
*/
protected final Set<VirtualDestination> virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap<VirtualDestination, Boolean>());
/**
* This is a map to track all consumers that exist on the virtual destination so that we can fire
* an advisory later when they go away to remove the demand.
*/
protected final ConcurrentMap<ConsumerInfo, VirtualDestination> virtualDestinationConsumers = new ConcurrentHashMap<>();
/**
* This is a map to track unique demand for the existence of a virtual destination so we make sure
* we don't send duplicate advisories.
*/
protected final ConcurrentMap<VirtualConsumerPair, ConsumerInfo> brokerConsumerDests = new ConcurrentHashMap<>();
protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>(); protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
@ -85,6 +104,8 @@ public class AdvisoryBroker extends BrokerFilter {
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher();
public AdvisoryBroker(Broker next) { public AdvisoryBroker(Broker next) {
super(next); super(next);
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
@ -112,6 +133,15 @@ public class AdvisoryBroker extends BrokerFilter {
consumersLock.writeLock().lock(); consumersLock.writeLock().lock();
try { try {
consumers.put(info.getConsumerId(), info); consumers.put(info.getConsumerId(), info);
//check if this is a consumer on a destination that matches a virtual destination
if (getBrokerService().isUseVirtualDestSubs()) {
for (VirtualDestination virtualDestination : virtualDestinations) {
if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) {
fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination);
}
}
}
} finally { } finally {
consumersLock.writeLock().unlock(); consumersLock.writeLock().unlock();
} }
@ -171,6 +201,15 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
// Replay the virtual destination consumers.
if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) {
ConsumerInfo key = iter.next();
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination());
fireConsumerAdvisory(context, key.getDestination(), topic, key);
}
}
// Replay network bridges // Replay network bridges
if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) { if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) { for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) {
@ -199,6 +238,16 @@ public class AdvisoryBroker extends BrokerFilter {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
Destination answer = super.addDestination(context, destination, create); Destination answer = super.addDestination(context, destination, create);
if (!AdvisorySupport.isAdvisoryTopic(destination)) { if (!AdvisorySupport.isAdvisoryTopic(destination)) {
//for queues, create demand if isUseVirtualDestSubsOnCreation is true
if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) {
//check if this new destination matches a virtual destination that exists
for (VirtualDestination virtualDestination : virtualDestinations) {
if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination);
}
}
}
DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
DestinationInfo previous = destinations.putIfAbsent(destination, info); DestinationInfo previous = destinations.putIfAbsent(destination, info);
if (previous == null) { if (previous == null) {
@ -228,6 +277,28 @@ public class AdvisoryBroker extends BrokerFilter {
super.removeDestination(context, destination, timeout); super.removeDestination(context, destination, timeout);
DestinationInfo info = destinations.remove(destination); DestinationInfo info = destinations.remove(destination);
if (info != null) { if (info != null) {
//on destination removal, remove all demand if using virtual dest subs
if (getBrokerService().isUseVirtualDestSubs()) {
for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) {
//find all consumers for this virtual destination
VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo);
//find a consumer that matches this virtualDest and destination
if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
//in case of multiple matches
VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination);
ConsumerInfo i = brokerConsumerDests.get(key);
if (consumerInfo.equals(i)) {
if (brokerConsumerDests.remove(key) != null) {
fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
break;
}
}
}
}
}
// ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
info = info.copy(); info = info.copy();
info.setDestination(destination); info.setDestination(destination);
@ -285,6 +356,11 @@ public class AdvisoryBroker extends BrokerFilter {
consumersLock.writeLock().lock(); consumersLock.writeLock().lock();
try { try {
consumers.remove(info.getConsumerId()); consumers.remove(info.getConsumerId());
//remove the demand for this consumer if it matches a virtual destination
if(getBrokerService().isUseVirtualDestSubs()) {
fireVirtualDestinationRemoveAdvisory(context, info);
}
} finally { } finally {
consumersLock.writeLock().unlock(); consumersLock.writeLock().unlock();
} }
@ -467,6 +543,140 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
private final IdGenerator connectionIdGenerator = new IdGenerator("advisory");
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
@Override
public void virtualDestinationAdded(ConnectionContext context,
VirtualDestination virtualDestination) {
super.virtualDestinationAdded(context, virtualDestination);
if (virtualDestinations.add(virtualDestination)) {
try {
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
//create demand for consumers on virtual destinations
consumersLock.readLock().lock();
try {
//loop through existing destinations to see if any match this newly
//created virtual destination
if (getBrokerService().isUseVirtualDestSubsOnCreation()) {
//for matches that are a queue, fire an advisory for demand
for (ActiveMQDestination destination : destinations.keySet()) {
if(destination.isQueue()) {
if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination);
}
}
}
}
//loop through existing consumers to see if any of them are consuming on a destination
//that matches the new virtual destination
for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
ConsumerInfo info = iter.next();
if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) {
fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination);
}
}
} finally {
consumersLock.readLock().unlock();
}
}
} catch (Exception e) {
handleFireFailure("virtualDestinationAdded", e);
}
}
}
private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest,
VirtualDestination virtualDestination) throws Exception {
//if no consumer info, we need to create one - this is the case when an advisory is fired
//because of the existence of a destination matching a virtual destination
if (info == null) {
ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
info = new ConsumerInfo(consumerId);
//store the virtual destination and the activeMQDestination as a pair so that we can keep track
//of all matching forwarded destinations that caused demand
if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) {
info.setDestination(virtualDestination.getVirtualDestination());
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
fireConsumerAdvisory(context, info.getDestination(), topic, info);
}
}
//this is the case of a real consumer coming online
} else {
info = info.copy();
info.setDestination(virtualDestination.getVirtualDestination());
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
fireConsumerAdvisory(context, info.getDestination(), topic, info);
}
}
}
@Override
public void virtualDestinationRemoved(ConnectionContext context,
VirtualDestination virtualDestination) {
super.virtualDestinationRemoved(context, virtualDestination);
if (virtualDestinations.remove(virtualDestination)) {
try {
consumersLock.readLock().lock();
try {
// remove the demand created by the addition of the virtual destination
if (getBrokerService().isUseVirtualDestSubsOnCreation()) {
if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
for (ConsumerInfo info : virtualDestinationConsumers.keySet()) {
//find all consumers for this virtual destination
if (virtualDestinationConsumers.get(info).equals(virtualDestination)) {
fireVirtualDestinationRemoveAdvisory(context, info);
}
//check consumers created for the existence of a destination to see if they
//match the consumerinfo and clean up
for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
if (info.equals(i)) {
brokerConsumerDests.remove(activeMQDest);
}
}
}
}
}
} finally {
consumersLock.readLock().unlock();
}
} catch (Exception e) {
handleFireFailure("virtualDestinationAdded", e);
}
}
}
private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context,
ConsumerInfo info) throws Exception {
VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info);
if (virtualDestination != null) {
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination());
ActiveMQDestination dest = info.getDestination();
if (!dest.isTemporary() || destinations.containsKey(dest)) {
fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
}
}
}
@Override @Override
public void isFull(ConnectionContext context, Destination destination, Usage usage) { public void isFull(ConnectionContext context, Destination destination, Usage usage) {
super.isFull(context, destination, usage); super.isFull(context, destination, usage);
@ -681,4 +891,61 @@ public class AdvisoryBroker extends BrokerFilter {
public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() { public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
return destinations; return destinations;
} }
private class VirtualConsumerPair {
private final VirtualDestination virtualDestination;
//destination that matches this virtualDestination as part target
//this is so we can keep track of more than one destination that might
//match the virtualDestination and cause demand
private final ActiveMQDestination activeMQDestination;
public VirtualConsumerPair(VirtualDestination virtualDestination,
ActiveMQDestination activeMQDestination) {
super();
this.virtualDestination = virtualDestination;
this.activeMQDestination = activeMQDestination;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getOuterType().hashCode();
result = prime
* result
+ ((activeMQDestination == null) ? 0 : activeMQDestination
.hashCode());
result = prime
* result
+ ((virtualDestination == null) ? 0 : virtualDestination
.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
VirtualConsumerPair other = (VirtualConsumerPair) obj;
if (!getOuterType().equals(other.getOuterType()))
return false;
if (activeMQDestination == null) {
if (other.activeMQDestination != null)
return false;
} else if (!activeMQDestination.equals(other.activeMQDestination))
return false;
if (virtualDestination == null) {
if (other.virtualDestination != null)
return false;
} else if (!virtualDestination.equals(other.virtualDestination))
return false;
return true;
}
private AdvisoryBroker getOuterType() {
return AdvisoryBroker.this;
}
}
} }

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.advisory;
import org.apache.activemq.broker.region.virtual.CompositeDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.filter.DestinationFilter;
/**
* This class will use a destination filter to see if the activeMQ destination matches
* the given virtual destination
*
*/
public class DestinationFilterVirtualDestinationMatcher implements VirtualDestinationMatcher {
/* (non-Javadoc)
* @see org.apache.activemq.advisory.VirtualDestinationMatcher#matches(org.apache.activemq.broker.region.virtual.VirtualDestination)
*/
@Override
public boolean matches(VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) {
if (virtualDestination instanceof CompositeDestination) {
DestinationFilter filter = DestinationFilter.parseFilter(virtualDestination.getMappedDestinations());
if (filter.matches(activeMQDest)) {
return true;
}
} else if (virtualDestination instanceof VirtualTopic) {
DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
if (filter.matches(activeMQDest)) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.advisory;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination;
/**
*
*
*/
public interface VirtualDestinationMatcher {
public boolean matches(VirtualDestination virtualDestination, ActiveMQDestination activeMQDest);
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -385,6 +386,10 @@ public interface Broker extends Region, Service {
*/ */
void isFull(ConnectionContext context,Destination destination,Usage usage); void isFull(ConnectionContext context,Destination destination,Usage usage);
void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination);
void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination);
/** /**
* called when the broker becomes the master in a master/slave * called when the broker becomes the master in a master/slave
* configuration * configuration

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -359,6 +360,18 @@ public class BrokerFilter implements Broker {
next.slowConsumer(context, destination,subs); next.slowConsumer(context, destination,subs);
} }
@Override
public void virtualDestinationAdded(ConnectionContext context,
VirtualDestination virtualDestination) {
next.virtualDestinationAdded(context, virtualDestination);
}
@Override
public void virtualDestinationRemoved(ConnectionContext context,
VirtualDestination virtualDestination) {
next.virtualDestinationRemoved(context, virtualDestination);
}
@Override @Override
public void nowMasterBroker() { public void nowMasterBroker() {
next.nowMasterBroker(); next.nowMasterBroker();

View File

@ -203,6 +203,15 @@ public class BrokerService implements Service {
private boolean useVirtualTopics = true; private boolean useVirtualTopics = true;
private boolean useMirroredQueues = false; private boolean useMirroredQueues = false;
private boolean useTempMirroredQueues = true; private boolean useTempMirroredQueues = true;
/**
* Whether or not virtual destination subscriptions should cause network demand
*/
private boolean useVirtualDestSubs = false;
/**
* Whether or no the creation of destinations that match virtual destinations
* should cause network demand
*/
private boolean useVirtualDestSubsOnCreation = false;
private BrokerId brokerId; private BrokerId brokerId;
private volatile DestinationInterceptor[] destinationInterceptors; private volatile DestinationInterceptor[] destinationInterceptors;
private ActiveMQDestination[] destinations; private ActiveMQDestination[] destinations;
@ -2699,6 +2708,14 @@ public class BrokerService implements Service {
if (virtualDestination instanceof VirtualTopic) { if (virtualDestination instanceof VirtualTopic) {
consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
} }
if (isUseVirtualDestSubs()) {
try {
broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination);
LOG.debug("Adding virtual destination: {}", virtualDestination);
} catch (Exception e) {
LOG.warn("Could not fire virtual destination consumer advisory", e);
}
}
} }
} }
} }
@ -3133,4 +3150,22 @@ public class BrokerService implements Service {
public void setRejectDurableConsumers(boolean rejectDurableConsumers) { public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
this.rejectDurableConsumers = rejectDurableConsumers; this.rejectDurableConsumers = rejectDurableConsumers;
} }
public boolean isUseVirtualDestSubs() {
return useVirtualDestSubs;
}
public void setUseVirtualDestSubs(
boolean useVirtualDestSubs) {
this.useVirtualDestSubs = useVirtualDestSubs;
}
public boolean isUseVirtualDestSubsOnCreation() {
return useVirtualDestSubsOnCreation;
}
public void setUseVirtualDestSubsOnCreation(
boolean useVirtualDestSubsOnCreation) {
this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
}
} }

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -345,6 +346,16 @@ public class EmptyBroker implements Broker {
public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) { public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) {
} }
@Override
public void virtualDestinationAdded(ConnectionContext context,
VirtualDestination virtualDestination) {
}
@Override
public void virtualDestinationRemoved(ConnectionContext context,
VirtualDestination virtualDestination) {
}
@Override @Override
public void nowMasterBroker() { public void nowMasterBroker() {
} }

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -361,6 +362,18 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
@Override
public void virtualDestinationAdded(ConnectionContext context,
VirtualDestination virtualDestination) {
throw new BrokerStoppedException(this.message);
}
@Override
public void virtualDestinationRemoved(ConnectionContext context,
VirtualDestination virtualDestination) {
throw new BrokerStoppedException(this.message);
}
@Override @Override
public void nowMasterBroker() { public void nowMasterBroker() {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);

View File

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -370,6 +371,18 @@ public class MutableBrokerFilter implements Broker {
getNext().slowConsumer(context, dest,subs); getNext().slowConsumer(context, dest,subs);
} }
@Override
public void virtualDestinationAdded(ConnectionContext context,
VirtualDestination virtualDestination) {
getNext().virtualDestinationAdded(context, virtualDestination);
}
@Override
public void virtualDestinationRemoved(ConnectionContext context,
VirtualDestination virtualDestination) {
getNext().virtualDestinationRemoved(context, virtualDestination);
}
@Override @Override
public void nowMasterBroker() { public void nowMasterBroker() {
getNext().nowMasterBroker(); getNext().nowMasterBroker();

View File

@ -143,4 +143,45 @@ public abstract class CompositeDestination implements VirtualDestination {
} }
}; };
} }
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (concurrentSend ? 1231 : 1237);
result = prime * result + (copyMessage ? 1231 : 1237);
result = prime * result + (forwardOnly ? 1231 : 1237);
result = prime * result
+ ((forwardTo == null) ? 0 : forwardTo.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CompositeDestination other = (CompositeDestination) obj;
if (concurrentSend != other.concurrentSend)
return false;
if (copyMessage != other.copyMessage)
return false;
if (forwardOnly != other.forwardOnly)
return false;
if (forwardTo == null) {
if (other.forwardTo != null)
return false;
} else if (!forwardTo.equals(other.forwardTo))
return false;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}
} }

View File

@ -38,4 +38,9 @@ public class CompositeQueue extends CompositeDestination {
// nothing to do for mapped destinations // nothing to do for mapped destinations
return destination; return destination;
} }
@Override
public String toString() {
return "CompositeQueue [" + getName() + "]";
}
} }

View File

@ -41,4 +41,9 @@ public class CompositeTopic extends CompositeDestination {
} }
return destination; return destination;
} }
@Override
public String toString() {
return "CompositeTopic [" + getName() + "]";
}
} }

View File

@ -194,4 +194,53 @@ public class VirtualTopic implements VirtualDestination {
public void setTransactedSend(boolean transactedSend) { public void setTransactedSend(boolean transactedSend) {
this.transactedSend = transactedSend; this.transactedSend = transactedSend;
} }
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (concurrentSend ? 1231 : 1237);
result = prime * result + (local ? 1231 : 1237);
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + ((postfix == null) ? 0 : postfix.hashCode());
result = prime * result + ((prefix == null) ? 0 : prefix.hashCode());
result = prime * result + (selectorAware ? 1231 : 1237);
result = prime * result + (transactedSend ? 1231 : 1237);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
VirtualTopic other = (VirtualTopic) obj;
if (concurrentSend != other.concurrentSend)
return false;
if (local != other.local)
return false;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
if (postfix == null) {
if (other.postfix != null)
return false;
} else if (!postfix.equals(other.postfix))
return false;
if (prefix == null) {
if (other.prefix != null)
return false;
} else if (!prefix.equals(other.prefix))
return false;
if (selectorAware != other.selectorAware)
return false;
if (transactedSend != other.transactedSend)
return false;
return true;
}
} }

View File

@ -1352,7 +1352,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
sub.getLocalInfo().setDispatchAsync(true); sub.getLocalInfo().setDispatchAsync(true);
} else { } else {
sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());

View File

@ -29,6 +29,7 @@ import org.apache.activemq.command.ConsumerInfo;
public class NetworkBridgeConfiguration { public class NetworkBridgeConfiguration {
private boolean conduitSubscriptions = true; private boolean conduitSubscriptions = true;
private boolean useVirtualDestSubs;
private boolean dynamicOnly; private boolean dynamicOnly;
private boolean dispatchAsync = true; private boolean dispatchAsync = true;
private boolean decreaseNetworkConsumerPriority; private boolean decreaseNetworkConsumerPriority;
@ -237,11 +238,27 @@ public class NetworkBridgeConfiguration {
filter.append("."); filter.append(".");
filter.append(destination.getPhysicalName()); filter.append(destination.getPhysicalName());
delimiter = ","; delimiter = ",";
if (useVirtualDestSubs) {
filter.append(delimiter);
filter.append(AdvisorySupport.VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX);
filter.append(destination.getDestinationTypeAsString());
filter.append(".");
filter.append(destination.getPhysicalName());
}
} }
} }
return filter.toString(); return filter.toString();
} else { } else {
return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"; StringBuffer filter = new StringBuffer();
filter.append(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX);
filter.append(">");
if (useVirtualDestSubs) {
filter.append(",");
filter.append(AdvisorySupport.VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX);
filter.append(">");
}
return filter.toString();
} }
} else { } else {
// prepend consumer advisory prefix // prepend consumer advisory prefix
@ -449,4 +466,13 @@ public class NetworkBridgeConfiguration {
this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex; this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex;
} }
public boolean isUseVirtualDestSus() {
return useVirtualDestSubs;
}
public void setUseVirtualDestSubs(
boolean useVirtualDestSubs) {
this.useVirtualDestSubs = useVirtualDestSubs;
}
} }

View File

@ -37,8 +37,11 @@ public final class AdvisorySupport {
public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic."; public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer."; public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer.";
public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic."; public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
public static final String QUEUE_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
public static final String TOPIC_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic."; public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue."; public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic."; public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
@ -116,6 +119,16 @@ public final class AdvisorySupport {
return getAdvisoryTopic(destination, prefix, true); return getAdvisoryTopic(destination, prefix, true);
} }
public static ActiveMQTopic getVirtualDestinationConsumerAdvisoryTopic(ActiveMQDestination destination) {
String prefix;
if (destination.isQueue()) {
prefix = QUEUE_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX;
} else {
prefix = TOPIC_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX;
}
return getAdvisoryTopic(destination, prefix, true);
}
public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException { public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
} }
@ -389,6 +402,24 @@ public final class AdvisorySupport {
} }
} }
public static boolean isVirtualDestinationConsumerAdvisoryTopic(Destination destination) throws JMSException {
return isVirtualDestinationConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
}
public static boolean isVirtualDestinationConsumerAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isVirtualDestinationConsumerAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX);
}
}
public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException { public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
} }

View File

@ -51,14 +51,17 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
this.consumerInfo = consumerInfo; this.consumerInfo = consumerInfo;
} }
@Override
public byte getDataStructureType() { public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE; return DATA_STRUCTURE_TYPE;
} }
@Override
public boolean isMarshallAware() { public boolean isMarshallAware() {
return false; return false;
} }
@Override
public boolean matches(MessageEvaluationContext mec) throws JMSException { public boolean matches(MessageEvaluationContext mec) throws JMSException {
try { try {
// for Queues - the message can be acknowledged and dropped whilst // for Queues - the message can be acknowledged and dropped whilst
@ -72,6 +75,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
} }
} }
@Override
public Object evaluate(MessageEvaluationContext message) throws JMSException { public Object evaluate(MessageEvaluationContext message) throws JMSException {
return matches(message) ? Boolean.TRUE : Boolean.FALSE; return matches(message) ? Boolean.TRUE : Boolean.FALSE;
} }
@ -125,7 +129,9 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
} }
public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) { public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) {
return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination()); return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) ||
AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(message.getDestination()) ||
AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
} }
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {

View File

@ -18,16 +18,23 @@ package org.apache.activemq.plugin;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor; import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class UpdateVirtualDestinationsTask implements Runnable { public abstract class UpdateVirtualDestinationsTask implements Runnable {
public static final Logger LOG = LoggerFactory.getLogger(UpdateVirtualDestinationsTask.class);
private final AbstractRuntimeConfigurationBroker plugin; private final AbstractRuntimeConfigurationBroker plugin;
public UpdateVirtualDestinationsTask( public UpdateVirtualDestinationsTask(
@ -49,11 +56,52 @@ public abstract class UpdateVirtualDestinationsTask implements Runnable {
// update existing interceptor // update existing interceptor
final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) destinationInterceptor; final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) destinationInterceptor;
Set<VirtualDestination> existingVirtualDests = new HashSet<>();
Collections.addAll(existingVirtualDests, virtualDestinationInterceptor.getVirtualDestinations());
Set<VirtualDestination> newVirtualDests = new HashSet<>();
Collections.addAll(newVirtualDests, getVirtualDestinations());
Set<VirtualDestination> addedVirtualDests = new HashSet<>();
Set<VirtualDestination> removedVirtualDests = new HashSet<>();
//detect new virtual destinations
for (VirtualDestination newVirtualDest : newVirtualDests) {
if (!existingVirtualDests.contains(newVirtualDest)) {
addedVirtualDests.add(newVirtualDest);
}
}
//detect removed virtual destinations
for (VirtualDestination existingVirtualDest : existingVirtualDests) {
if (!newVirtualDests.contains(existingVirtualDest)) {
removedVirtualDests.add(existingVirtualDest);
}
}
virtualDestinationInterceptor virtualDestinationInterceptor
.setVirtualDestinations(getVirtualDestinations()); .setVirtualDestinations(getVirtualDestinations());
plugin.info("applied updates to: " plugin.info("applied updates to: "
+ virtualDestinationInterceptor); + virtualDestinationInterceptor);
updatedExistingInterceptor = true; updatedExistingInterceptor = true;
ConnectionContext connectionContext;
try {
connectionContext = plugin.getBrokerService().getAdminConnectionContext();
//signal updates
if (plugin.getBrokerService().isUseVirtualDestSubs()) {
for (VirtualDestination removedVirtualDest : removedVirtualDests) {
plugin.virtualDestinationRemoved(connectionContext, removedVirtualDest);
LOG.info("Removing virtual destination: {}", removedVirtualDest);
}
for (VirtualDestination addedVirtualDest : addedVirtualDests) {
plugin.virtualDestinationAdded(connectionContext, addedVirtualDest);
LOG.info("Adding virtual destination: {}", addedVirtualDest);
}
}
} catch (Exception e) {
LOG.warn("Could not process virtual destination advisories", e);
}
} }
} }

View File

@ -70,6 +70,10 @@
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-partition</artifactId> <artifactId>activemq-partition</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-runtime-config</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.geronimo.specs</groupId> <groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId> <artifactId>geronimo-jms_1.1_spec</artifactId>