Added test case that makes use of producer window flow control. So now even async sends can be flow controled so that an individual publisher can be stopped without stopping the entire connection.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@518638 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-03-15 14:25:40 +00:00
parent e4bc8e14f6
commit f7a30b80fc
2 changed files with 50 additions and 22 deletions

View File

@ -364,24 +364,28 @@ public class Queue implements Destination, Task {
synchronized( messagesWaitingForSpace ) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
// While waiting for space to free up... the message may have expired.
if(message.isExpired()){
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
if( !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
try {
doMessageSend(producerExchange, message);
if( message.isResponseRequired() ) {
Response response = new Response();
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
} else {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
} catch (Exception e) {
if( message.isResponseRequired() ) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
} else {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
}
}
@ -391,8 +395,7 @@ public class Queue implements Destination, Task {
if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
}
context.setDontSendReponse(true);
return;
}
@ -412,10 +415,6 @@ public class Queue implements Destination, Task {
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
}
@ -431,7 +430,10 @@ public class Queue implements Destination, Task {
if(store!=null&&message.isPersistent()){
store.addMessage(context,message);
}
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
if(context.isInTransaction()){
// If this is a transacted message.. increase the usage now so that a big TX does not blow up
// our memory. This increment is decremented once the tx finishes..
@ -446,10 +448,6 @@ public class Queue implements Destination, Task {
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
sendMessage(context,message);

View File

@ -30,6 +30,36 @@ public class ProducerFlowControlTest extends JmsTestSupport {
private TransportConnector connector;
private ActiveMQConnection connection;
public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
factory.setProducerWindowSize(1024*64);
connection = (ActiveMQConnection) factory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queueB);
// Test sending to Queue A
// 1 few sends should not block until the producer window is used up.
fillQueue(queueA);
// Test sending to Queue B it should not block since the connection should not be blocked.
CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
TextMessage msg = (TextMessage) consumer.receive();
assertEquals("Message 1", msg.getText());
msg.acknowledge();
pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
msg = (TextMessage) consumer.receive();
assertEquals("Message 2", msg.getText());
msg.acknowledge();
}
public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
factory.setAlwaysSyncSend(true);