resolve AMQ-2033 - patch applied with thanks

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@727353 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-12-17 12:00:40 +00:00
parent 70f4eba372
commit 9a432f22f9
2 changed files with 290 additions and 238 deletions

View File

@ -94,6 +94,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
producerWindow = new MemoryUsage("Producer Window: " + producerId);
producerWindow.setLimit(this.info.getWindowSize());
producerWindow.start();
}
this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
@ -151,6 +152,9 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
public void dispose() {
if (!closed) {
this.session.removeProducer(this);
if (producerWindow != null) {
producerWindow.stop();
}
closed = true;
}
}

View File

@ -76,6 +76,54 @@ public class ProducerFlowControlTest extends JmsTestSupport {
msg.acknowledge();
}
public void testPubisherRecoverAfterBlock() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
factory.setProducerWindowSize(1024 * 64);
factory.setUseAsyncSend(true);
connection = (ActiveMQConnection)factory.createConnection();
connections.add(connection);
connection.start();
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queueA);
final AtomicBoolean done = new AtomicBoolean(true);
final AtomicBoolean keepGoing = new AtomicBoolean(true);
Thread thread = new Thread("Filler") {
@Override
public void run() {
while (keepGoing.get()) {
done.set(false);
try {
producer.send(session.createTextMessage("Test message"));
} catch (JMSException e) {
}
}
}
};
thread.start();
while (true) {
Thread.sleep(1000);
// the producer is blocked once the done flag stays true.
if (done.get()) {
break;
}
done.set(true);
}
// after receiveing messges, producer should continue sending messages
// (done == false)
MessageConsumer consumer = session.createConsumer(queueA);
TextMessage msg;
for (int idx = 0; idx < 5; ++idx) {
msg = (TextMessage) consumer.receive(1000);
msg.acknowledge();
}
Thread.sleep(1000);
keepGoing.set(false);
assertFalse(done.get());
}
public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
factory.setAlwaysSyncSend(true);