git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@901269 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-01-20 16:41:30 +00:00
parent ee55abb921
commit 8de3bd29bf
2 changed files with 171 additions and 15 deletions

View File

@ -252,20 +252,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
if (context.isInTransaction()) {
// extend prefetch window only if not a pulling
// consumer
if (getPrefetchSize() != 0) {
prefetchExtension = Math.max(
prefetchExtension, index );
}
} else {
// contract prefetch if dispatch required a pull
if (getPrefetchSize() == 0) {
prefetchExtension = Math.max(0, prefetchExtension - index);
}
if (ack.getLastMessageId().equals(messageId)) {
// contract prefetch if dispatch required a pull
if (getPrefetchSize() == 0) {
prefetchExtension = Math.max(0, prefetchExtension - index);
} else if (context.isInTransaction()) {
// extend prefetch window only if not a pulling consumer
prefetchExtension = Math.max(prefetchExtension, index);
}
destination = node.getRegionDestination();
callDispatchMatched = true;

View File

@ -155,15 +155,178 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
answer = (TextMessage)consumer2.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
// https://issues.apache.org/activemq/browse/AMQ-2567
public void testManyMessageConsumer() throws Exception {
doTestManyMessageConsumer(true);
}
public void testManyMessageConsumerNoTransaction() throws Exception {
doTestManyMessageConsumer(false);
}
private void doTestManyMessageConsumer(boolean transacted) throws Exception {
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));
producer.send(session.createTextMessage("Msg2"));
producer.send(session.createTextMessage("Msg3"));
producer.send(session.createTextMessage("Msg4"));
producer.send(session.createTextMessage("Msg5"));
producer.send(session.createTextMessage("Msg6"));
producer.send(session.createTextMessage("Msg7"));
producer.send(session.createTextMessage("Msg8"));
if (transacted) {
session.commit();
}
// now lets receive it
MessageConsumer consumer = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue);
TextMessage answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg2");
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg3");
if (transacted) {
session.commit();
}
// this call would return null if prefetchSize > 0
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg4");
if (transacted) {
session.commit();
}
// Now using other consumer
// this call should return the next message (Msg5) still left on the queue
answer = (TextMessage)consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg5");
if (transacted) {
session.commit();
}
// Now using other consumer
// this call should return the next message (Msg5) still left on the queue
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg6");
// read one more message without commit
// Now using other consumer
// this call should return the next message (Msg5) still left on the queue
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg7");
if (transacted) {
session.commit();
}
// Now using other consumer
// this call should return the next message (Msg5) still left on the queue
answer = (TextMessage)consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg8");
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
public void testManyMessageConsumerWithSend() throws Exception {
doTestManyMessageConsumerWithSend(true);
}
public void testManyMessageConsumerWithSendNoTransaction() throws Exception {
doTestManyMessageConsumerWithSend(false);
}
private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception {
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));
producer.send(session.createTextMessage("Msg2"));
producer.send(session.createTextMessage("Msg3"));
producer.send(session.createTextMessage("Msg4"));
producer.send(session.createTextMessage("Msg5"));
producer.send(session.createTextMessage("Msg6"));
producer.send(session.createTextMessage("Msg7"));
producer.send(session.createTextMessage("Msg8"));
if (transacted) {
session.commit();
}
// now lets receive it
MessageConsumer consumer = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue);
TextMessage answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg2");
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg3");
if (transacted) {
session.commit();
}
// Now using other consumer take 2
answer = (TextMessage)consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg4");
answer = (TextMessage)consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg5");
// ensure prefetch extension ok by sending another that could get dispatched
producer.send(session.createTextMessage("Msg9"));
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg6");
// read one more message without commit
// and using other consumer
answer = (TextMessage)consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg7");
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg8");
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg9");
if (transacted) {
session.commit();
}
answer = (TextMessage)consumer.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
bindAddress = "tcp://localhost:0";
super.setUp();
connection = createConnection();
connection.start();
queue = createQueue();
}
protected void startBroker() throws Exception {
super.startBroker();
bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString();
}
protected void tearDown() throws Exception {
connection.close();