git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@955504 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-06-17 08:24:26 +00:00
parent 1999d948bb
commit f42c91f43d
4 changed files with 66 additions and 26 deletions

View File

@ -19,9 +19,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@ -67,6 +65,7 @@ public class TopicSubscription extends AbstractSubscription {
protected int maxAuditDepth = 1000;
protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit;
protected boolean active = false;
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
@ -86,6 +85,7 @@ public class TopicSubscription extends AbstractSubscription {
if (enableAudit) {
audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
}
this.active=true;
}
public void add(MessageReference node) throws Exception {
@ -108,21 +108,33 @@ public class TopicSubscription extends AbstractSubscription {
}
if (maximumPendingMessages != 0) {
boolean warnedAboutWait = false;
synchronized(matchedListMutex){
while (matched.isFull()){
if (getContext().getStopping().get()) {
LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: " + node.getMessageId());
enqueueCounter.decrementAndGet();
return;
}
if (!warnedAboutWait) {
LOG.info(toString() + ": Pending message cursor ["+ matched + "] is full, temp usage (" + + matched.getSystemUsage().getTempUsage().getPercentUsage() + "%) or memory usage (" + matched.getSystemUsage().getMemoryUsage().getPercentUsage() + "%) limit reached, blocking message add() pending the release of resources.");
warnedAboutWait = true;
}
matchedListMutex.wait(20);
}
matched.addMessageLast(node);
}
while (active) {
synchronized (matchedListMutex) {
while (matched.isFull()) {
if (getContext().getStopping().get()) {
LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
+ node.getMessageId());
enqueueCounter.decrementAndGet();
return;
}
if (!warnedAboutWait) {
LOG.info(toString() + ": Pending message cursor [" + matched
+ "] is full, temp usage ("
+ +matched.getSystemUsage().getTempUsage().getPercentUsage()
+ "%) or memory usage ("
+ matched.getSystemUsage().getMemoryUsage().getPercentUsage()
+ "%) limit reached, blocking message add() pending the release of resources.");
warnedAboutWait = true;
}
matchedListMutex.wait(20);
}
//Temporary storage could be full - so just try to add the message
//see https://issues.apache.org/activemq/browse/AMQ-2475
if (matched.tryAddMessageLast(node, 10)) {
break;
}
}
}
synchronized (matchedListMutex) {
// NOTE - be careful about the slaveBroker!
@ -239,6 +251,7 @@ public class TopicSubscription extends AbstractSubscription {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
synchronized (TopicSubscription.this) {
if (singleDestination && destination != null) {
@ -456,7 +469,7 @@ public class TopicSubscription extends AbstractSubscription {
matched.reset();
while (matched.hasNext() && !isFull()) {
MessageReference message = (MessageReference) matched.next();
MessageReference message = matched.next();
message.decrementReferenceCount();
matched.remove();
// Message may have been sitting in the matched list a
@ -530,12 +543,14 @@ public class TopicSubscription extends AbstractSubscription {
broker.getRoot().sendToDeadLetterQueue(getContext(), message);
}
@Override
public String toString() {
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
}
public void destroy() {
this.active=false;
synchronized (matchedListMutex) {
try {
matched.destroy();
@ -546,8 +561,9 @@ public class TopicSubscription extends AbstractSubscription {
setSlowConsumer(false);
}
@Override
public int getPrefetchSize() {
return (int)info.getPrefetchSize();
return info.getPrefetchSize();
}
}

View File

@ -33,7 +33,7 @@ import org.apache.activemq.usage.SystemUsage;
*
* @version $Revision$
*/
public class AbstractPendingMessageCursor implements PendingMessageCursor {
public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int memoryUsageHighWaterMark = 70;
protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
protected SystemUsage systemUsage;
@ -76,6 +76,11 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
public void addMessageLast(MessageReference node) throws Exception {
}
public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
addMessageLast(node);
return true;
}
public void addRecoveredMessage(MessageReference node) throws Exception {
addMessageLast(node);

View File

@ -183,9 +183,14 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* add message to await dispatch
*
* @param node
* @throws Exception
*/
@Override
public synchronized void addMessageLast(MessageReference node) {
public synchronized void addMessageLast(MessageReference node) throws Exception {
tryAddMessageLast(node, 0);
}
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
if (!node.isExpired()) {
try {
regionDestination = node.getMessage().getRegionDestination();
@ -193,7 +198,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
if (hasSpace() || this.store == null) {
memoryList.add(node);
node.incrementReferenceCount();
return;
return true;
}
}
if (!hasSpace()) {
@ -202,15 +207,18 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
if (hasSpace()) {
memoryList.add(node);
node.incrementReferenceCount();
return;
return true;
} else {
flushToDisk();
}
}
}
systemUsage.getTempUsage().waitForSpace();
ByteSequence bs = getByteSequence(node.getMessage());
getDiskList().addLast(node.getMessageId().toString(), bs);
if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
ByteSequence bs = getByteSequence(node.getMessage());
getDiskList().addLast(node.getMessageId().toString(), bs);
return true;
}
return false;
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
@ -219,6 +227,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} else {
discard(node);
}
return false;
}
/**

View File

@ -85,6 +85,16 @@ public interface PendingMessageCursor extends Service {
* @throws Exception
*/
void addMessageLast(MessageReference node) throws Exception;
/**
* add message to await dispatch - if it can
*
* @param node
* @param maxWaitTime
* @return true if successful
* @throws IOException
* @throws Exception
*/
boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception;
/**
* add message to await dispatch