[AMQ-6824] - fix up prefetchExtension growth on transaction completion and delivered ack and tie in boolean usePrefetchExtension

(cherry picked from commit 41a100766c)
This commit is contained in:
gtully 2017-09-26 11:30:18 +01:00 committed by Timothy Bish
parent eccbd75bcf
commit 26788f5fd7
8 changed files with 291 additions and 120 deletions

View File

@ -330,4 +330,37 @@ public abstract class AbstractSubscription implements Subscription {
public AtomicInteger getPrefetchExtension() {
return this.prefetchExtension;
}
protected void contractPrefetchExtension(int amount) {
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
decrementPrefetchExtension(amount);
}
}
protected void expandPrefetchExtension(int amount) {
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
incrementPrefetchExtension(amount);
}
}
protected void decrementPrefetchExtension(int amount) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - amount);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
private void incrementPrefetchExtension(int amount) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, currentExtension + amount);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
}

View File

@ -226,6 +226,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
removeList.add(node);
contractPrefetchExtension(1);
} else {
registerRemoveSync(context, node);
}
@ -258,28 +259,18 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
contractPrefetchExtension(1);
} else {
registerRemoveSync(context, node);
expandPrefetchExtension(1);
}
if (isUsePrefetchExtension() && getPrefetchSize() != 0 && ack.isInTransaction()) {
// allow transaction batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, currentExtension + 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
acknowledge(context, ack, node);
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
break;
}
}
}else if (ack.isDeliveredAck()) {
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
int index = 0;
@ -287,16 +278,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
final MessageReference node = iter.next();
Destination nodeDest = (Destination) node.getRegionDestination();
if (ack.getLastMessageId().equals(node.getMessageId())) {
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
// allow batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, index + 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
expandPrefetchExtension(ack.getMessageCount());
destination = nodeDest;
callDispatchMatched = true;
break;
@ -327,17 +309,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
nodeDest.getDestinationStatistics().getInflight().decrement();
if (ack.getLastMessageId().equals(messageId)) {
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
// allow batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, index + 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
contractPrefetchExtension(1);
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
break;
@ -399,13 +371,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - (index + 1));
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
contractPrefetchExtension(1);
destination = nodeDest;
callDispatchMatched = true;
break;
@ -441,38 +407,24 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
context.getTransaction().addSynchronization(
new Synchronization() {
@Override
public void beforeEnd() {
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
}
@Override
public void afterCommit()
throws Exception {
Destination nodeDest = (Destination) node.getRegionDestination();
synchronized(dispatchLock) {
synchronized (dispatchLock) {
getSubscriptionStatistics().getDequeues().increment();
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
nodeDest.getDestinationStatistics().getInflight().decrement();
}
contractPrefetchExtension(1);
nodeDest.wakeup();
dispatchPending();
}
@Override
public void afterRollback() throws Exception {
synchronized(dispatchLock) {
// poisionAck will decrement - otherwise still inflight on client
}
contractPrefetchExtension(1);
}
});
}

View File

@ -1681,6 +1681,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (!added || browser.atMax()) {
browser.decrementQueueRef();
browserDispatches.remove(browserDispatch);
} else {
wakeup();
}
} catch (Exception e) {
LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);

View File

@ -281,24 +281,18 @@ public class TopicSubscription extends AbstractSubscription {
throw new JMSException("Poison ack cannot be transacted: " + ack);
}
updateStatsOnAck(context, ack);
if (getPrefetchSize() != 0) {
decrementPrefetchExtension(ack.getMessageCount());
}
contractPrefetchExtension(ack.getMessageCount());
} else if (ack.isIndividualAck()) {
updateStatsOnAck(context, ack);
if (getPrefetchSize() != 0 && ack.isInTransaction()) {
incrementPrefetchExtension(ack.getMessageCount());
if (ack.isInTransaction()) {
expandPrefetchExtension(1);
}
} else if (ack.isExpiredAck()) {
updateStatsOnAck(ack);
if (getPrefetchSize() != 0) {
incrementPrefetchExtension(ack.getMessageCount());
}
contractPrefetchExtension(ack.getMessageCount());
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch counters.
if (getPrefetchSize() != 0) {
incrementPrefetchExtension(ack.getMessageCount());
}
expandPrefetchExtension(ack.getMessageCount());
} else if (ack.isRedeliveredAck()) {
// No processing for redelivered needed
return;
@ -314,14 +308,13 @@ public class TopicSubscription extends AbstractSubscription {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void beforeEnd() {
if (getPrefetchSize() != 0) {
decrementPrefetchExtension(ack.getMessageCount());
}
public void afterRollback() {
contractPrefetchExtension(ack.getMessageCount());
}
@Override
public void afterCommit() throws Exception {
contractPrefetchExtension(ack.getMessageCount());
updateStatsOnAck(ack);
dispatchMatched();
}
@ -417,31 +410,11 @@ public class TopicSubscription extends AbstractSubscription {
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
}
if (!ack.isInTransaction()) {
contractPrefetchExtension(1);
}
}
}
private void incrementPrefetchExtension(int amount) {
if (!isUsePrefetchExtension()) {
return;
}
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension + amount);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
private void decrementPrefetchExtension(int amount) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - amount);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
@Override

View File

@ -28,6 +28,7 @@ import java.io.StringReader;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -46,10 +47,15 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
@ -2263,6 +2269,91 @@ public class StompTest extends StompTestSupport {
assertEquals(bigBody, sframe.getBody());
}
@Test(timeout = 60000)
public void testAckInTransactionTopic() throws Exception {
doTestAckInTransaction(true);
}
@Test(timeout = 60000)
public void testAckInTransactionQueue() throws Exception {
doTestAckInTransaction(false);
}
public void doTestAckInTransaction(boolean topic) throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
stompConnection.receive();
String destination = (topic ? "/topic" : "/queue") + "/test";
stompConnection.subscribe(destination, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 10; i++) {
stompConnection.send(destination , "message" + i);
}
stompConnection.begin("tx"+j);
for (int i = 0; i < 10; i++) {
StompFrame message = stompConnection.receive();
stompConnection.ack(message, "tx"+j);
}
stompConnection.commit("tx"+j);
}
List<Subscription> subs = getDestinationConsumers(brokerService,
ActiveMQDestination.createDestination("test", topic ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE));
for (Subscription subscription : subs) {
final AbstractSubscription abstractSubscription = (AbstractSubscription) subscription;
assertTrue("prefetchExtension should be back to Zero after commit", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("ext: " + abstractSubscription.getPrefetchExtension().get());
return abstractSubscription.getPrefetchExtension().get() == 0;
}
}));
}
}
public static List<Subscription> getDestinationConsumers(BrokerService broker, ActiveMQDestination destination) {
List<Subscription> result = null;
org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
if (dest != null) {
result = dest.getConsumers();
}
return result;
}
public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
org.apache.activemq.broker.region.Destination result = null;
for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
if (dest.getName().equals(destination.getPhysicalName())) {
result = dest;
break;
}
}
return result;
}
private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap(BrokerService target,
ActiveMQDestination destination) {
RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
if (destination.isTemporary()) {
return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap() :
regionBroker.getTempTopicRegion().getDestinationMap();
}
return destination.isQueue() ?
regionBroker.getQueueRegion().getDestinationMap() :
regionBroker.getTopicRegion().getDestinationMap();
}
protected SamplePojo createObjectFromJson(String data) throws Exception {
HierarchicalStreamReader in = new JettisonMappedXmlDriver().createReader(new StringReader(data));
return createObject(in);

View File

@ -19,6 +19,7 @@ package org.apache.activemq;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -42,11 +43,16 @@ import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.TestSupport.getDestinationConsumers;
/**
* Test cases used to test the JMS message consumer.
*/
@ -223,6 +229,83 @@ public class JMSConsumerTest extends JmsTestSupport {
message.acknowledge();
}
public void testReceiveTopicWithPrefetch1() throws Exception {
// Set prefetch to 1
connection.getPrefetchPolicy().setAll(1);
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = createDestination(session, Byte.valueOf(ActiveMQDestination.TOPIC_TYPE));
MessageConsumer consumer = session.createConsumer(destination);
// Send the messages
sendMessages(session, destination, 4);
// Make sure 4 messages were delivered.
Message message = null;
for (int i = 0; i < 4; i++) {
message = consumer.receive(1000);
assertNotNull(message);
}
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
assertTrue("prefetch extension back to 0",
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e == 4));
assertNull(consumer.receiveNoWait());
message.acknowledge();
assertTrue("prefetch extension back to 0",
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e == 0));
}
public void testReceiveQueueWithPrefetch1() throws Exception {
// Set prefetch to 1
connection.getPrefetchPolicy().setAll(1);
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = createDestination(session, Byte.valueOf(ActiveMQDestination.QUEUE_TYPE));
MessageConsumer consumer = session.createConsumer(destination);
// Send the messages
sendMessages(session, destination, 4);
// Make sure 4 messages were delivered.
Message message = null;
for (int i = 0; i < 4; i++) {
message = consumer.receive(1000);
assertNotNull(message);
}
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
assertTrue("prefetch extension..",
subscriptions.stream().
filter(s -> s instanceof QueueSubscription).
mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e == 4));
assertNull(consumer.receiveNoWait());
message.acknowledge();
assertTrue("prefetch extension back to 0",
subscriptions.stream().
filter(s -> s instanceof QueueSubscription).
mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e == 0));
}
public void initCombosForTestDurableConsumerSelectorChange() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});

View File

@ -51,4 +51,10 @@ public class JMSXAConsumerTest extends JMSConsumerTest {
// needs client ack, xa is auto ack if no transaction
public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
}
public void testReceiveTopicWithPrefetch1() throws Exception {
}
public void testReceiveQueueWithPrefetch1() throws Exception {
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.usecases;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
@ -98,16 +99,38 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
verifyDestinationDlq(destination, numMessagesToSend, view);
}
public void testExpiredMessages_onTopic_withPrefetchExtension() throws Exception {
final ActiveMQDestination destination = new ActiveMQTopic("test");
final int numMessagesToSend = 10000;
public void testClientAckInflight_onTopic_withPrefetchExtension() throws Exception {
usePrefetchExtension = true;
doTestClientAckInflight_onTopic_checkPrefetchExtension();
}
public void testClientAckInflight_onTopic_withOutPrefetchExtension() throws Exception {
usePrefetchExtension = false;
doTestClientAckInflight_onTopic_checkPrefetchExtension();
}
public void doTestClientAckInflight_onTopic_checkPrefetchExtension() throws Exception {
final ActiveMQDestination destination = new ActiveMQTopic("test");
buildBroker(destination);
verifyMessageExpirationOnDestination(destination, numMessagesToSend);
// We don't check the DLQ because non-persistent messages on topics are discarded instead.
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"failover://"+brokerUri);
ActiveMQPrefetchPolicy prefetchTwo = new ActiveMQPrefetchPolicy();
prefetchTwo.setAll(6);
factory.setPrefetchPolicy(prefetchTwo);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
produce(10, destination);
Message m = null;
for (int i=0; i<5; i++) {
m = consumer.receive(4000);
}
assertNotNull(m);
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
@ -115,29 +138,37 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e > 0));
}
allMatch(e -> usePrefetchExtension ? e > 1 : e == 0));
public void testExpiredMessages_onTopic_withoutPrefetchExtension() throws Exception {
final ActiveMQDestination destination = new ActiveMQTopic("test");
final int numMessagesToSend = 10000;
m.acknowledge();
usePrefetchExtension = false;
buildBroker(destination);
verifyMessageExpirationOnDestination(destination, numMessagesToSend);
// We don't check the DLQ because non-persistent messages on topics are discarded instead.
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
assertTrue("prefetch extension was incremented",
assertTrue("prefetch extension was not incremented",
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e == 0));
}
private void produce(int num, ActiveMQDestination destination) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"failover://"+brokerUri);
Connection connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
int i = 0;
while (i++ < num) {
Message message = useTextMessage ? session
.createTextMessage("test") : session
.createObjectMessage("test");
producer.send(message);
}
connection.close();
}
private void buildBroker(ActiveMQDestination destination) throws Exception {
broker = createBroker(deleteAllMessages, usePrefetchExtension, 100, destination);
brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();