From 8d11f07a96fe4e2a0a338e68c9785438813d53b6 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 18 Jan 2008 19:16:15 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1553 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@613230 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/advisory/AdvisoryBroker.java | 5 +- .../advisory/ProducerEventSource.java | 1 + .../broker/region/BaseDestination.java | 59 ++++ .../apache/activemq/broker/region/Queue.java | 262 ++++++++---------- .../broker/region/QueueSubscription.java | 10 +- .../activemq/broker/region/RegionBroker.java | 1 + .../apache/activemq/broker/region/Topic.java | 89 ++---- .../region/policy/SimpleDispatchPolicy.java | 28 +- 8 files changed, 213 insertions(+), 242 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index d1b3014171..7769a0b194 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -137,7 +137,7 @@ public class AdvisoryBroker extends BrokerFilter { // Don't advise advisory topics. if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); - fireAdvisory(context, topic, info); + fireProducerAdvisory(context, info.getDestination(), topic, info); producers.put(info.getProducerId(), info); } } @@ -282,8 +282,7 @@ public class AdvisoryBroker extends BrokerFilter { Set set = getDestinations(producerDestination); if (set != null) { for (Destination dest : set) { - count += dest.getDestinationStatistics().getConsumers() - .getCount(); + count += dest.getDestinationStatistics().getProducers().getCount(); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java index 2ca45d7404..c7348c3347 100644 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java @@ -112,6 +112,7 @@ public class ProducerEventSource implements Service, MessageListener { return n.intValue(); } LOG.warn("No producerCount header available on the message: " + message); + Thread.dumpStack(); } catch (Exception e) { LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 9a5aff436f..f216f58740 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -16,8 +16,13 @@ */ package org.apache.activemq.broker.region; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.SystemUsage; /** @@ -25,11 +30,40 @@ import org.apache.activemq.command.ProducerInfo; */ public abstract class BaseDestination implements Destination { + protected final ActiveMQDestination destination; + protected final Broker broker; + protected final MessageStore store; + protected final SystemUsage systemUsage; + protected final MemoryUsage memoryUsage; private boolean producerFlowControl = true; private int maxProducersToAudit=1024; private int maxAuditDepth=1; private boolean enableAudit=true; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); + + /** + * @param broker + * @param store + * @param destination + * @param systemUsage + * @param parentStats + */ + public BaseDestination(Broker broker,MessageStore store,ActiveMQDestination destination, SystemUsage systemUsage,DestinationStatistics parentStats) { + this.broker=broker; + this.store=store; + this.destination=destination; + this.systemUsage=systemUsage; + this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString()); + this.memoryUsage.setUsagePortion(1.0f); + // Let the store know what usage manager we are using so that he can + // flush messages to disk when usage gets high. + if (store != null) { + store.setMemoryUsage(this.memoryUsage); + } + // let's copy the enabled property from the parent DestinationStatistics + this.destinationStatistics.setEnabled(parentStats.isEnabled()); + this.destinationStatistics.setParent(parentStats); + } /** * @return the producerFlowControl */ @@ -86,6 +120,31 @@ public abstract class BaseDestination implements Destination { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{ destinationStatistics.getProducers().decrement(); } + + public final MemoryUsage getBrokerMemoryUsage() { + return memoryUsage; + } + + public DestinationStatistics getDestinationStatistics() { + return destinationStatistics; + } + + public ActiveMQDestination getActiveMQDestination() { + return destination; + } + + public final String getDestination() { + return destination.getPhysicalName(); + } + + public final String getName() { + return getActiveMQDestination().getPhysicalName(); + } + + public final MessageStore getMessageStore() { + return store; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index de7ef0f377..44d0a5daa0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -58,7 +59,6 @@ import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; @@ -71,26 +71,16 @@ import org.apache.commons.logging.LogFactory; * @version $Revision: 1.28 $ */ public class Queue extends BaseDestination implements Task { - - final Broker broker; - + private static int MAXIMUM_PAGE_SIZE = 1000; private final Log log; - private final ActiveMQDestination destination; private final List consumers = new ArrayList(50); - private final SystemUsage systemUsage; - private final MemoryUsage memoryUsage; private PendingMessageCursor messages; - private final LinkedList pagedInMessages = new LinkedList(); + private final LinkedHashMap pagedInMessages = new LinkedHashMap(); private LockOwner exclusiveOwner; private MessageGroupMap messageGroupOwners; - - private int garbageSize; - private int garbageSizeBeforeCollection = 1000; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); - private final MessageStore store; private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); - private int maximumPagedInMessages = garbageSizeBeforeCollection * 2; private final Object exclusiveLockMutex = new Object(); private final Object sendLock = new Object(); private final TaskRunner taskRunner; @@ -104,15 +94,11 @@ public class Queue extends BaseDestination implements Task { } }; }; - + public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory, Store tmpStore) throws Exception { - this.broker = broker; - this.destination = destination; - this.systemUsage=systemUsage; - this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString()); - this.memoryUsage.setUsagePortion(1.0f); - this.store = store; + super(broker, store, destination,systemUsage, parentStats); + if (destination.isTemporary() || tmpStore==null ) { this.messages = new VMPendingMessageCursor(); } else { @@ -120,19 +106,7 @@ public class Queue extends BaseDestination implements Task { } this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName()); - - // Let the store know what usage manager we are using so that he can - // flush messages to disk - // when usage gets high. - if (store != null) { - store.setMemoryUsage(memoryUsage); - } - - // let's copy the enabled property from the parent DestinationStatistics - this.destinationStatistics.setEnabled(parentStats.isEnabled()); - destinationStatistics.setParent(parentStats); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); - } public void initialize() throws Exception { @@ -204,8 +178,6 @@ public class Queue extends BaseDestination implements Task { public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { sub.add(context, this); destinationStatistics.getConsumers().increment(); - maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize(); - MessageEvaluationContext msgContext = new MessageEvaluationContext(); // needs to be synchronized - so no contention with dispatching @@ -239,7 +211,7 @@ public class Queue extends BaseDestination implements Task { synchronized (pagedInMessages) { // Add all the matching messages in the queue to the // subscription. - for (Iterator i = pagedInMessages.iterator(); i + for (Iterator i = pagedInMessages.values().iterator(); i .hasNext();) { QueueMessageReference node = (QueueMessageReference) i.next(); if (node.isDropped() @@ -263,7 +235,6 @@ public class Queue extends BaseDestination implements Task { public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception { destinationStatistics.getConsumers().decrement(); - maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize(); // synchronize with dispatch method so that no new messages are sent // while // removing up a subscription. @@ -309,7 +280,7 @@ public class Queue extends BaseDestination implements Task { // lets copy the messages to dispatch to avoid deadlock List messagesToDispatch = new ArrayList(); synchronized (pagedInMessages) { - for (Iterator i = pagedInMessages.iterator(); i + for (Iterator i = pagedInMessages.values().iterator(); i .hasNext();) { QueueMessageReference node = (QueueMessageReference) i .next(); @@ -493,40 +464,9 @@ public class Queue extends BaseDestination implements Task { destinationStatistics.setParent(null); } - public void dropEvent() { - dropEvent(false); - } - - public void dropEvent(boolean skipGc) { - // TODO: need to also decrement when messages expire. - destinationStatistics.getMessages().decrement(); - synchronized (pagedInMessages) { - garbageSize++; - } - if (!skipGc && garbageSize > garbageSizeBeforeCollection) { - gc(); - } - try { - taskRunner.wakeup(); - } catch (InterruptedException e) { - log.warn("Task Runner failed to wakeup ", e); - } - } - - public void gc() { - synchronized (pagedInMessages) { - for (Iterator i = pagedInMessages.iterator(); i.hasNext();) { - // Remove dropped messages from the queue. - QueueMessageReference node = (QueueMessageReference)i.next(); - if (node.isDropped()) { - garbageSize--; - i.remove(); - continue; - } - } - } - } - + public void gc(){ + } + public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { if (store != null && node.isPersistent()) { // the original ack may be a ranged ack, but we are trying to delete @@ -589,18 +529,7 @@ public class Queue extends BaseDestination implements Task { return destination; } - public String getDestination() { - return destination.getPhysicalName(); - } - - public MemoryUsage getBrokerMemoryUsage() { - return memoryUsage; - } - - public DestinationStatistics getDestinationStatistics() { - return destinationStatistics; - } - + public MessageGroupMap getMessageGroupOwners() { if (messageGroupOwners == null) { messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap(); @@ -632,10 +561,6 @@ public class Queue extends BaseDestination implements Task { this.messageGroupMapFactory = messageGroupMapFactory; } - public String getName() { - return getActiveMQDestination().getPhysicalName(); - } - public PendingMessageCursor getMessages() { return this.messages; } @@ -652,10 +577,6 @@ public class Queue extends BaseDestination implements Task { return result; } - public MessageStore getMessageStore() { - return store; - } - public Message[] browse() { List l = new ArrayList(); try { @@ -664,7 +585,7 @@ public class Queue extends BaseDestination implements Task { log.error("caught an exception browsing " + this, e); } synchronized (pagedInMessages) { - for (Iterator i = pagedInMessages.iterator(); i.hasNext();) { + for (Iterator i = pagedInMessages.values().iterator(); i.hasNext();) { MessageReference r = i.next(); r.incrementReferenceCount(); try { @@ -736,15 +657,18 @@ public class Queue extends BaseDestination implements Task { return null; } - public void purge() throws Exception { + public void purge() throws Exception { + ConnectionContext c = createConnectionContext(); + List list = null; + do { + pageInMessages(); + synchronized (pagedInMessages) { + list = new ArrayList(pagedInMessages.values()); + } - pageInMessages(); - - synchronized (pagedInMessages) { - ConnectionContext c = createConnectionContext(); - for (Iterator i = pagedInMessages.iterator(); i.hasNext();) { + for (MessageReference ref : list) { try { - QueueMessageReference r = (QueueMessageReference)i.next(); + QueueMessageReference r = (QueueMessageReference) ref; // We should only delete messages that can be locked. if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { @@ -752,18 +676,13 @@ public class Queue extends BaseDestination implements Task { ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setDestination(destination); ack.setMessageID(r.getMessageId()); - acknowledge(c, null, ack, r); - r.drop(); - dropEvent(true); + removeMessage(c, null, r, ack); } } catch (IOException e) { } } - - // Run gc() by hand. Had we run it in the loop it could be - // quite expensive. - gc(); - } + } while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0); + gc(); } /** @@ -799,22 +718,29 @@ public class Queue extends BaseDestination implements Task { * @return the number of messages removed */ public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { - pageInMessages(); - int counter = 0; - synchronized (pagedInMessages) { - ConnectionContext c = createConnectionContext(); - for (Iterator i = pagedInMessages.iterator(); i.hasNext();) { - IndirectMessageReference r = (IndirectMessageReference)i.next(); - if (filter.evaluate(c, r)) { - removeMessage(c, r); - if (++counter >= maximumMessages && maximumMessages > 0) { - break; - } - - } + int movedCounter = 0; + int count = 0; + ConnectionContext context = createConnectionContext(); + List list = null; + do { + pageInMessages(); + synchronized (pagedInMessages) { + list = new ArrayList(pagedInMessages.values()); } - } - return counter; + for (MessageReference ref : list) { + IndirectMessageReference r = (IndirectMessageReference) ref; + if (filter.evaluate(context, r)) { + + removeMessage(context, r); + if (++movedCounter >= maximumMessages + && maximumMessages > 0) { + return movedCounter; + } + } + count++; + } + } while (count < this.destinationStatistics.getMessages().getCount()); + return movedCounter; } /** @@ -850,26 +776,36 @@ public class Queue extends BaseDestination implements Task { * @return the number of messages copied */ public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { - pageInMessages(); - int counter = 0; - synchronized (pagedInMessages) { - for (Iterator i = pagedInMessages.iterator(); i.hasNext();) { - MessageReference r = i.next(); + int movedCounter = 0; + int count = 0; + List list = null; + do { + pageInMessages(); + synchronized (pagedInMessages) { + list = new ArrayList(pagedInMessages.values()); + } + for (MessageReference ref : list) { + IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { - r.incrementReferenceCount(); - try { - Message m = r.getMessage(); - BrokerSupport.resend(context, m, dest); - if (++counter >= maximumMessages && maximumMessages > 0) { - break; + // We should only copy messages that can be locked. + if (lockMessage(r)) { + r.incrementReferenceCount(); + try { + Message m = r.getMessage(); + BrokerSupport.resend(context, m, dest); + if (++movedCounter >= maximumMessages + && maximumMessages > 0) { + return movedCounter; + } + } finally { + r.decrementReferenceCount(); } - } finally { - r.decrementReferenceCount(); } } + count++; } - } - return counter; + } while (count < this.destinationStatistics.getMessages().getCount()); + return movedCounter; } /** @@ -900,12 +836,17 @@ public class Queue extends BaseDestination implements Task { * Moves the messages matching the given filter up to the maximum number of * matched messages */ - public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { - pageInMessages(); - int counter = 0; - synchronized (pagedInMessages) { - for (Iterator i = pagedInMessages.iterator(); i.hasNext();) { - IndirectMessageReference r = (IndirectMessageReference)i.next(); + public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter, ActiveMQDestination dest,int maximumMessages) throws Exception { + int movedCounter = 0; + int count = 0; + List list = null; + do { + pageInMessages(); + synchronized (pagedInMessages) { + list = new ArrayList(pagedInMessages.values()); + } + for (MessageReference ref : list) { + IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { // We should only move messages that can be locked. if (lockMessage(r)) { @@ -914,17 +855,19 @@ public class Queue extends BaseDestination implements Task { Message m = r.getMessage(); BrokerSupport.resend(context, m, dest); removeMessage(context, r); - if (++counter >= maximumMessages && maximumMessages > 0) { - break; + if (++movedCounter >= maximumMessages + && maximumMessages > 0) { + return movedCounter; } } finally { r.decrementReferenceCount(); } } } + count++; } - } - return counter; + } while (count < this.destinationStatistics.getMessages().getCount()); + return movedCounter; } /** @@ -937,7 +880,6 @@ public class Queue extends BaseDestination implements Task { Runnable op = messagesWaitingForSpace.removeFirst(); op.run(); } - try { pageInMessages(false); } catch (Exception e) { @@ -976,9 +918,21 @@ public class Queue extends BaseDestination implements Task { ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setDestination(destination); ack.setMessageID(r.getMessageId()); - acknowledge(c, null, ack, r); - r.drop(); - dropEvent(); + removeMessage(c, null, r, ack); + } + + protected void removeMessage(ConnectionContext context,Subscription sub,QueueMessageReference reference,MessageAck ack) throws IOException { + reference.drop(); + acknowledge(context, sub, ack, reference); + destinationStatistics.getMessages().decrement(); + synchronized(pagedInMessages) { + pagedInMessages.remove(reference.getMessageId()); + } + try { + taskRunner.wakeup(); + } catch (InterruptedException e) { + log.warn("Task Runner failed to wakeup ", e); + } } protected boolean lockMessage(IndirectMessageReference r) { @@ -1008,7 +962,7 @@ public class Queue extends BaseDestination implements Task { } private List buildList(boolean force) throws Exception { - final int toPageIn = maximumPagedInMessages - pagedInMessages.size(); + final int toPageIn = MAXIMUM_PAGE_SIZE - pagedInMessages.size(); List result = null; if ((force || !consumers.isEmpty()) && toPageIn > 0) { messages.setMaxBatchSize(toPageIn); @@ -1036,7 +990,9 @@ public class Queue extends BaseDestination implements Task { } } synchronized (pagedInMessages) { - pagedInMessages.addAll(result); + for(MessageReference ref:result) { + pagedInMessages.put(ref.getMessageId(), ref); + } } } return result; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 425a8dbd58..dc4f00d55a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -46,22 +46,18 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner * * @throws IOException */ - protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { + protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { final Destination q = n.getRegionDestination(); - q.acknowledge(context, this, ack, n); - final QueueMessageReference node = (QueueMessageReference)n; final Queue queue = (Queue)q; if (!ack.isInTransaction()) { - node.drop(); - queue.dropEvent(); + queue.removeMessage(context, this, node, ack); } else { node.setAcked(true); context.getTransaction().addSynchronization(new Synchronization() { public void afterCommit() throws Exception { - node.drop(); - queue.dropEvent(); + queue.removeMessage(context, QueueSubscription.this, node, ack); } public void afterRollback() throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index ef64fb5ed7..e5daae1c6d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -326,6 +326,7 @@ public class RegionBroker implements Broker { throws Exception { ActiveMQDestination destination = info.getDestination(); if (destination != null) { + addDestination(context, destination); switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: queueRegion.addProducer(context, info); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index a916b29c21..36404777d4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -18,7 +18,6 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.LinkedList; -import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -48,14 +47,12 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.Valve; import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; @@ -69,14 +66,9 @@ import org.apache.commons.logging.LogFactory; */ public class Topic extends BaseDestination implements Task{ private static final Log LOG = LogFactory.getLog(Topic.class); - protected final ActiveMQDestination destination; + private final TopicMessageStore topicStore; protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); - protected final Valve dispatchValve = new Valve(true); - // this could be NULL! (If an advisory) - protected final TopicMessageStore store; - private final SystemUsage systemUsage; - private final MemoryUsage memoryUsage; - + protected final Valve dispatchValve = new Valve(true); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private boolean sendAdvisoryIfNoConsumers; @@ -92,16 +84,12 @@ public class Topic extends BaseDestination implements Task{ } }; }; - private final Broker broker; + public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { - this.broker = broker; - this.destination = destination; - this.store = store; // this could be NULL! (If an advisory) - this.systemUsage=systemUsage; - this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString()); - this.memoryUsage.setUsagePortion(1.0f); + super(broker, store, destination,systemUsage, parentStats); + this.topicStore=store; //set default subscription recovery policy if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){ subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy(); @@ -110,16 +98,6 @@ public class Topic extends BaseDestination implements Task{ subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy(); } this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); - // Let the store know what usage manager we are using so that he can - // flush messages to disk - // when usage gets high. - if (store != null) { - store.setMemoryUsage(memoryUsage); - } - - // let's copy the enabled property from the parent DestinationStatistics - this.destinationStatistics.setEnabled(parentStats.isEnabled()); - this.destinationStatistics.setParent(parentStats); } public boolean lock(MessageReference node, LockOwner sub) { @@ -174,8 +152,8 @@ public class Topic extends BaseDestination implements Task{ } public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException { - if (store != null) { - store.deleteSubscription(key.clientId, key.subscriptionName); + if (topicStore != null) { + topicStore.deleteSubscription(key.clientId, key.subscriptionName); Object removed = durableSubcribers.remove(key); if (removed != null) { destinationStatistics.getConsumers().decrement(); @@ -194,7 +172,7 @@ public class Topic extends BaseDestination implements Task{ consumers.add(subscription); } - if (store == null) { + if (topicStore == null) { return; } @@ -202,13 +180,13 @@ public class Topic extends BaseDestination implements Task{ String clientId = subscription.getClientId(); String subscriptionName = subscription.getSubscriptionName(); String selector = subscription.getConsumerInfo().getSelector(); - SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName); + SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); if (info != null) { // Check to see if selector changed. String s1 = info.getSelector(); if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { // Need to delete the subscription - store.deleteSubscription(clientId, subscriptionName); + topicStore.deleteSubscription(clientId, subscriptionName); info = null; } } @@ -222,13 +200,13 @@ public class Topic extends BaseDestination implements Task{ // Thi destination is an actual destination id. info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This destination might be a pattern - store.addSubsciption(info,subscription.getConsumerInfo().isRetroactive()); + topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive()); } final MessageEvaluationContext msgContext = new MessageEvaluationContext(); msgContext.setDestination(destination); if (subscription.isRecoveryRequired()) { - store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { + topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { public boolean recoverMessage(Message message) throws Exception { message.setRegionDestination(Topic.this); try { @@ -395,14 +373,14 @@ public class Topic extends BaseDestination implements Task{ .getConnectionContext(); message.setRegionDestination(this); - if (store != null && message.isPersistent() + if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { while (!systemUsage.getStoreUsage().waitForSpace(1000)) { if (context.getStopping().get()) { throw new IOException("Connection closed, send aborted."); } } - store.addMessage(context, message); + topicStore.addMessage(context, message); } message.incrementReferenceCount(); @@ -446,15 +424,15 @@ public class Topic extends BaseDestination implements Task{ } public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { - if (store != null && node.isPersistent()) { + if (topicStore != null && node.isPersistent()) { DurableTopicSubscription dsub = (DurableTopicSubscription)sub; - store.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId()); + topicStore.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId()); } } public void dispose(ConnectionContext context) throws IOException { - if (store != null) { - store.removeAllMessages(context); + if (topicStore != null) { + topicStore.removeAllMessages(context); } destinationStatistics.setParent(null); } @@ -463,7 +441,7 @@ public class Topic extends BaseDestination implements Task{ } public Message loadMessage(MessageId messageId) throws IOException { - return store != null ? store.getMessage(messageId) : null; + return topicStore != null ? topicStore.getMessage(messageId) : null; } public void start() throws Exception { @@ -487,8 +465,8 @@ public class Topic extends BaseDestination implements Task{ public Message[] browse() { final Set result = new CopyOnWriteArraySet(); try { - if (store != null) { - store.recover(new MessageRecoveryListener() { + if (topicStore != null) { + topicStore.recover(new MessageRecoveryListener() { public boolean recoverMessage(Message message) throws Exception { result.add(message); return true; @@ -527,21 +505,7 @@ public class Topic extends BaseDestination implements Task{ // Properties // ------------------------------------------------------------------------- - public MemoryUsage getBrokerMemoryUsage() { - return memoryUsage; - } - - public DestinationStatistics getDestinationStatistics() { - return destinationStatistics; - } - - public ActiveMQDestination getActiveMQDestination() { - return destination; - } - - public String getDestination() { - return destination.getPhysicalName(); - } + public DispatchPolicy getDispatchPolicy() { return dispatchPolicy; @@ -567,10 +531,6 @@ public class Topic extends BaseDestination implements Task{ this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; } - public MessageStore getMessageStore() { - return store; - } - public DeadLetterStrategy getDeadLetterStrategy() { return deadLetterStrategy; } @@ -579,10 +539,7 @@ public class Topic extends BaseDestination implements Task{ this.deadLetterStrategy = deadLetterStrategy; } - public String getName() { - return getActiveMQDestination().getPhysicalName(); - } - + // Implementation methods // ------------------------------------------------------------------------- protected void dispatch(final ConnectionContext context, Message message) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java index 29469f4d5c..79ea292d6b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java @@ -31,22 +31,24 @@ import org.apache.activemq.filter.MessageEvaluationContext; */ public class SimpleDispatchPolicy implements DispatchPolicy { - public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception { + public boolean dispatch(MessageReference node,MessageEvaluationContext msgContext, List consumers) + throws Exception { + int count = 0; - for (Iterator iter = consumers.iterator(); iter.hasNext();) { - Subscription sub = (Subscription)iter.next(); + synchronized (consumers) { + for (Subscription sub:consumers) { + // Don't deliver to browsers + if (sub.getConsumerInfo().isBrowser()) { + continue; + } + // Only dispatch to interested subscriptions + if (!sub.matches(node, msgContext)) { + continue; + } - // Don't deliver to browsers - if (sub.getConsumerInfo().isBrowser()) { - continue; + sub.add(node); + count++; } - // Only dispatch to interested subscriptions - if (!sub.matches(node, msgContext)) { - continue; - } - - sub.add(node); - count++; } return count > 0; }