mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2507 - producer flow control timeout
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@904160 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
87cc36a359
commit
83128fc45b
|
@ -27,15 +27,19 @@ import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CopyOnWriteArraySet;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.DelayQueue;
|
||||||
|
import java.util.concurrent.Delayed;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import javax.jms.InvalidSelectorException;
|
import javax.jms.InvalidSelectorException;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.ResourceAllocationException;
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
@ -96,7 +100,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
|
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
|
||||||
private final Object sendLock = new Object();
|
private final Object sendLock = new Object();
|
||||||
private ExecutorService executor;
|
private ExecutorService executor;
|
||||||
protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
|
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
|
||||||
private final Object dispatchMutex = new Object();
|
private final Object dispatchMutex = new Object();
|
||||||
private boolean useConsumerPriority = true;
|
private boolean useConsumerPriority = true;
|
||||||
private boolean strictOrderDispatch = false;
|
private boolean strictOrderDispatch = false;
|
||||||
|
@ -118,8 +122,72 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
expireMessages();
|
expireMessages();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private final Object iteratingMutex = new Object() {};
|
private final Object iteratingMutex = new Object() {};
|
||||||
private static final Scheduler scheduler = Scheduler.getInstance();
|
private static final Scheduler scheduler = Scheduler.getInstance();
|
||||||
|
|
||||||
|
class TimeoutMessage implements Delayed {
|
||||||
|
|
||||||
|
Message message;
|
||||||
|
ConnectionContext context;
|
||||||
|
long trigger;
|
||||||
|
|
||||||
|
public TimeoutMessage(Message message, ConnectionContext context, long delay) {
|
||||||
|
this.message = message;
|
||||||
|
this.context = context;
|
||||||
|
this.trigger = System.currentTimeMillis() + delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDelay(TimeUnit unit) {
|
||||||
|
long n = trigger - System.currentTimeMillis();
|
||||||
|
return unit.convert(n, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int compareTo(Delayed delayed) {
|
||||||
|
long other = ((TimeoutMessage)delayed).trigger;
|
||||||
|
int returnValue;
|
||||||
|
if (this.trigger < other) {
|
||||||
|
returnValue = -1;
|
||||||
|
} else if (this.trigger > other) {
|
||||||
|
returnValue = 1;
|
||||||
|
} else {
|
||||||
|
returnValue = 0;
|
||||||
|
}
|
||||||
|
return returnValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
|
||||||
|
|
||||||
|
class FlowControlTimeoutTask extends Thread {
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
TimeoutMessage timeout;
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
timeout = flowControlTimeoutMessages.take();
|
||||||
|
if (timeout != null) {
|
||||||
|
synchronized (messagesWaitingForSpace) {
|
||||||
|
if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
|
||||||
|
ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
|
||||||
|
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
|
||||||
|
response.setCorrelationId(timeout.message.getCommandId());
|
||||||
|
timeout.context.getConnection().dispatchAsync(response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Producer Flow Control Timeout Task is stopping");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
|
||||||
|
|
||||||
|
|
||||||
private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
|
private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
|
||||||
|
|
||||||
|
@ -401,7 +469,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (systemUsage.isSendFailIfNoSpace()) {
|
if (systemUsage.isSendFailIfNoSpace()) {
|
||||||
throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
|
throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
|
||||||
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
|
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -412,7 +480,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
// for space.
|
// for space.
|
||||||
final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
|
final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
|
||||||
synchronized (messagesWaitingForSpace) {
|
synchronized (messagesWaitingForSpace) {
|
||||||
messagesWaitingForSpace.add(new Runnable() {
|
messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -446,6 +514,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
|
||||||
|
flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
|
||||||
|
}
|
||||||
|
|
||||||
registerCallbackForNotFullNotification();
|
registerCallbackForNotFullNotification();
|
||||||
context.setDontSendReponse(true);
|
context.setDontSendReponse(true);
|
||||||
|
@ -497,7 +569,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
|
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
|
||||||
|
|
||||||
if (systemUsage.isSendFailIfNoSpace()) {
|
if (systemUsage.isSendFailIfNoSpace()) {
|
||||||
throw new javax.jms.ResourceAllocationException(logMessage);
|
throw new ResourceAllocationException(logMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
|
waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
|
||||||
|
@ -619,6 +691,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
if (getExpireMessagesPeriod() > 0) {
|
if (getExpireMessagesPeriod() > 0) {
|
||||||
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
|
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
|
||||||
|
|
||||||
|
// Start flow control timeout task
|
||||||
|
// Prevent trying to start it multiple times
|
||||||
|
if (!flowControlTimeoutTask.isAlive()) {
|
||||||
|
flowControlTimeoutTask.start();
|
||||||
|
}
|
||||||
|
|
||||||
doPageIn(false);
|
doPageIn(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -631,6 +712,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
scheduler.cancel(expireMessagesTask);
|
scheduler.cancel(expireMessagesTask);
|
||||||
|
|
||||||
|
if (flowControlTimeoutTask.isAlive()) {
|
||||||
|
flowControlTimeoutTask.interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
if (messages != null) {
|
if (messages != null) {
|
||||||
messages.stop();
|
messages.stop();
|
||||||
|
@ -1077,9 +1162,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
|
|
||||||
// do early to allow dispatch of these waiting messages
|
// do early to allow dispatch of these waiting messages
|
||||||
synchronized (messagesWaitingForSpace) {
|
synchronized (messagesWaitingForSpace) {
|
||||||
while (!messagesWaitingForSpace.isEmpty()) {
|
Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
if (!memoryUsage.isFull()) {
|
if (!memoryUsage.isFull()) {
|
||||||
Runnable op = messagesWaitingForSpace.removeFirst();
|
Runnable op = it.next();
|
||||||
|
it.remove();
|
||||||
op.run();
|
op.run();
|
||||||
} else {
|
} else {
|
||||||
registerCallbackForNotFullNotification();
|
registerCallbackForNotFullNotification();
|
||||||
|
@ -1289,7 +1376,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
|
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
|
||||||
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
|
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
|
||||||
if (systemUsage.isSendFailIfNoSpace()) {
|
if (systemUsage.isSendFailIfNoSpace()) {
|
||||||
throw new javax.jms.ResourceAllocationException(logMessage);
|
throw new ResourceAllocationException(logMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
|
waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
|
||||||
|
@ -1622,18 +1709,24 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
|
private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
|
||||||
long start = System.currentTimeMillis();
|
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
|
||||||
long nextWarn = start + blockedProducerWarningInterval;
|
if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) {
|
||||||
while (!usage.waitForSpace(1000)) {
|
throw new ResourceAllocationException(warning);
|
||||||
if (context.getStopping().get()) {
|
|
||||||
throw new IOException("Connection closed, send aborted.");
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
long now = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
if (now >= nextWarn) {
|
long nextWarn = start + blockedProducerWarningInterval;
|
||||||
LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
|
while (!usage.waitForSpace(1000)) {
|
||||||
nextWarn = now + blockedProducerWarningInterval;
|
if (context.getStopping().get()) {
|
||||||
|
throw new IOException("Connection closed, send aborted.");
|
||||||
|
}
|
||||||
|
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now >= nextWarn) {
|
||||||
|
LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
|
||||||
|
nextWarn = now + blockedProducerWarningInterval;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,9 @@ public class SystemUsage implements Service {
|
||||||
*/
|
*/
|
||||||
private boolean sendFailIfNoSpaceExplicitySet;
|
private boolean sendFailIfNoSpaceExplicitySet;
|
||||||
private boolean sendFailIfNoSpace;
|
private boolean sendFailIfNoSpace;
|
||||||
|
private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet;
|
||||||
|
private long sendFailIfNoSpaceAfterTimeout = 0;
|
||||||
|
|
||||||
private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
|
private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
|
||||||
|
|
||||||
public SystemUsage() {
|
public SystemUsage() {
|
||||||
|
@ -155,6 +158,19 @@ public class SystemUsage implements Service {
|
||||||
this.sendFailIfNoSpaceExplicitySet = sendFailIfNoSpaceExplicitySet;
|
this.sendFailIfNoSpaceExplicitySet = sendFailIfNoSpaceExplicitySet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getSendFailIfNoSpaceAfterTimeout() {
|
||||||
|
if (sendFailIfNoSpaceAfterTimeoutExplicitySet || parent == null) {
|
||||||
|
return sendFailIfNoSpaceAfterTimeout;
|
||||||
|
} else {
|
||||||
|
return parent.getSendFailIfNoSpaceAfterTimeout();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) {
|
||||||
|
this.sendFailIfNoSpaceAfterTimeoutExplicitySet = true;
|
||||||
|
this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
public void setName(String name) {
|
public void setName(String name) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.memoryUsage.setName(name + ":memory");
|
this.memoryUsage.setName(name + ":memory");
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.ExceptionListener;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.ResourceAllocationException;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
|
||||||
|
public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
|
||||||
|
|
||||||
|
private final static Logger logger = Logger.getLogger( JmsTimeoutTest.class );
|
||||||
|
|
||||||
|
private int messageSize=1024*64;
|
||||||
|
private int messageCount=10000;
|
||||||
|
private final AtomicInteger exceptionCount = new AtomicInteger(0);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the case where the broker is blocked due to a memory limit
|
||||||
|
* and a producer timeout is set on the connection.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void testBlockedProducerConnectionTimeout() throws Exception {
|
||||||
|
final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
|
||||||
|
final ActiveMQDestination queue = createDestination("testqueue");
|
||||||
|
|
||||||
|
// we should not take longer than 5 seconds to return from send
|
||||||
|
cx.setSendTimeout(10000);
|
||||||
|
|
||||||
|
Runnable r = new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
logger.info("Sender thread starting");
|
||||||
|
Session session = cx.createSession(false, 1);
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
TextMessage message = session.createTextMessage(createMessageText());
|
||||||
|
for(int count=0; count<messageCount; count++){
|
||||||
|
producer.send(message);
|
||||||
|
// Currently after the timeout producer just
|
||||||
|
// returns but there is no way to know that
|
||||||
|
// the send timed out
|
||||||
|
}
|
||||||
|
logger.info("Done sending..");
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
exceptionCount.incrementAndGet();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
cx.start();
|
||||||
|
Thread producerThread = new Thread(r);
|
||||||
|
producerThread.start();
|
||||||
|
producerThread.join(30000);
|
||||||
|
cx.close();
|
||||||
|
// We should have a few timeout exceptions as memory store will fill up
|
||||||
|
assertTrue(exceptionCount.get() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
bindAddress = "tcp://localhost:61616";
|
||||||
|
broker = createBroker();
|
||||||
|
broker.setDeleteAllMessagesOnStartup(true);
|
||||||
|
broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
|
||||||
|
broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createMessageText() {
|
||||||
|
StringBuffer buffer = new StringBuffer();
|
||||||
|
buffer.append("<filler>");
|
||||||
|
for (int i = buffer.length(); i < messageSize; i++) {
|
||||||
|
buffer.append('X');
|
||||||
|
}
|
||||||
|
buffer.append("</filler>");
|
||||||
|
return buffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue