resolve https://issues.apache.org/activemq/browse/AMQ-2523 - have Usage use a shared executor with a limited pool of 10 threads and an unbounded queue. Fix potential dropped dispatch attempts when messages waiting for space fail due to memory again being full, can result is hung consumer. In this case, a new notification for not full needs to be registered. Added relevant test case.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@895975 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-01-05 10:29:19 +00:00
parent 155495485e
commit 97e0fccdae
6 changed files with 196 additions and 56 deletions

View File

@ -406,8 +406,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
// We can avoid blocking due to low usage if the producer is sending // We can avoid blocking due to low usage if the producer is sending
// a sync message or // a sync message or if it is using a producer window
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
// copy the exchange state since the context will be modified while we are waiting // copy the exchange state since the context will be modified while we are waiting
// for space. // for space.
@ -441,17 +440,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
ExceptionResponse response = new ExceptionResponse(e); ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId()); response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response); context.getConnection().dispatchAsync(response);
} else {
LOG.debug("unexpected exception on deferred send of :" + message, e);
} }
} }
} }
}); });
// If the user manager is not full, then the task will not registerCallbackForNotFullNotification();
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
context.setDontSendReponse(true); context.setDontSendReponse(true);
return; return;
} }
@ -482,6 +478,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
private void registerCallbackForNotFullNotification() {
// If the usage manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
}
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
synchronized (sendLock) { synchronized (sendLock) {
@ -1069,9 +1074,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// do early to allow dispatch of these waiting messages // do early to allow dispatch of these waiting messages
synchronized (messagesWaitingForSpace) { synchronized (messagesWaitingForSpace) {
while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) { while (!messagesWaitingForSpace.isEmpty()) {
if (!memoryUsage.isFull()) {
Runnable op = messagesWaitingForSpace.removeFirst(); Runnable op = messagesWaitingForSpace.removeFirst();
op.run(); op.run();
} else {
registerCallbackForNotFullNotification();
break;
}
} }
} }

View File

@ -43,6 +43,7 @@ import org.apache.commons.logging.LogFactory;
public abstract class Usage<T extends Usage> implements Service { public abstract class Usage<T extends Usage> implements Service {
private static final Log LOG = LogFactory.getLog(Usage.class); private static final Log LOG = LogFactory.getLog(Usage.class);
private static ThreadPoolExecutor executor;
protected final Object usageMutex = new Object(); protected final Object usageMutex = new Object();
protected int percentUsage; protected int percentUsage;
protected T parent; protected T parent;
@ -55,7 +56,7 @@ public abstract class Usage<T extends Usage> implements Service {
private List<T> children = new CopyOnWriteArrayList<T>(); private List<T> children = new CopyOnWriteArrayList<T>();
private final List<Runnable> callbacks = new LinkedList<Runnable>(); private final List<Runnable> callbacks = new LinkedList<Runnable>();
private int pollingTime = 100; private int pollingTime = 100;
private volatile ThreadPoolExecutor executor;
private AtomicBoolean started=new AtomicBoolean(); private AtomicBoolean started=new AtomicBoolean();
public Usage(T parent, String name, float portion) { public Usage(T parent, String name, float portion) {
@ -247,6 +248,7 @@ public abstract class Usage<T extends Usage> implements Service {
if (oldPercentUsage >= 100 && newPercentUsage < 100) { if (oldPercentUsage >= 100 && newPercentUsage < 100) {
synchronized (usageMutex) { synchronized (usageMutex) {
usageMutex.notifyAll(); usageMutex.notifyAll();
if (!callbacks.isEmpty()) {
for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) { for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
Runnable callback = iter.next(); Runnable callback = iter.next();
getExecutor().execute(callback); getExecutor().execute(callback);
@ -254,16 +256,16 @@ public abstract class Usage<T extends Usage> implements Service {
callbacks.clear(); callbacks.clear();
} }
} }
}
if (!listeners.isEmpty()) {
// Let the listeners know on a separate thread // Let the listeners know on a separate thread
Runnable listenerNotifier = new Runnable() { Runnable listenerNotifier = new Runnable() {
public void run() { public void run() {
for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) { for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
UsageListener l = iter.next(); UsageListener l = iter.next();
l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage); l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
} }
} }
}; };
if (started.get()) { if (started.get()) {
getExecutor().execute(listenerNotifier); getExecutor().execute(listenerNotifier);
@ -272,6 +274,7 @@ public abstract class Usage<T extends Usage> implements Service {
} }
} }
} }
}
public String getName() { public String getName() {
return name; return name;
@ -299,9 +302,7 @@ public abstract class Usage<T extends Usage> implements Service {
if (parent != null) { if (parent != null) {
parent.removeChild(this); parent.removeChild(this);
} }
if (this.executor != null){
this.executor.shutdownNow();
}
//clear down any callbacks //clear down any callbacks
synchronized (usageMutex) { synchronized (usageMutex) {
usageMutex.notifyAll(); usageMutex.notifyAll();
@ -402,22 +403,17 @@ public abstract class Usage<T extends Usage> implements Service {
} }
protected Executor getExecutor() { protected Executor getExecutor() {
if (this.executor == null) { return executor;
synchronized(usageMutex) { }
if (this.executor == null) {
this.executor = new ThreadPoolExecutor(1, 1, 0, static {
TimeUnit.NANOSECONDS, executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, getName() Thread thread = new Thread(runnable, "Usage Async Task");
+ " Usage Thread Pool");
thread.setDaemon(true); thread.setDaemon(true);
return thread; return thread;
} }
}); });
} }
}
}
return this.executor;
}
} }

View File

@ -62,6 +62,11 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
// with sendFailIfNoSpace set, there is no blocking of the connection // with sendFailIfNoSpace set, there is no blocking of the connection
} }
@Override
public void testAsyncPubisherRecoverAfterBlock() throws Exception {
// sendFail means no flowControllwindow as there is no producer ack, just an exception
}
@Override @Override
public void testPubisherRecoverAfterBlock() throws Exception { public void testPubisherRecoverAfterBlock() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();

View File

@ -37,9 +37,11 @@ import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePoli
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ProducerFlowControlTest extends JmsTestSupport { public class ProducerFlowControlTest extends JmsTestSupport {
static final Log LOG = LogFactory.getLog(ProducerFlowControlTest.class);
ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A"); ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B"); ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
protected TransportConnector connector; protected TransportConnector connector;
@ -80,8 +82,6 @@ public class ProducerFlowControlTest extends JmsTestSupport {
public void testPubisherRecoverAfterBlock() throws Exception { public void testPubisherRecoverAfterBlock() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
factory.setProducerWindowSize(1024 * 64);
factory.setUseAsyncSend(true);
connection = (ActiveMQConnection)factory.createConnection(); connection = (ActiveMQConnection)factory.createConnection();
connections.add(connection); connections.add(connection);
connection.start(); connection.start();
@ -94,12 +94,14 @@ public class ProducerFlowControlTest extends JmsTestSupport {
Thread thread = new Thread("Filler") { Thread thread = new Thread("Filler") {
int i;
@Override @Override
public void run() { public void run() {
while (keepGoing.get()) { while (keepGoing.get()) {
done.set(false); done.set(false);
try { try {
producer.send(session.createTextMessage("Test message")); producer.send(session.createTextMessage("Test message " + ++i));
LOG.info("sent: " + i);
} catch (JMSException e) { } catch (JMSException e) {
} }
} }
@ -114,12 +116,61 @@ public class ProducerFlowControlTest extends JmsTestSupport {
TextMessage msg; TextMessage msg;
for (int idx = 0; idx < 5; ++idx) { for (int idx = 0; idx < 5; ++idx) {
msg = (TextMessage) consumer.receive(1000); msg = (TextMessage) consumer.receive(1000);
LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
msg.acknowledge(); msg.acknowledge();
} }
Thread.sleep(1000); Thread.sleep(1000);
keepGoing.set(false); keepGoing.set(false);
assertFalse(done.get()); assertFalse("producer has resumed", done.get());
}
public void testAsyncPubisherRecoverAfterBlock() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
factory.setProducerWindowSize(1024 * 5);
factory.setUseAsyncSend(true);
connection = (ActiveMQConnection)factory.createConnection();
connections.add(connection);
connection.start();
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queueA);
final AtomicBoolean done = new AtomicBoolean(true);
final AtomicBoolean keepGoing = new AtomicBoolean(true);
Thread thread = new Thread("Filler") {
int i;
@Override
public void run() {
while (keepGoing.get()) {
done.set(false);
try {
producer.send(session.createTextMessage("Test message " + ++i));
LOG.info("sent: " + i);
} catch (JMSException e) {
}
}
}
};
thread.start();
waitForBlockedOrResourceLimit(done);
// after receiveing messges, producer should continue sending messages
// (done == false)
MessageConsumer consumer = session.createConsumer(queueA);
TextMessage msg;
for (int idx = 0; idx < 5; ++idx) {
msg = (TextMessage) consumer.receive(1000);
assertNotNull("Got a message", msg);
LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
msg.acknowledge();
}
Thread.sleep(1000);
keepGoing.set(false);
assertFalse("producer has resumed", done.get());
} }
public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception { public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class MemoryUsageTest {
MemoryUsage underTest;
@Test
public final void testPercentUsageNeedsNoThread() {
int activeThreadCount = Thread.activeCount();
underTest.setLimit(10);
underTest.start();
underTest.increaseUsage(1);
assertEquals("usage is correct", 10, underTest.getPercentUsage());
assertEquals("no new thread created withough listener or callback",activeThreadCount, Thread.activeCount());
}
@Test
public final void testAddUsageListenerStartsThread() throws Exception {
int activeThreadCount = Thread.activeCount();
underTest = new MemoryUsage();
underTest.setLimit(10);
underTest.start();
final CountDownLatch called = new CountDownLatch(1);
final String[] listnerThreadNameHolder = new String[1];
underTest.addUsageListener(new UsageListener() {
public void onUsageChanged(Usage usage, int oldPercentUsage,
int newPercentUsage) {
called.countDown();
listnerThreadNameHolder[0] = Thread.currentThread().toString();
}
});
underTest.increaseUsage(1);
assertTrue("listner was called", called.await(30, TimeUnit.SECONDS));
assertTrue("listner called from another thread", !Thread.currentThread().toString().equals(listnerThreadNameHolder[0]));
assertEquals("usage is correct", 10, underTest.getPercentUsage());
assertEquals("new thread created with listener", activeThreadCount + 1, Thread.activeCount());
}
@Before
public void setUp() throws Exception {
underTest = new MemoryUsage();
}
@After
public void tearDown() {
assertNotNull(underTest);
underTest.stop();
}
}

View File

@ -36,7 +36,8 @@ public class HttpsTransportBrokerTest extends HttpTransportBrokerTest {
//System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager"); //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
super.setUp(); super.setUp();
Thread.sleep(1000); Thread.sleep(2000);
Thread.yield();
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {