git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@614645 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-23 20:08:27 +00:00
parent 23cda2d6bb
commit 55810b3155
8 changed files with 75 additions and 43 deletions

View File

@ -99,6 +99,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
this.active = true;
this.context = context;
this.info = info;
int prefetch = info.getPrefetchSize();
if (prefetch>0) {
prefetch += prefetch/2;
}
int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
this.pending.setMaxAuditDepth(depth);
if (!keepDurableSubsActive) {
for (Iterator<Destination> iter = destinations.values()
.iterator(); iter.hasNext();) {

View File

@ -127,25 +127,29 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
public void add(MessageReference node) throws Exception {
boolean pendingEmpty = false;
synchronized(pendingLock) {
pendingEmpty = pending.isEmpty();
}
enqueueCounter++;
if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
dispatch(node);
} else {
optimizePrefetch();
synchronized(pendingLock) {
if (pending.isEmpty() && LOG.isDebugEnabled()) {
LOG.debug("Prefetch limit.");
}
pending.addMessageLast(node);
}
dispatchPending();
}
}
boolean pendingEmpty = false;
boolean dispatchPending = false;
synchronized (pendingLock) {
pendingEmpty = pending.isEmpty();
enqueueCounter++;
if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
pending.dispatched(node);
dispatch(node);
} else {
optimizePrefetch();
synchronized (pendingLock) {
if (pending.isEmpty() && LOG.isDebugEnabled()) {
LOG.debug("Prefetch limit.");
}
pending.addMessageLast(node);
dispatchPending = true;
}
}
}
if (dispatchPending) {
dispatchPending();
}
}
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
synchronized(pendingLock) {
@ -511,8 +515,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
final Message message = node.getMessage();
if (message == null) {
return false;
}
}
// Make sure we can dispatch a message.
if (canDispatch(node) && !isSlave()) {
MessageDispatch md = createMessageDispatch(node, message);
@ -520,11 +523,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
dispatched.add(node);
if(pending != null) {
synchronized(pendingLock) {
pending.dispatched(message);
}
}
} else {
prefetchExtension = Math.max(0, prefetchExtension - 1);
}

View File

@ -245,9 +245,26 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
* Mark a message as already dispatched
* @param message
*/
public void dispatched(MessageReference message) {
public void dispatched(MessageReference message) {
//add it to the audit
isDuplicate(message.getMessageId());
}
/**
* set the audit
* @param audit
*/
public void setMessageAudit(ActiveMQMessageAudit audit) {
this.audit=audit;
}
/**
* @return the audit
*/
public ActiveMQMessageAudit getMessageAudit() {
return audit;
}
protected synchronized boolean isDuplicate(MessageId messageId) {
if (!this.enableAudit || this.audit==null) {
@ -265,6 +282,4 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected synchronized boolean isStarted() {
return started;
}
}

View File

@ -19,10 +19,12 @@ package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
/**
@ -253,6 +255,18 @@ public interface PendingMessageCursor extends Service {
* @param message
*/
public void dispatched(MessageReference message);
/**
* set the audit
* @param audit
*/
public void setMessageAudit(ActiveMQMessageAudit audit);
/**
* @return the audit - could be null
*/
public ActiveMQMessageAudit getMessageAudit();
}

View File

@ -74,6 +74,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
started = true;
super.start();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.setMessageAudit(getMessageAudit());
tsp.start();
}
}

View File

@ -66,7 +66,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
}
nonPersistent.setMessageAudit(getMessageAudit());
nonPersistent.start();
persistent.setMessageAudit(getMessageAudit());
persistent.start();
pendingCount = persistent.size() + nonPersistent.size();
}

View File

@ -30,7 +30,6 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.commons.logging.Log;
@ -51,7 +50,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
private String subscriberName;
private Destination regionDestination;
private boolean batchResetNeeded = true;
private boolean storeMayHaveMoreMessages = true;
private boolean storeHasMessages = false;
private boolean started;
private final Subscription subscription;
@ -74,8 +73,8 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
if (!started) {
started = true;
super.start();
this.storeHasMessages = getStoreSize() > 0;
getSystemUsage().getMemoryUsage().addUsageListener(this);
safeFillBatch();
}
}
@ -104,14 +103,14 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public synchronized void addMessageLast(MessageReference node) throws Exception {
if (node != null) {
storeMayHaveMoreMessages=true;
storeHasMessages=true;
node.decrementReferenceCount();
}
}
public synchronized void addMessageFirst(MessageReference node) throws Exception {
if (node != null) {
storeMayHaveMoreMessages=true;
storeHasMessages=true;
node.decrementReferenceCount();
rollback(node.getMessageId());
}
@ -168,8 +167,6 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
}
batchList.put(message.getMessageId(), message);
}else {
this.storeMayHaveMoreMessages=true;
}
}
return true;
@ -208,14 +205,13 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
if (batchResetNeeded) {
this.store.resetBatching(clientId, subscriberName);
this.batchResetNeeded = false;
this.storeMayHaveMoreMessages = true;
}
while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) {
this.storeMayHaveMoreMessages = false;
while (this.batchList.isEmpty() && this.storeHasMessages) {
this.storeHasMessages = false;
this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this);
if (!this.batchList.isEmpty()) {
this.storeMayHaveMoreMessages=true;
this.storeHasMessages=true;
}
}
}
@ -240,7 +236,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
storeMayHaveMoreMessages = true;
storeHasMessages = true;
try {
fillBatch();
} catch (Exception e) {

View File

@ -39,7 +39,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
static final int NMSG = 100;
static final int NMSG = 200;
static final int MSIZE = 256000;
private static final transient Log LOG = LogFactory.getLog(JmsDurableTopicSlowReceiveTest.class);
private static final String COUNT_PROPERY_NAME = "count";