git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@599129 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-11-28 20:19:27 +00:00
parent 987ad27449
commit 9dd11cb71f
29 changed files with 351 additions and 180 deletions

View File

@ -33,8 +33,8 @@ import org.apache.activemq.util.LRUCache;
*/
public class ActiveMQMessageAudit {
private static final int DEFAULT_WINDOW_SIZE = 1024;
private static final int MAXIMUM_PRODUCER_COUNT = 128;
private static final int DEFAULT_WINDOW_SIZE = 2048;
private static final int MAXIMUM_PRODUCER_COUNT = 64;
private int auditDepth;
private int maximumNumberOfProducersToTrack;
private LRUCache<Object, BitArrayBin> map;
@ -218,25 +218,35 @@ public class ActiveMQMessageAudit {
return answer;
}
/**
* Check the MessageId is in order
* @param message
* @return
*/
public synchronized boolean isInOrder(final MessageReference message) {
return isInOrder(message.getMessageId());
}
/**
* Check the MessageId is in order
* @param id
* @return
*/
public synchronized boolean isInOrder(final MessageId id) {
boolean answer = true;
boolean answer = false;
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
if (bab != null) {
answer = bab.isInOrder(id.getProducerSequenceId());
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
}
answer = bab.isInOrder(id.getProducerSequenceId());
}
}
return answer;
}
}

View File

@ -113,7 +113,7 @@ public class BrokerService implements Service {
private TaskRunnerFactory persistenceTaskRunnerFactory;
private SystemUsage systemUsage;
private SystemUsage producerSystemUsage;
private SystemUsage consumerSystemUsage;
private SystemUsage storeSystemUsage;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
private DestinationFactory destinationFactory;
@ -668,23 +668,23 @@ public class BrokerService implements Service {
* @throws IOException
*/
public SystemUsage getConsumerSystemUsage() throws IOException {
if (consumerSystemUsage == null) {
consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer");
consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
addService(consumerSystemUsage);
if (this.storeSystemUsage == null) {
this.storeSystemUsage = new SystemUsage(getSystemUsage(), "Store");
this.storeSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
addService(this.storeSystemUsage);
}
return consumerSystemUsage;
return this.storeSystemUsage;
}
/**
* @param consumerUsageManager the consumerUsageManager to set
* @param storeSystemUsage the storeSystemUsage to set
*/
public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
if (this.consumerSystemUsage != null) {
removeService(this.consumerSystemUsage);
public void setConsumerSystemUsage(SystemUsage storeSystemUsage) {
if (this.storeSystemUsage != null) {
removeService(this.storeSystemUsage);
}
this.consumerSystemUsage = consumerUsageManager;
addService(this.producerSystemUsage);
this.storeSystemUsage = storeSystemUsage;
addService(this.storeSystemUsage);
}
/**

View File

@ -56,7 +56,7 @@ public abstract class AbstractRegion implements Region {
protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
protected final DestinationMap destinationMap = new DestinationMap();
protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
protected final SystemUsage memoryManager;
protected final SystemUsage usageManager;
protected final DestinationFactory destinationFactory;
protected final DestinationStatistics destinationStatistics;
protected final RegionBroker broker;
@ -73,7 +73,7 @@ public abstract class AbstractRegion implements Region {
}
this.broker = broker;
this.destinationStatistics = destinationStatistics;
this.memoryManager = memoryManager;
this.usageManager = memoryManager;
this.taskRunnerFactory = taskRunnerFactory;
if (broker == null) {
throw new IllegalArgumentException("null destinationFactory");

View File

@ -43,14 +43,13 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private final SystemUsage usageManager;
private boolean active;
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws InvalidSelectorException {
super(broker, context, info);
super(broker,usageManager, context, info);
this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
this.usageManager = usageManager;
this.pending.setSystemUsage(usageManager);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
}
@ -191,7 +190,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return active;
}
protected synchronized void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
node.getRegionDestination().acknowledge(context, this, ack, node);
redeliveredMessages.remove(node.getMessageId());
node.decrementReferenceCount();
@ -238,7 +237,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
/**
* @param memoryManager
* @param usageManager
* @param oldPercentUsage
* @param newPercentUsage
* @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,

View File

@ -23,6 +23,7 @@ import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -38,6 +39,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -55,14 +57,20 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
protected long enqueueCounter;
protected long dispatchCounter;
protected long dequeueCounter;
protected boolean optimizedDispatch=false;
private int maxProducersToAudit=32;
private int maxAuditDepth=2048;
protected final SystemUsage usageManager;
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
public PrefetchSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker, context, info);
this.usageManager=usageManager;
pending = cursor;
}
public PrefetchSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this(broker, context, info, new VMPendingMessageCursor());
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this(broker,usageManager,context, info, new VMPendingMessageCursor());
}
/**
@ -118,8 +126,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
boolean pendingEmpty = false;
pendingEmpty = pending.isEmpty();
enqueueCounter++;
if (!isFull() && pendingEmpty && !isSlave()) {
if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
dispatch(node);
} else {
optimizePrefetch();
@ -128,6 +135,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
LOG.debug("Prefetch limit.");
}
pending.addMessageLast(node);
dispatchMatched();
}
}
}
@ -364,6 +372,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public synchronized void setPending(PendingMessageCursor pending) {
this.pending = pending;
if (this.pending!=null) {
this.pending.setSystemUsage(usageManager);
}
}
/**
@ -440,6 +451,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
dispatched.addLast(node);
if(pending != null) {
pending.dispatched(message);
}
} else {
prefetchExtension = Math.max(0, prefetchExtension - 1);
}
@ -459,8 +473,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
context.getConnection().dispatchSync(md);
onDispatch(node, message);
}
// System.err.println(broker.getBrokerName() + " " + this + " (" +
// enqueueCounter + ", " + dispatchCounter +") " + node);
return true;
} else {
return false;
@ -536,4 +548,28 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException {
}
public boolean isOptimizedDispatch() {
return optimizedDispatch;
}
public void setOptimizedDispatch(boolean optimizedDispatch) {
this.optimizedDispatch = optimizedDispatch;
}
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
}
public int getMaxAuditDepth() {
return maxAuditDepth;
}
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
}

View File

@ -96,7 +96,7 @@ public class Queue extends BaseDestination implements Task {
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
private final Object exclusiveLockMutex = new Object();
private TaskRunner taskRunner;
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {

View File

@ -25,14 +25,15 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.usage.SystemUsage;
public class QueueBrowserSubscription extends QueueSubscription {
boolean browseDone;
public QueueBrowserSubscription(Broker broker, ConnectionContext context, ConsumerInfo info)
public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
super(broker, context, info);
super(broker,usageManager, context, info);
}
protected boolean canDispatch(MessageReference node) {

View File

@ -41,15 +41,15 @@ public class QueueRegion extends AbstractRegion {
public String toString() {
return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
+ ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+ ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
if (info.isBrowser()) {
return new QueueBrowserSubscription(broker, context, info);
return new QueueBrowserSubscription(broker,usageManager, context, info);
} else {
return new QueueSubscription(broker, context, info);
return new QueueSubscription(broker, usageManager,context, info);
}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,8 +36,8 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
public QueueSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker, context, info);
public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,usageManager, context, info);
}
/**

View File

@ -41,7 +41,7 @@ public class TempQueueRegion extends AbstractRegion {
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory, null) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
@ -58,14 +58,14 @@ public class TempQueueRegion extends AbstractRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
if (info.isBrowser()) {
return new QueueBrowserSubscription(broker, context, info);
return new QueueBrowserSubscription(broker,usageManager,context, info);
} else {
return new QueueSubscription(broker, context, info);
return new QueueSubscription(broker, usageManager,context, info);
}
}
public String toString() {
return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {

View File

@ -47,13 +47,13 @@ public class TempTopicRegion extends AbstractRegion {
throw new JMSException("A durable subscription cannot be created for a temporary topic.");
}
try {
TopicSubscription answer = new TopicSubscription(broker, context, info, memoryManager);
TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(broker, memoryManager, answer);
entry.configure(broker, usageManager, answer);
}
}
answer.init();
@ -67,7 +67,7 @@ public class TempTopicRegion extends AbstractRegion {
}
public String toString() {
return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {

View File

@ -50,6 +50,8 @@ import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
@ -65,7 +67,7 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.21 $
*/
public class Topic extends BaseDestination {
public class Topic extends BaseDestination implements Task{
private static final Log LOG = LogFactory.getLog(Topic.class);
protected final ActiveMQDestination destination;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
@ -81,28 +83,20 @@ public class Topic extends BaseDestination {
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
// We may need to do this in async thread since this is run for
// within a synchronization
// that the UsageManager is holding.
synchronized (messagesWaitingForSpace) {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
try {
Topic.this.taskRunner.wakeup();
} catch (InterruptedException e) {
}
}
};
};
private final Broker broker;
public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
TaskRunnerFactory taskFactory) throws Exception {
this.broker = broker;
this.destination = destination;
this.store = store; // this could be NULL! (If an advisory)
@ -115,7 +109,8 @@ public class Topic extends BaseDestination {
}else{
//set the default
subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy();
}
}
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
// Let the store know what usage manager we are using so that he can
// flush messages to disk
// when usage gets high.
@ -463,6 +458,9 @@ public class Topic extends BaseDestination {
}
public void stop() throws Exception {
if (taskRunner != null) {
taskRunner.shutdown();
}
this.subscriptionRecoveryPolicy.stop();
if (memoryUsage != null) {
memoryUsage.stop();
@ -499,6 +497,15 @@ public class Topic extends BaseDestination {
}
return result.toArray(new Message[result.size()]);
}
public boolean iterate() {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
return false;
}
// Properties
// -------------------------------------------------------------------------

View File

@ -100,7 +100,7 @@ public class TopicRegion extends AbstractRegion {
+ " subscriberName: " + key.getSubscriptionName());
}
}
sub.activate(memoryManager, context, info);
sub.activate(usageManager, context, info);
return sub;
} else {
return super.addConsumer(context, info);
@ -140,7 +140,7 @@ public class TopicRegion extends AbstractRegion {
}
public String toString() {
return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
}
@Override
@ -230,12 +230,12 @@ public class TopicRegion extends AbstractRegion {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = durableSubscriptions.get(key);
if (sub == null) {
sub = new DurableTopicSubscription(broker, memoryManager, context, info, keepDurableSubsActive);
sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(broker, memoryManager, sub);
entry.configure(broker, usageManager, sub);
}
}
durableSubscriptions.put(key, sub);
@ -245,13 +245,13 @@ public class TopicRegion extends AbstractRegion {
return sub;
}
try {
TopicSubscription answer = new TopicSubscription(broker, context, info, memoryManager);
TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(broker, memoryManager, answer);
entry.configure(broker, usageManager, answer);
}
}
answer.init();

View File

@ -50,6 +50,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
public synchronized void stop() throws Exception {
started=false;
audit=null;
gc();
}
@ -238,6 +239,13 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
public boolean isTransient() {
return false;
}
/**
* Mark a message as already dispatched
* @param message
*/
public void dispatched(MessageReference message) {
}
protected synchronized boolean isDuplicate(MessageId messageId) {
@ -246,7 +254,12 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
}
return this.audit.isDuplicate(messageId);
}
protected synchronized void rollback(MessageId id) {
if (this.audit != null) {
audit.rollback(id);
}
}
}

View File

@ -142,6 +142,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
Message message = (Message)i.next();
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
result.add(message);
count++;
@ -210,6 +211,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
if (!isDiskListEmpty()) {
// got from disk
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
}
return message;

View File

@ -247,6 +247,12 @@ public interface PendingMessageCursor extends Service {
* disappears when the broker shuts down
*/
public boolean isTransient();
/**
* Mark a message as already dispatched
* @param message
*/
public void dispatched(MessageReference message);
}

View File

@ -119,6 +119,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
Message result = batchList.removeFirst();
result.decrementReferenceCount();
result.setRegionDestination(regionDestination);
result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
return result;
}
@ -133,6 +134,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
throws Exception {
if (!isDuplicate(message.getMessageId())) {
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
batchList.addLast(message);
} else {

View File

@ -288,6 +288,20 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
nonPersistent.setEnableAudit(enableAudit);
}
}
/**
* Mark a message as already dispatched
* @param message
*/
public void dispatched(MessageReference message) {
super.dispatched(message);
for (PendingMessageCursor cursor : storePrefetches) {
cursor.dispatched(message);
}
if (nonPersistent != null) {
nonPersistent.dispatched(message);
}
}
protected synchronized PendingMessageCursor getNextCursor() throws Exception {
if (currentCursor == null || currentCursor.isEmpty()) {

View File

@ -18,7 +18,9 @@ package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@ -28,6 +30,9 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,21 +42,19 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision$
*/
class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener {
class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
private TopicMessageStore store;
private final LinkedList<Message> batchList = new LinkedList<Message>();
private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
private String clientId;
private String subscriberName;
private Destination regionDestination;
private MessageId firstMessageId;
private MessageId lastMessageId;
private boolean batchResetNeeded = true;
private boolean storeMayHaveMoreMessages = true;
private boolean started;
private final Subscription subscription;
/**
* @param topic
* @param clientId
@ -63,12 +66,15 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
this.store = (TopicMessageStore)topic.getMessageStore();
this.clientId = clientId;
this.subscriberName = subscriberName;
this.maxProducersToAudit=32;
this.maxAuditDepth=10000;
}
public synchronized void start() throws Exception {
if (!started) {
started = true;
super.start();
getSystemUsage().getMemoryUsage().addUsageListener(this);
safeFillBatch();
}
}
@ -76,6 +82,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public synchronized void stop() throws Exception {
if (started) {
started = false;
getSystemUsage().getMemoryUsage().removeUsageListener(this);
super.stop();
store.resetBatching(clientId, subscriberName);
gc();
@ -97,22 +104,16 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public synchronized void addMessageLast(MessageReference node) throws Exception {
if (node != null) {
if (isEmpty() && started) {
firstMessageId = node.getMessageId();
}
lastMessageId = node.getMessageId();
node.decrementReferenceCount();
storeMayHaveMoreMessages=true;
node.decrementReferenceCount();
}
}
public synchronized void addMessageFirst(MessageReference node) throws Exception {
if (node != null) {
if (started) {
firstMessageId = node.getMessageId();
}
node.decrementReferenceCount();
storeMayHaveMoreMessages=true;
node.decrementReferenceCount();
rollback(node.getMessageId());
}
}
@ -127,7 +128,8 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
}
public synchronized boolean hasNext() {
return !isEmpty();
boolean result = !isEmpty();
return result;
}
public synchronized MessageReference next() {
@ -136,13 +138,11 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
if (batchList.isEmpty()) {
return null;
} else {
result = batchList.removeFirst();
if (lastMessageId != null) {
if (result.getMessageId().equals(lastMessageId)) {
// pendingCount=0;
}
}
Iterator i = batchList.entrySet().iterator();
result = (Message) ((Map.Entry)i.next()).getValue();
i.remove();
result.setRegionDestination(regionDestination);
result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
}
return result;
}
@ -154,16 +154,23 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public void finished() {
}
public synchronized boolean recoverMessage(Message message) throws Exception {
public synchronized boolean recoverMessage(Message message)
throws Exception {
MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
messageEvaluationContext.setMessageReference(message);
if( subscription.matches(message, messageEvaluationContext) ) {
if (subscription.matches(message, messageEvaluationContext)) {
message.setRegionDestination(regionDestination);
// only increment if count is zero (could have been cached)
if (message.getReferenceCount() == 0) {
message.incrementReferenceCount();
if (!isDuplicate(message.getMessageId())) {
// only increment if count is zero (could have been cached)
if (message.getReferenceCount() == 0) {
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
}
batchList.put(message.getMessageId(), message);
}else {
this.storeMayHaveMoreMessages=true;
}
batchList.addLast(message);
}
return true;
}
@ -172,9 +179,23 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
// shouldn't get called
throw new RuntimeException("Not supported");
}
/**
* Mark a message as already dispatched
* @param message
*/
public synchronized void dispatched(MessageReference message) {
if (this.audit != null) {
isDuplicate(message.getMessageId());
Message removed = this.batchList.remove(message.getMessageId());
if (removed != null) {
removed.decrementReferenceCount();
}
}
}
// implementation
protected void safeFillBatch() {
protected synchronized void safeFillBatch() {
try {
fillBatch();
} catch (Exception e) {
@ -184,29 +205,17 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
}
protected synchronized void fillBatch() throws Exception {
if( batchResetNeeded ) {
store.resetBatching(clientId, subscriberName);
batchResetNeeded=false;
storeMayHaveMoreMessages=true;
if (batchResetNeeded) {
this.store.resetBatching(clientId, subscriberName);
this.batchResetNeeded = false;
this.storeMayHaveMoreMessages = true;
}
while( batchList.isEmpty() && storeMayHaveMoreMessages ) {
store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this);
if( batchList.isEmpty() ) {
storeMayHaveMoreMessages = false;
} else {
if (firstMessageId != null) {
int pos = 0;
for (Iterator<Message> iter = batchList.iterator(); iter.hasNext();) {
Message msg = iter.next();
if (msg.getMessageId().equals(firstMessageId)) {
firstMessageId = null;
break;
} else {
iter.remove();
}
}
}
while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) {
this.storeMayHaveMoreMessages = false;
this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this);
if (!this.batchList.isEmpty()) {
this.storeMayHaveMoreMessages=true;
}
}
}
@ -221,12 +230,23 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
}
public synchronized void gc() {
for (Message msg : batchList) {
for (Message msg : batchList.values()) {
msg.decrementReferenceCount();
}
batchList.clear();
batchResetNeeded = true;
}
public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
storeMayHaveMoreMessages = true;
try {
fillBatch();
} catch (Exception e) {
LOG.error("Failed to fill batch ", e);
}
}
}
public String toString() {
return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";

View File

@ -51,11 +51,13 @@ public class PolicyEntry extends DestinationMapEntry {
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
private int maxProducersToAudit=1024;
private int maxAuditDepth=1;
private int maxProducersToAudit=32;
private int maxAuditDepth=1024;
private int maxQueueAuditDepth=1;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
public void configure(Queue queue, Store tmpStore) {
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
@ -73,7 +75,7 @@ public class PolicyEntry extends DestinationMapEntry {
}
queue.setProducerFlowControl(isProducerFlowControl());
queue.setEnableAudit(isEnableAudit());
queue.setMaxAuditDepth(getMaxAuditDepth());
queue.setMaxAuditDepth(getMaxQueueAuditDepth());
queue.setMaxProducersToAudit(getMaxProducersToAudit());
}
@ -132,6 +134,8 @@ public class PolicyEntry extends DestinationMapEntry {
cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}
sub.setMaxAuditDepth(getMaxAuditDepth());
sub.setMaxProducersToAudit(getMaxProducersToAudit());
}
// Properties
@ -331,4 +335,20 @@ public class PolicyEntry extends DestinationMapEntry {
this.enableAudit = enableAudit;
}
public int getMaxQueueAuditDepth() {
return maxQueueAuditDepth;
}
public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
this.maxQueueAuditDepth = maxQueueAuditDepth;
}
public boolean isOptimizedDispatch() {
return optimizedDispatch;
}
public void setOptimizedDispatch(boolean optimizedDispatch) {
this.optimizedDispatch = optimizedDispatch;
}
}

View File

@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@ -81,6 +82,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
private transient short referenceCount;
private transient ActiveMQConnection connection;
private transient org.apache.activemq.broker.region.Destination regionDestination;
private transient MemoryUsage memoryUsage;
private BrokerId[] brokerPath;
private BrokerId[] cluster;
@ -127,6 +129,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
copy.regionDestination = regionDestination;
copy.brokerInTime = brokerInTime;
copy.brokerOutTime = brokerOutTime;
copy.memoryUsage=this.memoryUsage;
// copying the broker path breaks networks - if a consumer re-uses a
// consumed
// message and forwards it on
@ -567,6 +570,17 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
this.regionDestination = destination;
if(this.memoryUsage==null) {
this.memoryUsage=regionDestination.getBrokerMemoryUsage();
}
}
public MemoryUsage getMemoryUsage() {
return this.memoryUsage;
}
public void setMemoryUsage(MemoryUsage usage) {
this.memoryUsage=usage;
}
public boolean isMarshallAware() {
@ -581,16 +595,15 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
size = getSize();
}
if (rc == 1 && regionDestination != null) {
regionDestination.getBrokerMemoryUsage().increaseUsage(size);
if (rc == 1 && getMemoryUsage() != null) {
getMemoryUsage().increaseUsage(size);
}
// System.out.println(" + "+getDestination()+" :::: "+getMessageId()+"
// "+rc);
//System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
return rc;
}
public synchronized int decrementReferenceCount() {
public int decrementReferenceCount() {
int rc;
int size;
synchronized (this) {
@ -598,11 +611,10 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
size = getSize();
}
if (rc == 0 && regionDestination != null) {
regionDestination.getBrokerMemoryUsage().decreaseUsage(size);
if (rc == 0 && getMemoryUsage() != null) {
getMemoryUsage().decreaseUsage(size);
}
// System.out.println(" - "+getDestination()+" :::: "+getMessageId()+"
// "+rc);
//System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
return rc;
}

View File

@ -59,13 +59,15 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
}
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, final MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(String clientId, String subscriptionName,
int maxReturned, final MessageRecoveryListener listener)
throws Exception {
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
if (recoveryListener.size() == 0) {
flush();
topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
}
topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener);
if (recoveryListener.size() == 0) {
flush();
topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener);
}
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
@ -145,14 +147,18 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
* @param key
* @throws IOException
*/
protected void acknowledge(ConnectionContext context,MessageId messageId, Location location, String clientId,String subscriptionName) throws IOException {
protected void acknowledge(ConnectionContext context, MessageId messageId,
Location location, String clientId, String subscriptionName)
throws IOException {
synchronized (this) {
lastLocation = location;
if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)){
MessageAck ack = new MessageAck();
ack.setLastMessageId(messageId);
removeMessage(context, ack);
}
}
if (topicReferenceStore.acknowledgeReference(context, clientId,
subscriptionName, messageId)) {
MessageAck ack = new MessageAck();
ack.setLastMessageId(messageId);
removeMessage(context, ack);
}
try {
asyncWriteTask.wakeup();

View File

@ -63,10 +63,14 @@ public class KahaReferenceStore implements ReferenceStore {
throw new RuntimeException("Use addMessageReference instead");
}
protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record)
throws Exception {
listener.recoverMessageReference(new MessageId(record.getMessageId()));
return listener.hasSpace();
protected final boolean recoverReference(MessageRecoveryListener listener,
ReferenceRecord record) throws Exception {
MessageId id = new MessageId(record.getMessageId());
if (listener.hasSpace()) {
listener.recoverMessageReference(id);
return true;
}
return false;
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception {
@ -90,14 +94,15 @@ public class KahaReferenceStore implements ReferenceStore {
entry = messageContainer.getNext(entry);
}
}
if (entry != null) {
if (entry != null) {
int count = 0;
do {
ReferenceRecord msg = messageContainer.getValue(entry);
if (msg != null) {
recoverReference(listener, msg);
count++;
lastBatchId = msg.getMessageId();
if (msg != null ) {
if ( recoverReference(listener, msg)) {
count++;
lastBatchId = msg.getMessageId();
}
} else {
lastBatchId = null;
}
@ -134,7 +139,7 @@ public class KahaReferenceStore implements ReferenceStore {
removeMessage(ack.getLastMessageId());
}
public synchronized void removeMessage(MessageId msgId) throws IOException {
public synchronized void removeMessage(MessageId msgId) throws IOException {
StoreEntry entry = messageContainer.getEntry(msgId);
if (entry != null) {
ReferenceRecord rr = messageContainer.remove(msgId);

View File

@ -245,15 +245,19 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
entry = container.getNextEntry(entry);
}
}
if (entry != null) {
do {
ConsumerMessageRef consumerRef = container.get(entry);
ReferenceRecord msg = messageContainer.getValue(consumerRef.getMessageEntry());
ReferenceRecord msg = messageContainer.getValue(consumerRef
.getMessageEntry());
if (msg != null) {
recoverReference(listener, msg);
count++;
container.setBatchEntry(msg.getMessageId(), entry);
if (recoverReference(listener, msg)) {
count++;
container.setBatchEntry(msg.getMessageId(), entry);
} else {
break;
}
} else {
container.reset();
}

View File

@ -67,18 +67,21 @@ public class TopicSubContainer {
if (!listContainer.isEmpty()) {
StoreEntry entry = listContainer.getFirst();
while (entry != null) {
ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
listContainer.remove(entry);
if (listContainer != null && batchEntry != null && (listContainer.isEmpty() || batchEntry.equals(entry))) {
reset();
}
ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
if (ref != null && ref.getMessageId().equals(id)) {
result = ref;
listContainer.remove(entry);
if (batchEntry != null && batchEntry.equals(entry)) {
reset();
}
break;
}
entry = listContainer.getFirst();
entry = listContainer.getNext(entry);
}
}
if (listContainer != null && (listContainer.isEmpty() )) {
reset();
}
return result;
}

View File

@ -118,15 +118,15 @@ public class MemoryUsage extends Usage<MemoryUsage> {
if (value == 0) {
return;
}
if (parent != null) {
((MemoryUsage)parent).increaseUsage(value);
}
int percentUsage;
synchronized (usageMutex) {
usage += value;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
if (parent != null) {
((MemoryUsage)parent).increaseUsage(value);
}
}
/**
@ -138,15 +138,15 @@ public class MemoryUsage extends Usage<MemoryUsage> {
if (value == 0) {
return;
}
if (parent != null) {
parent.decreaseUsage(value);
}
int percentUsage;
synchronized (usageMutex) {
usage -= value;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
if (parent != null) {
parent.decreaseUsage(value);
}
}
protected long retrieveUsage() {

View File

@ -30,6 +30,7 @@ public class BitArrayBin {
private int firstIndex = -1;
private int firstBin = -1;
private long lastBitSet=-1;
private long lastInOrderBit=-1;
/**
* Create a BitArrayBin to a certain window size (number of messages to
@ -76,10 +77,15 @@ public class BitArrayBin {
* @return true if next message is in order
*/
public boolean isInOrder(long index) {
if (lastBitSet== -1) {
return true;
boolean result = false;
if (lastInOrderBit == -1) {
result = true;
} else {
result = lastInOrderBit + 1 == index;
}
return lastBitSet+1==index;
lastInOrderBit = index;
return result;
}
/**

View File

@ -105,6 +105,7 @@ public class ActiveMQMessageAuditTest extends TestCase {
String id = idGen.generateId();
if (i==0) {
assertFalse(audit.isDuplicate(id));
assertTrue(audit.isInOrder(id));
}
if (i > 1 && i%2 != 0) {
list.add(id);

View File

@ -63,6 +63,8 @@ public class DurableConsumerTest extends TestCase {
MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
consumerConnection.start();
consumerConnection.close();
broker.stop();
broker =createBroker();
Connection producerConnection = factory.createConnection();
@ -79,7 +81,8 @@ public class DurableConsumerTest extends TestCase {
}
}
producerConnection.close();
broker.stop();
broker =createBroker();
consumerConnection = factory.createConnection();
consumerConnection.setClientID(CONSUMER_NAME);