add variant that uses syncSend and verifies a ResourceAllocationException in the sending thread

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@831377 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-10-30 16:43:05 +00:00
parent 89eecadd9d
commit 97d7c390df
1 changed files with 45 additions and 0 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.ConnectionFactory;
@ -111,6 +112,50 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
keepGoing.set(false);
}
public void testPubisherRecoverAfterBlockWithSyncSend() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
factory.setExceptionListener(null);
factory.setUseAsyncSend(false);
connection = (ActiveMQConnection)factory.createConnection();
connections.add(connection);
connection.start();
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queueA);
final AtomicBoolean keepGoing = new AtomicBoolean(true);
final AtomicInteger exceptionCount = new AtomicInteger(0);
Thread thread = new Thread("Filler") {
@Override
public void run() {
while (keepGoing.get()) {
try {
producer.send(session.createTextMessage("Test message"));
} catch (JMSException arg0) {
if (arg0 instanceof ResourceAllocationException) {
gotResourceException.set(true);
exceptionCount.incrementAndGet();
}
}
}
}
};
thread.start();
waitForBlockedOrResourceLimit(new AtomicBoolean(false));
// resourceException on second message, resumption if we
// can receive 10
MessageConsumer consumer = session.createConsumer(queueA);
TextMessage msg;
for (int idx = 0; idx < 10; ++idx) {
msg = (TextMessage) consumer.receive(1000);
if (msg != null) {
msg.acknowledge();
}
}
assertTrue("we were blocked at least 5 times", 5 < exceptionCount.get());
keepGoing.set(false);
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {