diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 18c37d5c6b..69314c75b7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -27,15 +27,19 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import javax.jms.ResourceAllocationException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -96,7 +100,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private final Object sendLock = new Object(); private ExecutorService executor; - protected final LinkedList messagesWaitingForSpace = new LinkedList(); + protected final Map messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap()); private final Object dispatchMutex = new Object(); private boolean useConsumerPriority = true; private boolean strictOrderDispatch = false; @@ -118,8 +122,72 @@ public class Queue extends BaseDestination implements Task, UsageListener { expireMessages(); } }; + private final Object iteratingMutex = new Object() {}; private static final Scheduler scheduler = Scheduler.getInstance(); + + class TimeoutMessage implements Delayed { + + Message message; + ConnectionContext context; + long trigger; + + public TimeoutMessage(Message message, ConnectionContext context, long delay) { + this.message = message; + this.context = context; + this.trigger = System.currentTimeMillis() + delay; + } + + public long getDelay(TimeUnit unit) { + long n = trigger - System.currentTimeMillis(); + return unit.convert(n, TimeUnit.MILLISECONDS); + } + + public int compareTo(Delayed delayed) { + long other = ((TimeoutMessage)delayed).trigger; + int returnValue; + if (this.trigger < other) { + returnValue = -1; + } else if (this.trigger > other) { + returnValue = 1; + } else { + returnValue = 0; + } + return returnValue; + } + + } + + DelayQueue flowControlTimeoutMessages = new DelayQueue(); + + class FlowControlTimeoutTask extends Thread { + + public void run() { + TimeoutMessage timeout; + try { + while (true) { + timeout = flowControlTimeoutMessages.take(); + if (timeout != null) { + synchronized (messagesWaitingForSpace) { + if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) { + ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding " + + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info")); + response.setCorrelationId(timeout.message.getCommandId()); + timeout.context.getConnection().dispatchAsync(response); + } + } + } + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Producer Flow Control Timeout Task is stopping"); + } + } + } + }; + + private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask(); + private static final Comparator orderedCompare = new Comparator() { @@ -401,7 +469,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } @@ -412,7 +480,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // for space. final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); synchronized (messagesWaitingForSpace) { - messagesWaitingForSpace.add(new Runnable() { + messagesWaitingForSpace.put(message.getMessageId(), new Runnable() { public void run() { try { @@ -446,6 +514,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } }); + + if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { + flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout())); + } registerCallbackForNotFullNotification(); context.setDontSendReponse(true); @@ -497,7 +569,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { + " See http://activemq.apache.org/producer-flow-control.html for more info"; if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException(logMessage); + throw new ResourceAllocationException(logMessage); } waitForSpace(context, systemUsage.getStoreUsage(), logMessage); @@ -619,6 +691,15 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (getExpireMessagesPeriod() > 0) { scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod()); } + + flowControlTimeoutTask.setName("Producer Flow Control Timeout Task"); + + // Start flow control timeout task + // Prevent trying to start it multiple times + if (!flowControlTimeoutTask.isAlive()) { + flowControlTimeoutTask.start(); + } + doPageIn(false); } @@ -631,6 +712,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { } scheduler.cancel(expireMessagesTask); + + if (flowControlTimeoutTask.isAlive()) { + flowControlTimeoutTask.interrupt(); + } if (messages != null) { messages.stop(); @@ -1077,9 +1162,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { // do early to allow dispatch of these waiting messages synchronized (messagesWaitingForSpace) { - while (!messagesWaitingForSpace.isEmpty()) { + Iterator it = messagesWaitingForSpace.values().iterator(); + while (it.hasNext()) { if (!memoryUsage.isFull()) { - Runnable op = messagesWaitingForSpace.removeFirst(); + Runnable op = it.next(); + it.remove(); op.run(); } else { registerCallbackForNotFullNotification(); @@ -1289,7 +1376,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"; if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException(logMessage); + throw new ResourceAllocationException(logMessage); } waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage); @@ -1622,18 +1709,24 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - private final void waitForSpace(ConnectionContext context, Usage usage, String warning) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - long nextWarn = start + blockedProducerWarningInterval; - while (!usage.waitForSpace(1000)) { - if (context.getStopping().get()) { - throw new IOException("Connection closed, send aborted."); + private final void waitForSpace(ConnectionContext context, Usage usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { + if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { + if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) { + throw new ResourceAllocationException(warning); } - - long now = System.currentTimeMillis(); - if (now >= nextWarn) { - LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); - nextWarn = now + blockedProducerWarningInterval; + } else { + long start = System.currentTimeMillis(); + long nextWarn = start + blockedProducerWarningInterval; + while (!usage.waitForSpace(1000)) { + if (context.getStopping().get()) { + throw new IOException("Connection closed, send aborted."); + } + + long now = System.currentTimeMillis(); + if (now >= nextWarn) { + LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); + nextWarn = now + blockedProducerWarningInterval; + } } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java b/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java index d154deb1b0..e8e27b315a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java @@ -43,6 +43,9 @@ public class SystemUsage implements Service { */ private boolean sendFailIfNoSpaceExplicitySet; private boolean sendFailIfNoSpace; + private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet; + private long sendFailIfNoSpaceAfterTimeout = 0; + private final List children = new CopyOnWriteArrayList(); public SystemUsage() { @@ -155,6 +158,19 @@ public class SystemUsage implements Service { this.sendFailIfNoSpaceExplicitySet = sendFailIfNoSpaceExplicitySet; } + public long getSendFailIfNoSpaceAfterTimeout() { + if (sendFailIfNoSpaceAfterTimeoutExplicitySet || parent == null) { + return sendFailIfNoSpaceAfterTimeout; + } else { + return parent.getSendFailIfNoSpaceAfterTimeout(); + } + } + + public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) { + this.sendFailIfNoSpaceAfterTimeoutExplicitySet = true; + this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout; + } + public void setName(String name) { this.name = name; this.memoryUsage.setName(name + ":memory"); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java new file mode 100644 index 0000000000..47b62b7e8c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java @@ -0,0 +1,91 @@ +package org.apache.activemq.bugs; + + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.log4j.Logger; + + +public class JmsTimeoutTest extends EmbeddedBrokerTestSupport { + + private final static Logger logger = Logger.getLogger( JmsTimeoutTest.class ); + + private int messageSize=1024*64; + private int messageCount=10000; + private final AtomicInteger exceptionCount = new AtomicInteger(0); + + /** + * Test the case where the broker is blocked due to a memory limit + * and a producer timeout is set on the connection. + * @throws Exception + */ + public void testBlockedProducerConnectionTimeout() throws Exception { + final ActiveMQConnection cx = (ActiveMQConnection)createConnection(); + final ActiveMQDestination queue = createDestination("testqueue"); + + // we should not take longer than 5 seconds to return from send + cx.setSendTimeout(10000); + + Runnable r = new Runnable() { + public void run() { + try { + logger.info("Sender thread starting"); + Session session = cx.createSession(false, 1); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage message = session.createTextMessage(createMessageText()); + for(int count=0; count 0); + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:61616"; + broker = createBroker(); + broker.setDeleteAllMessagesOnStartup(true); + broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024); + broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000); + super.setUp(); + } + + private String createMessageText() { + StringBuffer buffer = new StringBuffer(); + buffer.append(""); + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append('X'); + } + buffer.append(""); + return buffer.toString(); + } + + } \ No newline at end of file