This commit is contained in:
Clebert Suconic 2021-04-12 08:29:59 -04:00
commit f580ecb56f
2 changed files with 115 additions and 91 deletions

View File

@ -70,7 +70,7 @@ public class AMQConsumer {
private int prefetchSize;
private final AtomicInteger currentWindow;
private int deliveredAcks;
private int deliveredAcksCreditExtension = 0;
private long messagePullSequence = 0;
private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
//internal means we don't expose
@ -90,7 +90,6 @@ public class AMQConsumer {
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
this.currentWindow = new AtomicInteger(prefetchSize);
this.deliveredAcks = 0;
if (prefetchSize == 0) {
messagePullHandler.set(new MessagePullHandler());
}
@ -295,6 +294,28 @@ public class AMQConsumer {
*/
public void acknowledge(MessageAck ack) throws Exception {
if (ack.isRedeliveredAck()) {
// we don't mind if the client thinks it is a redelivery
return;
}
final int ackMessageCount = ack.getMessageCount();
acquireCredit(ackMessageCount);
if (ack.isDeliveredAck()) {
deliveredAcksCreditExtension += ackMessageCount;
// our work is done
return;
}
// some sort of real ack, rebalance deliveredAcksCreditExtension
if (deliveredAcksCreditExtension > 0) {
deliveredAcksCreditExtension -= ackMessageCount;
if (deliveredAcksCreditExtension >= 0) {
currentWindow.addAndGet(-ackMessageCount);
}
}
final MessageId startID, lastID;
if (ack.getFirstMessageId() == null) {
@ -309,59 +330,42 @@ public class AMQConsumer {
if (serverConsumer.getQueue().isNonDestructive()) {
removeReferences = false;
}
if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) {
removeReferences = false;
}
List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) {
if (deliveredAcks < ackList.size()) {
acquireCredit(ackList.size() - deliveredAcks);
deliveredAcks = 0;
} else {
deliveredAcks -= ackList.size();
}
} else {
if (ack.isDeliveredAck()) {
this.deliveredAcks += ack.getMessageCount();
}
acquireCredit(ack.getMessageCount());
}
if (removeReferences) {
Transaction originalTX = session.getCoreSession().getCurrentTransaction();
Transaction transaction;
if (originalTX == null) {
transaction = session.getCoreSession().newTransaction();
} else {
transaction = originalTX;
}
if (ack.isIndividualAck() || ack.isStandardAck()) {
if (!ackList.isEmpty()) {
if (ack.isExpiredAck()) {
for (MessageReference ref : ackList) {
ref.acknowledge(transaction, serverConsumer);
ref.getQueue().expire(ref, serverConsumer);
}
} else if (ack.isPoisonAck()) {
for (MessageReference ref : ackList) {
Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) {
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
} else if (removeReferences) {
Transaction originalTX = session.getCoreSession().getCurrentTransaction();
Transaction transaction;
if (originalTX == null) {
transaction = session.getCoreSession().newTransaction();
} else {
transaction = originalTX;
}
if (ack.isIndividualAck() || ack.isStandardAck()) {
for (MessageReference ref : ackList) {
ref.acknowledge(transaction, serverConsumer);
}
} else if (ack.isPoisonAck()) {
for (MessageReference ref : ackList) {
Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) {
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
}
}
if (originalTX == null) {
transaction.commit(true);
}
}
if (ack.isExpiredAck()) {
for (MessageReference ref : ackList) {
ref.getQueue().expire(ref, serverConsumer);
if (originalTX == null) {
transaction.commit(true);
}
}
}
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
@ -30,6 +29,8 @@ import javax.jms.TemporaryTopic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
public class AdvisoryOpenWireTest extends BasicOpenWireTest {
@Override
@ -44,12 +45,17 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
super.setUp();
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
// ensure advisory addresses are visible
serverConfig.getAcceptorConfigurations().iterator().next().getExtraParams().put("suppressInternalManagementObjects", "false");
super.extraServerConfig(serverConfig);
}
@Test
public void testTempTopicLeak() throws Exception {
Connection connection = null;
try {
connection = factory.createConnection();
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -57,32 +63,32 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
TemporaryTopic temporaryTopic = session.createTemporaryTopic();
temporaryTopic.delete();
Object[] queueResources = server.getManagementService().getResources(QueueControl.class);
AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempTopic");
Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
for (Object queueResource : queueResources) {
Wait.assertEquals(0, advisoryAddress::getMessageCount);
Wait.assertEquals(2, advisoryAddress::getRoutedMessageCount);
if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempTopic")) {
QueueControl queueControl = (QueueControl) queueResource;
Wait.waitFor(() -> queueControl.getMessageCount() == 0);
assertNotNull("addressControl for temp advisory", queueControl);
}
}
Wait.assertEquals(0, queueControl::getMessageCount);
Wait.assertEquals(2, queueControl::getMessagesAdded);
}
}
} finally {
if (connection != null) {
connection.close();
private AddressControl assertNonNullAddressControl(String match) {
AddressControl advisoryAddressControl = null;
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
for (Object addressResource : addressResources) {
if (((AddressControl) addressResource).getAddress().equals(match)) {
advisoryAddressControl = (AddressControl) addressResource;
}
}
assertNotNull("addressControl for temp advisory", advisoryAddressControl);
return advisoryAddressControl;
}
@Test
public void testTempQueueLeak() throws Exception {
Connection connection = null;
try {
connection = factory.createConnection();
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -90,23 +96,12 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
temporaryQueue.delete();
Object[] queueResources = server.getManagementService().getResources(QueueControl.class);
AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
for (Object queueResource : queueResources) {
Wait.assertEquals(0, advisoryAddress::getMessageCount);
Wait.assertEquals(2, advisoryAddress::getRoutedMessageCount);
if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
QueueControl queueControl = (QueueControl) queueResource;
Wait.waitFor(() -> queueControl.getMessageCount() == 0);
assertNotNull("addressControl for temp advisory", queueControl);
Wait.assertEquals(0, queueControl::getMessageCount);
Wait.assertEquals(2, queueControl::getMessagesAdded);
}
}
} finally {
if (connection != null) {
connection.close();
}
}
}
@ -127,20 +122,45 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
temporaryQueue.delete();
}
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
for (Object addressResource : addressResources) {
Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
Wait.assertEquals(0, advisoryAddress::getMessageCount);
if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
AddressControl addressControl = (AddressControl) addressResource;
Wait.waitFor(() -> addressControl.getMessageCount() == 0);
assertNotNull("addressControl for temp advisory", addressControl);
Wait.assertEquals(0, addressControl::getMessageCount);
} finally {
for (Connection conn : connections) {
if (conn != null) {
conn.close();
}
}
}
}
@Test
public void testLongLivedConnectionGetsAllPastPrefetch() throws Exception {
final Connection[] connections = new Connection[2];
final int numTempDestinations = 600; // such that 2x exceeds default 1k prefetch for advisory consumer
try {
for (int i = 0; i < connections.length; i++) {
connections[i] = factory.createConnection();
connections[i].start();
}
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < numTempDestinations; i++) {
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
temporaryQueue.delete();
}
AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
Wait.assertEquals(0, advisoryAddress::getMessageCount);
// there is an advisory for create and another for delete
assertEquals("all routed", numTempDestinations * 2, advisoryAddress.getRoutedMessageCount());
//sleep a bit to allow message count to go down.
} finally {
for (Connection conn : connections) {
if (conn != null) {