diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 3eb6d338c6..066b654ba2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -40,8 +40,8 @@ import org.slf4j.LoggerFactory; /** * This broker filter handles tracking the state of the broker for purposes of * publishing advisory messages to advisory consumers. - * - * + * + * */ public class AdvisoryBroker extends BrokerFilter { @@ -54,9 +54,9 @@ public class AdvisoryBroker extends BrokerFilter { protected final ConcurrentHashMap destinations = new ConcurrentHashMap(); protected final ConcurrentHashMap networkBridges = new ConcurrentHashMap(); protected final ProducerId advisoryProducerId = new ProducerId(); - + private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); - + public AdvisoryBroker(Broker next) { super(next); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); @@ -78,7 +78,7 @@ public class AdvisoryBroker extends BrokerFilter { @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { Subscription answer = super.addConsumer(context, info); - + // Don't advise advisory topics. if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); @@ -96,15 +96,23 @@ public class AdvisoryBroker extends BrokerFilter { } } - // We need to replay all the previously collected destination - // objects - // for this newly added consumer. - if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) { - // Replay the destinations. - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - DestinationInfo value = iter.next(); - ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination()); - fireAdvisory(context, topic, value, info.getConsumerId()); + // We check here whether the Destination is Temporary Destination specific or not since we + // can avoid sending advisory messages to the consumer if it only wants Temporary Destination + // notifications. If its not just temporary destination related destinations then we have + // to send them all, a composite destination could want both. + if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) { + // Replay the temporary destinations. + for (DestinationInfo destination : destinations.values()) { + if (destination.getDestination().isTemporary()) { + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); + fireAdvisory(context, topic, destination, info.getConsumerId()); + } + } + } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) { + // Replay all the destinations. + for (DestinationInfo destination : destinations.values()) { + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); + fireAdvisory(context, topic, destination, info.getConsumerId()); } } @@ -191,7 +199,7 @@ public class AdvisoryBroker extends BrokerFilter { fireAdvisory(context, topic, info); try { next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1); - } catch (Exception expectedIfDestinationDidNotExistYet) { + } catch (Exception expectedIfDestinationDidNotExistYet) { } try { next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1); @@ -203,7 +211,7 @@ public class AdvisoryBroker extends BrokerFilter { @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { - super.removeDestinationInfo(context, destInfo); + super.removeDestinationInfo(context, destInfo); DestinationInfo info = destinations.remove(destInfo.getDestination()); if (info != null) { // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate @@ -243,7 +251,7 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); consumers.remove(info.getConsumerId()); if (!dest.isTemporary() || destinations.containsKey(dest)) { - fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand()); + fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand()); } } } @@ -279,7 +287,7 @@ public class AdvisoryBroker extends BrokerFilter { handleFireFailure("expired", e); } } - + @Override public void messageConsumed(ConnectionContext context, MessageReference messageReference) { super.messageConsumed(context, messageReference); @@ -294,7 +302,7 @@ public class AdvisoryBroker extends BrokerFilter { handleFireFailure("consumed", e); } } - + @Override public void messageDelivered(ConnectionContext context, MessageReference messageReference) { super.messageDelivered(context, messageReference); @@ -309,7 +317,7 @@ public class AdvisoryBroker extends BrokerFilter { handleFireFailure("delivered", e); } } - + @Override public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { super.messageDiscarded(context, sub, messageReference); @@ -329,7 +337,7 @@ public class AdvisoryBroker extends BrokerFilter { handleFireFailure("discarded", e); } } - + @Override public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { super.slowConsumer(context, destination,subs); @@ -342,7 +350,7 @@ public class AdvisoryBroker extends BrokerFilter { handleFireFailure("slow consumer", e); } } - + @Override public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { super.fastProducer(context, producerInfo); @@ -355,7 +363,7 @@ public class AdvisoryBroker extends BrokerFilter { handleFireFailure("fast producer", e); } } - + @Override public void isFull(ConnectionContext context, Destination destination, Usage usage) { super.isFull(context, destination, usage); @@ -372,13 +380,13 @@ public class AdvisoryBroker extends BrokerFilter { } } } - + @Override - public void nowMasterBroker() { + public void nowMasterBroker() { super.nowMasterBroker(); try { ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic(); - ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ConnectionContext context = new ConnectionContext(); context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); context.setBroker(getBrokerService().getBroker()); @@ -387,7 +395,7 @@ public class AdvisoryBroker extends BrokerFilter { handleFireFailure("now master broker", e); } } - + @Override public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription){ @@ -401,7 +409,7 @@ public class AdvisoryBroker extends BrokerFilter { } } catch (Exception e) { handleFireFailure("add to DLQ", e); - } + } } @Override @@ -476,7 +484,7 @@ public class AdvisoryBroker extends BrokerFilter { } } advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count); - + fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } @@ -505,13 +513,13 @@ public class AdvisoryBroker extends BrokerFilter { advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); - + String url = getBrokerService().getVmConnectorURI().toString(); if (getBrokerService().getDefaultSocketURIString() != null) { url = getBrokerService().getDefaultSocketURIString(); } advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); - + //set the data structure advisoryMessage.setDataStructure(command); advisoryMessage.setPersistent(false); diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index 98ccc2f75c..e4e6adb7c7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -59,7 +59,10 @@ public final class AdvisorySupport { public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId"; public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount"; public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount"; - + + public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic( + TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + + TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName()); public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic( TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName()); private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC); @@ -187,7 +190,7 @@ public final class AdvisorySupport { + destination.getPhysicalName(); return new ActiveMQTopic(name); } - + public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) { String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." + destination.getPhysicalName(); @@ -239,6 +242,20 @@ public final class AdvisorySupport { return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); } + public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) { + if (destination.isComposite()) { + ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); + for (int i = 0; i < compositeDestinations.length; i++) { + if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) { + return false; + } + } + return true; + } else { + return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC); + } + } + public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) { if (destination.isComposite()) { ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();