https://issues.apache.org/jira/browse/AMQ-5476 - ZeroPrefetchConsumerTest regression - fix default in connection factory and refactor prefetchExtension support - https://issues.apache.org/activemq/browse/AMQ-2560

This commit is contained in:
gtully 2014-12-15 14:12:08 +00:00
parent 411c7547ac
commit 2d9959a6f6
3 changed files with 38 additions and 38 deletions

View File

@ -234,26 +234,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
// contract prefetch if dispatch required a pull
if (getPrefetchSize() == 0) {
// Protect extension update against parallel updates.
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - index);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
} else if (usePrefetchExtension && context.isInTransaction()) {
// extend prefetch window only if not a pulling consumer
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, index);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
break;
@ -283,14 +263,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
registerRemoveSync(context, node);
}
// Protect extension update against parallel updates.
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
// allow transaction batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, currentExtension + 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
acknowledge(context, ack, node);
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
@ -313,7 +296,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
nodeDest.getDestinationStatistics().getInflight().decrement();
}
if (ack.getLastMessageId().equals(node.getMessageId())) {
if (usePrefetchExtension) {
if (usePrefetchExtension && getPrefetchSize() != 0) {
// allow batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, index + 1);
@ -425,6 +409,19 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
context.getTransaction().addSynchronization(
new Synchronization() {
@Override
public void beforeEnd() {
if (usePrefetchExtension && getPrefetchSize() != 0) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
}
@Override
public void afterCommit()
throws Exception {
@ -516,7 +513,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
*/
@Override
public boolean isFull() {
return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
}
/**
@ -537,7 +534,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
@Override
public int countBeforeFull() {
return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
}
@Override
@ -696,13 +693,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
okForAckAsDispatchDone.countDown();
// No reentrant lock - Patch needed to IndirectMessageReference on method lock
MessageDispatch md = createMessageDispatch(node, message);
// NULL messages don't count... they don't get Acked.
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
dispatched.add(node);
} else {
}
if (getPrefetchSize() == 0) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);

View File

@ -173,7 +173,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private long consumerFailoverRedeliveryWaitPeriod = 0;
private boolean checkForDuplicates = true;
private ClientInternalExceptionListener clientInternalExceptionListener;
private boolean messagePrioritySupported = true;
private boolean messagePrioritySupported = false;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;

View File

@ -174,7 +174,7 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
}
private void doTestManyMessageConsumer(boolean transacted) throws Exception {
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));
@ -221,12 +221,11 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
session.commit();
}
// Now using other consumer
// this call should return the next message (Msg5) still left on the queue
// this call should return the next message still left on the queue
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg6");
// read one more message without commit
// Now using other consumer
// this call should return the next message (Msg5) still left on the queue
// this call should return the next message still left on the queue
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg7");
if (transacted) {
@ -247,12 +246,17 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
doTestManyMessageConsumerWithSend(true);
}
public void testManyMessageConsumerWithTxSendPrioritySupport() throws Exception {
((ActiveMQConnection)connection).setMessagePrioritySupported(true);
doTestManyMessageConsumerWithSend(true);
}
public void testManyMessageConsumerWithSendNoTransaction() throws Exception {
doTestManyMessageConsumerWithSend(false);
}
private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception {
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED :Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));