merging 904160 - https://issues.apache.org/activemq/browse/AMQ-2507 - producer flow control timeout

git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@904171 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-01-28 17:22:27 +00:00
parent 60a797c4b1
commit 0cb2f403b7
3 changed files with 219 additions and 18 deletions

View File

@ -27,15 +27,19 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@ -96,7 +100,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object sendLock = new Object();
private ExecutorService executor;
protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
@ -118,9 +122,73 @@ public class Queue extends BaseDestination implements Task, UsageListener {
expireMessages();
}
};
private final Object iteratingMutex = new Object() {};
private static final Scheduler scheduler = Scheduler.getInstance();
class TimeoutMessage implements Delayed {
Message message;
ConnectionContext context;
long trigger;
public TimeoutMessage(Message message, ConnectionContext context, long delay) {
this.message = message;
this.context = context;
this.trigger = System.currentTimeMillis() + delay;
}
public long getDelay(TimeUnit unit) {
long n = trigger - System.currentTimeMillis();
return unit.convert(n, TimeUnit.MILLISECONDS);
}
public int compareTo(Delayed delayed) {
long other = ((TimeoutMessage)delayed).trigger;
int returnValue;
if (this.trigger < other) {
returnValue = -1;
} else if (this.trigger > other) {
returnValue = 1;
} else {
returnValue = 0;
}
return returnValue;
}
}
DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
class FlowControlTimeoutTask extends Thread {
public void run() {
TimeoutMessage timeout;
try {
while (true) {
timeout = flowControlTimeoutMessages.take();
if (timeout != null) {
synchronized (messagesWaitingForSpace) {
if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
response.setCorrelationId(timeout.message.getCommandId());
timeout.context.getConnection().dispatchAsync(response);
}
}
}
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Producer Flow Control Timeout Task is stopping");
}
}
}
};
private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
public int compare(Subscription s1, Subscription s2) {
@ -401,7 +469,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
@ -412,7 +480,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// for space.
final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
public void run() {
try {
@ -447,6 +515,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
});
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
}
registerCallbackForNotFullNotification();
context.setDontSendReponse(true);
return;
@ -497,7 +569,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
throw new ResourceAllocationException(logMessage);
}
waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
@ -616,6 +688,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (getExpireMessagesPeriod() > 0) {
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
// Start flow control timeout task
// Prevent trying to start it multiple times
if (!flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.start();
}
doPageIn(false);
}
@ -629,6 +710,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
scheduler.cancel(expireMessagesTask);
if (flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.interrupt();
}
if (messages != null) {
messages.stop();
}
@ -1074,9 +1159,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// do early to allow dispatch of these waiting messages
synchronized (messagesWaitingForSpace) {
while (!messagesWaitingForSpace.isEmpty()) {
Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
while (it.hasNext()) {
if (!memoryUsage.isFull()) {
Runnable op = messagesWaitingForSpace.removeFirst();
Runnable op = it.next();
it.remove();
op.run();
} else {
registerCallbackForNotFullNotification();
@ -1286,7 +1373,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
throw new ResourceAllocationException(logMessage);
}
waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
@ -1619,7 +1706,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) {
throw new ResourceAllocationException(warning);
}
} else {
long start = System.currentTimeMillis();
long nextWarn = start + blockedProducerWarningInterval;
while (!usage.waitForSpace(1000)) {
@ -1634,4 +1726,5 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
}
}
}

View File

@ -43,6 +43,10 @@ public class SystemUsage implements Service {
*/
private boolean sendFailIfNoSpaceExplicitySet;
private boolean sendFailIfNoSpace;
private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet;
private long sendFailIfNoSpaceAfterTimeout = 0;
private List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
public SystemUsage() {
@ -154,6 +158,19 @@ public class SystemUsage implements Service {
this.sendFailIfNoSpaceExplicitySet = sendFailIfNoSpaceExplicitySet;
}
public long getSendFailIfNoSpaceAfterTimeout() {
if (sendFailIfNoSpaceAfterTimeoutExplicitySet || parent == null) {
return sendFailIfNoSpaceAfterTimeout;
} else {
return parent.getSendFailIfNoSpaceAfterTimeout();
}
}
public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) {
this.sendFailIfNoSpaceAfterTimeoutExplicitySet = true;
this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout;
}
public void setName(String name) {
this.name = name;
this.memoryUsage.setName(name + ":memory");

View File

@ -0,0 +1,91 @@
package org.apache.activemq.bugs;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.log4j.Logger;
public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
private final static Logger logger = Logger.getLogger( JmsTimeoutTest.class );
private int messageSize=1024*64;
private int messageCount=10000;
private final AtomicInteger exceptionCount = new AtomicInteger(0);
/**
* Test the case where the broker is blocked due to a memory limit
* and a producer timeout is set on the connection.
* @throws Exception
*/
public void testBlockedProducerConnectionTimeout() throws Exception {
final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
final ActiveMQDestination queue = createDestination("testqueue");
// we should not take longer than 5 seconds to return from send
cx.setSendTimeout(10000);
Runnable r = new Runnable() {
public void run() {
try {
logger.info("Sender thread starting");
Session session = cx.createSession(false, 1);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(createMessageText());
for(int count=0; count<messageCount; count++){
producer.send(message);
// Currently after the timeout producer just
// returns but there is no way to know that
// the send timed out
}
logger.info("Done sending..");
} catch (JMSException e) {
e.printStackTrace();
exceptionCount.incrementAndGet();
return;
}
}
};
cx.start();
Thread producerThread = new Thread(r);
producerThread.start();
producerThread.join(30000);
cx.close();
// We should have a few timeout exceptions as memory store will fill up
assertTrue(exceptionCount.get() > 0);
}
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
broker = createBroker();
broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
super.setUp();
}
private String createMessageText() {
StringBuffer buffer = new StringBuffer();
buffer.append("<filler>");
for (int i = buffer.length(); i < messageSize; i++) {
buffer.append('X');
}
buffer.append("</filler>");
return buffer.toString();
}
}