git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@613230 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-18 19:16:15 +00:00
parent 671ed4916e
commit 8d11f07a96
8 changed files with 213 additions and 242 deletions

View File

@ -137,7 +137,7 @@ public class AdvisoryBroker extends BrokerFilter {
// Don't advise advisory topics. // Don't advise advisory topics.
if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
fireAdvisory(context, topic, info); fireProducerAdvisory(context, info.getDestination(), topic, info);
producers.put(info.getProducerId(), info); producers.put(info.getProducerId(), info);
} }
} }
@ -282,8 +282,7 @@ public class AdvisoryBroker extends BrokerFilter {
Set<Destination> set = getDestinations(producerDestination); Set<Destination> set = getDestinations(producerDestination);
if (set != null) { if (set != null) {
for (Destination dest : set) { for (Destination dest : set) {
count += dest.getDestinationStatistics().getConsumers() count += dest.getDestinationStatistics().getProducers().getCount();
.getCount();
} }
} }
} }

View File

@ -112,6 +112,7 @@ public class ProducerEventSource implements Service, MessageListener {
return n.intValue(); return n.intValue();
} }
LOG.warn("No producerCount header available on the message: " + message); LOG.warn("No producerCount header available on the message: " + message);
Thread.dumpStack();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e); LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e);
} }

View File

@ -16,8 +16,13 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/** /**
@ -25,11 +30,40 @@ import org.apache.activemq.command.ProducerInfo;
*/ */
public abstract class BaseDestination implements Destination { public abstract class BaseDestination implements Destination {
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
protected final SystemUsage systemUsage;
protected final MemoryUsage memoryUsage;
private boolean producerFlowControl = true; private boolean producerFlowControl = true;
private int maxProducersToAudit=1024; private int maxProducersToAudit=1024;
private int maxAuditDepth=1; private int maxAuditDepth=1;
private boolean enableAudit=true; private boolean enableAudit=true;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
/**
* @param broker
* @param store
* @param destination
* @param systemUsage
* @param parentStats
*/
public BaseDestination(Broker broker,MessageStore store,ActiveMQDestination destination, SystemUsage systemUsage,DestinationStatistics parentStats) {
this.broker=broker;
this.store=store;
this.destination=destination;
this.systemUsage=systemUsage;
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
// Let the store know what usage manager we are using so that he can
// flush messages to disk when usage gets high.
if (store != null) {
store.setMemoryUsage(this.memoryUsage);
}
// let's copy the enabled property from the parent DestinationStatistics
this.destinationStatistics.setEnabled(parentStats.isEnabled());
this.destinationStatistics.setParent(parentStats);
}
/** /**
* @return the producerFlowControl * @return the producerFlowControl
*/ */
@ -87,5 +121,30 @@ public abstract class BaseDestination implements Destination {
destinationStatistics.getProducers().decrement(); destinationStatistics.getProducers().decrement();
} }
public final MemoryUsage getBrokerMemoryUsage() {
return memoryUsage;
}
public DestinationStatistics getDestinationStatistics() {
return destinationStatistics;
}
public ActiveMQDestination getActiveMQDestination() {
return destination;
}
public final String getDestination() {
return destination.getPhysicalName();
}
public final String getName() {
return getActiveMQDestination().getPhysicalName();
}
public final MessageStore getMessageStore() {
return store;
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -58,7 +59,6 @@ import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -71,26 +71,16 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.28 $ * @version $Revision: 1.28 $
*/ */
public class Queue extends BaseDestination implements Task { public class Queue extends BaseDestination implements Task {
private static int MAXIMUM_PAGE_SIZE = 1000;
final Broker broker;
private final Log log; private final Log log;
private final ActiveMQDestination destination;
private final List<Subscription> consumers = new ArrayList<Subscription>(50); private final List<Subscription> consumers = new ArrayList<Subscription>(50);
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
private PendingMessageCursor messages; private PendingMessageCursor messages;
private final LinkedList<MessageReference> pagedInMessages = new LinkedList<MessageReference>(); private final LinkedHashMap<MessageId,MessageReference> pagedInMessages = new LinkedHashMap<MessageId,MessageReference>();
private LockOwner exclusiveOwner; private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
private int garbageSize;
private int garbageSizeBeforeCollection = 1000;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private final MessageStore store;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
private final Object exclusiveLockMutex = new Object(); private final Object exclusiveLockMutex = new Object();
private final Object sendLock = new Object(); private final Object sendLock = new Object();
private final TaskRunner taskRunner; private final TaskRunner taskRunner;
@ -107,12 +97,8 @@ public class Queue extends BaseDestination implements Task {
public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats, public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception { TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.broker = broker; super(broker, store, destination,systemUsage, parentStats);
this.destination = destination;
this.systemUsage=systemUsage;
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
this.store = store;
if (destination.isTemporary() || tmpStore==null ) { if (destination.isTemporary() || tmpStore==null ) {
this.messages = new VMPendingMessageCursor(); this.messages = new VMPendingMessageCursor();
} else { } else {
@ -120,19 +106,7 @@ public class Queue extends BaseDestination implements Task {
} }
this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName()); this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName());
// Let the store know what usage manager we are using so that he can
// flush messages to disk
// when usage gets high.
if (store != null) {
store.setMemoryUsage(memoryUsage);
}
// let's copy the enabled property from the parent DestinationStatistics
this.destinationStatistics.setEnabled(parentStats.isEnabled());
destinationStatistics.setParent(parentStats);
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
} }
public void initialize() throws Exception { public void initialize() throws Exception {
@ -204,8 +178,6 @@ public class Queue extends BaseDestination implements Task {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
sub.add(context, this); sub.add(context, this);
destinationStatistics.getConsumers().increment(); destinationStatistics.getConsumers().increment();
maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
MessageEvaluationContext msgContext = new MessageEvaluationContext(); MessageEvaluationContext msgContext = new MessageEvaluationContext();
// needs to be synchronized - so no contention with dispatching // needs to be synchronized - so no contention with dispatching
@ -239,7 +211,7 @@ public class Queue extends BaseDestination implements Task {
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the // Add all the matching messages in the queue to the
// subscription. // subscription.
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i
.hasNext();) { .hasNext();) {
QueueMessageReference node = (QueueMessageReference) i.next(); QueueMessageReference node = (QueueMessageReference) i.next();
if (node.isDropped() if (node.isDropped()
@ -263,7 +235,6 @@ public class Queue extends BaseDestination implements Task {
public void removeSubscription(ConnectionContext context, Subscription sub) public void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception { throws Exception {
destinationStatistics.getConsumers().decrement(); destinationStatistics.getConsumers().decrement();
maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
// synchronize with dispatch method so that no new messages are sent // synchronize with dispatch method so that no new messages are sent
// while // while
// removing up a subscription. // removing up a subscription.
@ -309,7 +280,7 @@ public class Queue extends BaseDestination implements Task {
// lets copy the messages to dispatch to avoid deadlock // lets copy the messages to dispatch to avoid deadlock
List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>(); List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i
.hasNext();) { .hasNext();) {
QueueMessageReference node = (QueueMessageReference) i QueueMessageReference node = (QueueMessageReference) i
.next(); .next();
@ -493,38 +464,7 @@ public class Queue extends BaseDestination implements Task {
destinationStatistics.setParent(null); destinationStatistics.setParent(null);
} }
public void dropEvent() {
dropEvent(false);
}
public void dropEvent(boolean skipGc) {
// TODO: need to also decrement when messages expire.
destinationStatistics.getMessages().decrement();
synchronized (pagedInMessages) {
garbageSize++;
}
if (!skipGc && garbageSize > garbageSizeBeforeCollection) {
gc();
}
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e);
}
}
public void gc(){ public void gc(){
synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
// Remove dropped messages from the queue.
QueueMessageReference node = (QueueMessageReference)i.next();
if (node.isDropped()) {
garbageSize--;
i.remove();
continue;
}
}
}
} }
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
@ -589,17 +529,6 @@ public class Queue extends BaseDestination implements Task {
return destination; return destination;
} }
public String getDestination() {
return destination.getPhysicalName();
}
public MemoryUsage getBrokerMemoryUsage() {
return memoryUsage;
}
public DestinationStatistics getDestinationStatistics() {
return destinationStatistics;
}
public MessageGroupMap getMessageGroupOwners() { public MessageGroupMap getMessageGroupOwners() {
if (messageGroupOwners == null) { if (messageGroupOwners == null) {
@ -632,10 +561,6 @@ public class Queue extends BaseDestination implements Task {
this.messageGroupMapFactory = messageGroupMapFactory; this.messageGroupMapFactory = messageGroupMapFactory;
} }
public String getName() {
return getActiveMQDestination().getPhysicalName();
}
public PendingMessageCursor getMessages() { public PendingMessageCursor getMessages() {
return this.messages; return this.messages;
} }
@ -652,10 +577,6 @@ public class Queue extends BaseDestination implements Task {
return result; return result;
} }
public MessageStore getMessageStore() {
return store;
}
public Message[] browse() { public Message[] browse() {
List<Message> l = new ArrayList<Message>(); List<Message> l = new ArrayList<Message>();
try { try {
@ -664,7 +585,7 @@ public class Queue extends BaseDestination implements Task {
log.error("caught an exception browsing " + this, e); log.error("caught an exception browsing " + this, e);
} }
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) { for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i.hasNext();) {
MessageReference r = i.next(); MessageReference r = i.next();
r.incrementReferenceCount(); r.incrementReferenceCount();
try { try {
@ -737,14 +658,17 @@ public class Queue extends BaseDestination implements Task {
} }
public void purge() throws Exception { public void purge() throws Exception {
pageInMessages();
synchronized (pagedInMessages) {
ConnectionContext c = createConnectionContext(); ConnectionContext c = createConnectionContext();
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) { List<MessageReference> list = null;
do {
pageInMessages();
synchronized (pagedInMessages) {
list = new ArrayList<MessageReference>(pagedInMessages.values());
}
for (MessageReference ref : list) {
try { try {
QueueMessageReference r = (QueueMessageReference)i.next(); QueueMessageReference r = (QueueMessageReference) ref;
// We should only delete messages that can be locked. // We should only delete messages that can be locked.
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
@ -752,19 +676,14 @@ public class Queue extends BaseDestination implements Task {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination); ack.setDestination(destination);
ack.setMessageID(r.getMessageId()); ack.setMessageID(r.getMessageId());
acknowledge(c, null, ack, r); removeMessage(c, null, r, ack);
r.drop();
dropEvent(true);
} }
} catch (IOException e) { } catch (IOException e) {
} }
} }
} while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
// Run gc() by hand. Had we run it in the loop it could be
// quite expensive.
gc(); gc();
} }
}
/** /**
* Removes the message matching the given messageId * Removes the message matching the given messageId
@ -799,22 +718,29 @@ public class Queue extends BaseDestination implements Task {
* @return the number of messages removed * @return the number of messages removed
*/ */
public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
ConnectionContext context = createConnectionContext();
List<MessageReference> list = null;
do {
pageInMessages(); pageInMessages();
int counter = 0;
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
ConnectionContext c = createConnectionContext(); list = new ArrayList<MessageReference>(pagedInMessages.values());
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
IndirectMessageReference r = (IndirectMessageReference)i.next();
if (filter.evaluate(c, r)) {
removeMessage(c, r);
if (++counter >= maximumMessages && maximumMessages > 0) {
break;
} }
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
removeMessage(context, r);
if (++movedCounter >= maximumMessages
&& maximumMessages > 0) {
return movedCounter;
} }
} }
count++;
} }
return counter; } while (count < this.destinationStatistics.getMessages().getCount());
return movedCounter;
} }
/** /**
@ -850,26 +776,36 @@ public class Queue extends BaseDestination implements Task {
* @return the number of messages copied * @return the number of messages copied
*/ */
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
List<MessageReference> list = null;
do {
pageInMessages(); pageInMessages();
int counter = 0;
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) { list = new ArrayList<MessageReference>(pagedInMessages.values());
MessageReference r = i.next(); }
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) { if (filter.evaluate(context, r)) {
// We should only copy messages that can be locked.
if (lockMessage(r)) {
r.incrementReferenceCount(); r.incrementReferenceCount();
try { try {
Message m = r.getMessage(); Message m = r.getMessage();
BrokerSupport.resend(context, m, dest); BrokerSupport.resend(context, m, dest);
if (++counter >= maximumMessages && maximumMessages > 0) { if (++movedCounter >= maximumMessages
break; && maximumMessages > 0) {
return movedCounter;
} }
} finally { } finally {
r.decrementReferenceCount(); r.decrementReferenceCount();
} }
} }
} }
count++;
} }
return counter; } while (count < this.destinationStatistics.getMessages().getCount());
return movedCounter;
} }
/** /**
@ -901,11 +837,16 @@ public class Queue extends BaseDestination implements Task {
* matched messages * matched messages
*/ */
public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter, ActiveMQDestination dest,int maximumMessages) throws Exception { public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter, ActiveMQDestination dest,int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
List<MessageReference> list = null;
do {
pageInMessages(); pageInMessages();
int counter = 0;
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) { list = new ArrayList<MessageReference>(pagedInMessages.values());
IndirectMessageReference r = (IndirectMessageReference)i.next(); }
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) { if (filter.evaluate(context, r)) {
// We should only move messages that can be locked. // We should only move messages that can be locked.
if (lockMessage(r)) { if (lockMessage(r)) {
@ -914,17 +855,19 @@ public class Queue extends BaseDestination implements Task {
Message m = r.getMessage(); Message m = r.getMessage();
BrokerSupport.resend(context, m, dest); BrokerSupport.resend(context, m, dest);
removeMessage(context, r); removeMessage(context, r);
if (++counter >= maximumMessages && maximumMessages > 0) { if (++movedCounter >= maximumMessages
break; && maximumMessages > 0) {
return movedCounter;
} }
} finally { } finally {
r.decrementReferenceCount(); r.decrementReferenceCount();
} }
} }
} }
count++;
} }
} } while (count < this.destinationStatistics.getMessages().getCount());
return counter; return movedCounter;
} }
/** /**
@ -937,7 +880,6 @@ public class Queue extends BaseDestination implements Task {
Runnable op = messagesWaitingForSpace.removeFirst(); Runnable op = messagesWaitingForSpace.removeFirst();
op.run(); op.run();
} }
try { try {
pageInMessages(false); pageInMessages(false);
} catch (Exception e) { } catch (Exception e) {
@ -976,9 +918,21 @@ public class Queue extends BaseDestination implements Task {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination); ack.setDestination(destination);
ack.setMessageID(r.getMessageId()); ack.setMessageID(r.getMessageId());
acknowledge(c, null, ack, r); removeMessage(c, null, r, ack);
r.drop(); }
dropEvent();
protected void removeMessage(ConnectionContext context,Subscription sub,QueueMessageReference reference,MessageAck ack) throws IOException {
reference.drop();
acknowledge(context, sub, ack, reference);
destinationStatistics.getMessages().decrement();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e);
}
} }
protected boolean lockMessage(IndirectMessageReference r) { protected boolean lockMessage(IndirectMessageReference r) {
@ -1008,7 +962,7 @@ public class Queue extends BaseDestination implements Task {
} }
private List<MessageReference> buildList(boolean force) throws Exception { private List<MessageReference> buildList(boolean force) throws Exception {
final int toPageIn = maximumPagedInMessages - pagedInMessages.size(); final int toPageIn = MAXIMUM_PAGE_SIZE - pagedInMessages.size();
List<MessageReference> result = null; List<MessageReference> result = null;
if ((force || !consumers.isEmpty()) && toPageIn > 0) { if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn); messages.setMaxBatchSize(toPageIn);
@ -1036,7 +990,9 @@ public class Queue extends BaseDestination implements Task {
} }
} }
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
pagedInMessages.addAll(result); for(MessageReference ref:result) {
pagedInMessages.put(ref.getMessageId(), ref);
}
} }
} }
return result; return result;

View File

@ -46,22 +46,18 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
* *
* @throws IOException * @throws IOException
*/ */
protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
final Destination q = n.getRegionDestination(); final Destination q = n.getRegionDestination();
q.acknowledge(context, this, ack, n);
final QueueMessageReference node = (QueueMessageReference)n; final QueueMessageReference node = (QueueMessageReference)n;
final Queue queue = (Queue)q; final Queue queue = (Queue)q;
if (!ack.isInTransaction()) { if (!ack.isInTransaction()) {
node.drop(); queue.removeMessage(context, this, node, ack);
queue.dropEvent();
} else { } else {
node.setAcked(true); node.setAcked(true);
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
node.drop(); queue.removeMessage(context, QueueSubscription.this, node, ack);
queue.dropEvent();
} }
public void afterRollback() throws Exception { public void afterRollback() throws Exception {

View File

@ -326,6 +326,7 @@ public class RegionBroker implements Broker {
throws Exception { throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
if (destination != null) { if (destination != null) {
addDestination(context, destination);
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info); queueRegion.addProducer(context, info);

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -48,14 +47,12 @@ import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve; import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -69,14 +66,9 @@ import org.apache.commons.logging.LogFactory;
*/ */
public class Topic extends BaseDestination implements Task{ public class Topic extends BaseDestination implements Task{
private static final Log LOG = LogFactory.getLog(Topic.class); private static final Log LOG = LogFactory.getLog(Topic.class);
protected final ActiveMQDestination destination; private final TopicMessageStore topicStore;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
protected final Valve dispatchValve = new Valve(true); protected final Valve dispatchValve = new Valve(true);
// this could be NULL! (If an advisory)
protected final TopicMessageStore store;
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers; private boolean sendAdvisoryIfNoConsumers;
@ -92,16 +84,12 @@ public class Topic extends BaseDestination implements Task{
} }
}; };
}; };
private final Broker broker;
public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats, public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception { TaskRunnerFactory taskFactory) throws Exception {
this.broker = broker; super(broker, store, destination,systemUsage, parentStats);
this.destination = destination; this.topicStore=store;
this.store = store; // this could be NULL! (If an advisory)
this.systemUsage=systemUsage;
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
//set default subscription recovery policy //set default subscription recovery policy
if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){ if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){
subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy(); subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy();
@ -110,16 +98,6 @@ public class Topic extends BaseDestination implements Task{
subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy(); subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy();
} }
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
// Let the store know what usage manager we are using so that he can
// flush messages to disk
// when usage gets high.
if (store != null) {
store.setMemoryUsage(memoryUsage);
}
// let's copy the enabled property from the parent DestinationStatistics
this.destinationStatistics.setEnabled(parentStats.isEnabled());
this.destinationStatistics.setParent(parentStats);
} }
public boolean lock(MessageReference node, LockOwner sub) { public boolean lock(MessageReference node, LockOwner sub) {
@ -174,8 +152,8 @@ public class Topic extends BaseDestination implements Task{
} }
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException { public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
if (store != null) { if (topicStore != null) {
store.deleteSubscription(key.clientId, key.subscriptionName); topicStore.deleteSubscription(key.clientId, key.subscriptionName);
Object removed = durableSubcribers.remove(key); Object removed = durableSubcribers.remove(key);
if (removed != null) { if (removed != null) {
destinationStatistics.getConsumers().decrement(); destinationStatistics.getConsumers().decrement();
@ -194,7 +172,7 @@ public class Topic extends BaseDestination implements Task{
consumers.add(subscription); consumers.add(subscription);
} }
if (store == null) { if (topicStore == null) {
return; return;
} }
@ -202,13 +180,13 @@ public class Topic extends BaseDestination implements Task{
String clientId = subscription.getClientId(); String clientId = subscription.getClientId();
String subscriptionName = subscription.getSubscriptionName(); String subscriptionName = subscription.getSubscriptionName();
String selector = subscription.getConsumerInfo().getSelector(); String selector = subscription.getConsumerInfo().getSelector();
SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName); SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
if (info != null) { if (info != null) {
// Check to see if selector changed. // Check to see if selector changed.
String s1 = info.getSelector(); String s1 = info.getSelector();
if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
// Need to delete the subscription // Need to delete the subscription
store.deleteSubscription(clientId, subscriptionName); topicStore.deleteSubscription(clientId, subscriptionName);
info = null; info = null;
} }
} }
@ -222,13 +200,13 @@ public class Topic extends BaseDestination implements Task{
// Thi destination is an actual destination id. // Thi destination is an actual destination id.
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
// This destination might be a pattern // This destination might be a pattern
store.addSubsciption(info,subscription.getConsumerInfo().isRetroactive()); topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
} }
final MessageEvaluationContext msgContext = new MessageEvaluationContext(); final MessageEvaluationContext msgContext = new MessageEvaluationContext();
msgContext.setDestination(destination); msgContext.setDestination(destination);
if (subscription.isRecoveryRequired()) { if (subscription.isRecoveryRequired()) {
store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception { public boolean recoverMessage(Message message) throws Exception {
message.setRegionDestination(Topic.this); message.setRegionDestination(Topic.this);
try { try {
@ -395,14 +373,14 @@ public class Topic extends BaseDestination implements Task{
.getConnectionContext(); .getConnectionContext();
message.setRegionDestination(this); message.setRegionDestination(this);
if (store != null && message.isPersistent() if (topicStore != null && message.isPersistent()
&& !canOptimizeOutPersistence()) { && !canOptimizeOutPersistence()) {
while (!systemUsage.getStoreUsage().waitForSpace(1000)) { while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
if (context.getStopping().get()) { if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted."); throw new IOException("Connection closed, send aborted.");
} }
} }
store.addMessage(context, message); topicStore.addMessage(context, message);
} }
message.incrementReferenceCount(); message.incrementReferenceCount();
@ -446,15 +424,15 @@ public class Topic extends BaseDestination implements Task{
} }
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
if (store != null && node.isPersistent()) { if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription)sub; DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
store.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId()); topicStore.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId());
} }
} }
public void dispose(ConnectionContext context) throws IOException { public void dispose(ConnectionContext context) throws IOException {
if (store != null) { if (topicStore != null) {
store.removeAllMessages(context); topicStore.removeAllMessages(context);
} }
destinationStatistics.setParent(null); destinationStatistics.setParent(null);
} }
@ -463,7 +441,7 @@ public class Topic extends BaseDestination implements Task{
} }
public Message loadMessage(MessageId messageId) throws IOException { public Message loadMessage(MessageId messageId) throws IOException {
return store != null ? store.getMessage(messageId) : null; return topicStore != null ? topicStore.getMessage(messageId) : null;
} }
public void start() throws Exception { public void start() throws Exception {
@ -487,8 +465,8 @@ public class Topic extends BaseDestination implements Task{
public Message[] browse() { public Message[] browse() {
final Set<Message> result = new CopyOnWriteArraySet<Message>(); final Set<Message> result = new CopyOnWriteArraySet<Message>();
try { try {
if (store != null) { if (topicStore != null) {
store.recover(new MessageRecoveryListener() { topicStore.recover(new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception { public boolean recoverMessage(Message message) throws Exception {
result.add(message); result.add(message);
return true; return true;
@ -527,21 +505,7 @@ public class Topic extends BaseDestination implements Task{
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
public MemoryUsage getBrokerMemoryUsage() {
return memoryUsage;
}
public DestinationStatistics getDestinationStatistics() {
return destinationStatistics;
}
public ActiveMQDestination getActiveMQDestination() {
return destination;
}
public String getDestination() {
return destination.getPhysicalName();
}
public DispatchPolicy getDispatchPolicy() { public DispatchPolicy getDispatchPolicy() {
return dispatchPolicy; return dispatchPolicy;
@ -567,10 +531,6 @@ public class Topic extends BaseDestination implements Task{
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
} }
public MessageStore getMessageStore() {
return store;
}
public DeadLetterStrategy getDeadLetterStrategy() { public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy; return deadLetterStrategy;
} }
@ -579,9 +539,6 @@ public class Topic extends BaseDestination implements Task{
this.deadLetterStrategy = deadLetterStrategy; this.deadLetterStrategy = deadLetterStrategy;
} }
public String getName() {
return getActiveMQDestination().getPhysicalName();
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------

View File

@ -31,11 +31,12 @@ import org.apache.activemq.filter.MessageEvaluationContext;
*/ */
public class SimpleDispatchPolicy implements DispatchPolicy { public class SimpleDispatchPolicy implements DispatchPolicy {
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception { public boolean dispatch(MessageReference node,MessageEvaluationContext msgContext, List<Subscription> consumers)
int count = 0; throws Exception {
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription)iter.next();
int count = 0;
synchronized (consumers) {
for (Subscription sub:consumers) {
// Don't deliver to browsers // Don't deliver to browsers
if (sub.getConsumerInfo().isBrowser()) { if (sub.getConsumerInfo().isBrowser()) {
continue; continue;
@ -48,6 +49,7 @@ public class SimpleDispatchPolicy implements DispatchPolicy {
sub.add(node); sub.add(node);
count++; count++;
} }
}
return count > 0; return count > 0;
} }