mirror of https://github.com/apache/activemq.git
Reduce contention by not sending an advisory for every destination when not all destination types are requested. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1214964 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
41cdadbe2a
commit
68bcf0fb15
|
@ -40,8 +40,8 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* This broker filter handles tracking the state of the broker for purposes of
|
||||
* publishing advisory messages to advisory consumers.
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class AdvisoryBroker extends BrokerFilter {
|
||||
|
||||
|
@ -54,9 +54,9 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
|
||||
protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
|
||||
protected final ProducerId advisoryProducerId = new ProducerId();
|
||||
|
||||
|
||||
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
|
||||
|
||||
|
||||
public AdvisoryBroker(Broker next) {
|
||||
super(next);
|
||||
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
|
||||
|
@ -78,7 +78,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
@Override
|
||||
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
Subscription answer = super.addConsumer(context, info);
|
||||
|
||||
|
||||
// Don't advise advisory topics.
|
||||
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
|
||||
|
@ -96,15 +96,23 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
}
|
||||
}
|
||||
|
||||
// We need to replay all the previously collected destination
|
||||
// objects
|
||||
// for this newly added consumer.
|
||||
if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
|
||||
// Replay the destinations.
|
||||
for (Iterator<DestinationInfo> iter = destinations.values().iterator(); iter.hasNext();) {
|
||||
DestinationInfo value = iter.next();
|
||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
|
||||
fireAdvisory(context, topic, value, info.getConsumerId());
|
||||
// We check here whether the Destination is Temporary Destination specific or not since we
|
||||
// can avoid sending advisory messages to the consumer if it only wants Temporary Destination
|
||||
// notifications. If its not just temporary destination related destinations then we have
|
||||
// to send them all, a composite destination could want both.
|
||||
if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
|
||||
// Replay the temporary destinations.
|
||||
for (DestinationInfo destination : destinations.values()) {
|
||||
if (destination.getDestination().isTemporary()) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
|
||||
fireAdvisory(context, topic, destination, info.getConsumerId());
|
||||
}
|
||||
}
|
||||
} else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
|
||||
// Replay all the destinations.
|
||||
for (DestinationInfo destination : destinations.values()) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
|
||||
fireAdvisory(context, topic, destination, info.getConsumerId());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,7 +199,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
fireAdvisory(context, topic, info);
|
||||
try {
|
||||
next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
|
||||
} catch (Exception expectedIfDestinationDidNotExistYet) {
|
||||
} catch (Exception expectedIfDestinationDidNotExistYet) {
|
||||
}
|
||||
try {
|
||||
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
|
||||
|
@ -203,7 +211,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
|
||||
@Override
|
||||
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
|
||||
super.removeDestinationInfo(context, destInfo);
|
||||
super.removeDestinationInfo(context, destInfo);
|
||||
DestinationInfo info = destinations.remove(destInfo.getDestination());
|
||||
if (info != null) {
|
||||
// ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
|
||||
|
@ -243,7 +251,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
|
||||
consumers.remove(info.getConsumerId());
|
||||
if (!dest.isTemporary() || destinations.containsKey(dest)) {
|
||||
fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
|
||||
fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -279,7 +287,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
handleFireFailure("expired", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
|
||||
super.messageConsumed(context, messageReference);
|
||||
|
@ -294,7 +302,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
handleFireFailure("consumed", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
|
||||
super.messageDelivered(context, messageReference);
|
||||
|
@ -309,7 +317,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
handleFireFailure("delivered", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
|
||||
super.messageDiscarded(context, sub, messageReference);
|
||||
|
@ -329,7 +337,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
handleFireFailure("discarded", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
|
||||
super.slowConsumer(context, destination,subs);
|
||||
|
@ -342,7 +350,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
handleFireFailure("slow consumer", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
|
||||
super.fastProducer(context, producerInfo);
|
||||
|
@ -355,7 +363,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
handleFireFailure("fast producer", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void isFull(ConnectionContext context, Destination destination, Usage usage) {
|
||||
super.isFull(context, destination, usage);
|
||||
|
@ -372,13 +380,13 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void nowMasterBroker() {
|
||||
public void nowMasterBroker() {
|
||||
super.nowMasterBroker();
|
||||
try {
|
||||
ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
|
||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
|
||||
context.setBroker(getBrokerService().getBroker());
|
||||
|
@ -387,7 +395,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
handleFireFailure("now master broker", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
|
||||
Subscription subscription){
|
||||
|
@ -401,7 +409,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
handleFireFailure("add to DLQ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -476,7 +484,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
}
|
||||
}
|
||||
advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
|
||||
|
||||
|
||||
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
|
||||
}
|
||||
|
||||
|
@ -505,13 +513,13 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
|
||||
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
|
||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
|
||||
|
||||
|
||||
String url = getBrokerService().getVmConnectorURI().toString();
|
||||
if (getBrokerService().getDefaultSocketURIString() != null) {
|
||||
url = getBrokerService().getDefaultSocketURIString();
|
||||
}
|
||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
|
||||
|
||||
|
||||
//set the data structure
|
||||
advisoryMessage.setDataStructure(command);
|
||||
advisoryMessage.setPersistent(false);
|
||||
|
|
|
@ -59,7 +59,10 @@ public final class AdvisorySupport {
|
|||
public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
|
||||
public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
|
||||
public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
|
||||
|
||||
|
||||
public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
|
||||
TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
|
||||
TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
|
||||
public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
|
||||
TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
|
||||
private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
|
||||
|
@ -187,7 +190,7 @@ public final class AdvisorySupport {
|
|||
+ destination.getPhysicalName();
|
||||
return new ActiveMQTopic(name);
|
||||
}
|
||||
|
||||
|
||||
public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
|
||||
String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
|
||||
+ destination.getPhysicalName();
|
||||
|
@ -239,6 +242,20 @@ public final class AdvisorySupport {
|
|||
return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
|
||||
}
|
||||
|
||||
public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) {
|
||||
if (destination.isComposite()) {
|
||||
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
|
||||
for (int i = 0; i < compositeDestinations.length; i++) {
|
||||
if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
|
||||
if (destination.isComposite()) {
|
||||
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
|
||||
|
|
Loading…
Reference in New Issue