ARTEMIS-3234 - fix and test, the existing tests suffered with suppressInternalManagementObjects defaulting to true. credit accounting is now independent of the ack list such that preack for advisories can work
This commit is contained in:
parent
9679843161
commit
fa80c03049
|
@ -70,7 +70,7 @@ public class AMQConsumer {
|
||||||
|
|
||||||
private int prefetchSize;
|
private int prefetchSize;
|
||||||
private final AtomicInteger currentWindow;
|
private final AtomicInteger currentWindow;
|
||||||
private int deliveredAcks;
|
private int deliveredAcksCreditExtension = 0;
|
||||||
private long messagePullSequence = 0;
|
private long messagePullSequence = 0;
|
||||||
private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
|
private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
|
||||||
//internal means we don't expose
|
//internal means we don't expose
|
||||||
|
@ -90,7 +90,6 @@ public class AMQConsumer {
|
||||||
this.scheduledPool = scheduledPool;
|
this.scheduledPool = scheduledPool;
|
||||||
this.prefetchSize = info.getPrefetchSize();
|
this.prefetchSize = info.getPrefetchSize();
|
||||||
this.currentWindow = new AtomicInteger(prefetchSize);
|
this.currentWindow = new AtomicInteger(prefetchSize);
|
||||||
this.deliveredAcks = 0;
|
|
||||||
if (prefetchSize == 0) {
|
if (prefetchSize == 0) {
|
||||||
messagePullHandler.set(new MessagePullHandler());
|
messagePullHandler.set(new MessagePullHandler());
|
||||||
}
|
}
|
||||||
|
@ -295,6 +294,28 @@ public class AMQConsumer {
|
||||||
*/
|
*/
|
||||||
public void acknowledge(MessageAck ack) throws Exception {
|
public void acknowledge(MessageAck ack) throws Exception {
|
||||||
|
|
||||||
|
if (ack.isRedeliveredAck()) {
|
||||||
|
// we don't mind if the client thinks it is a redelivery
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int ackMessageCount = ack.getMessageCount();
|
||||||
|
acquireCredit(ackMessageCount);
|
||||||
|
|
||||||
|
if (ack.isDeliveredAck()) {
|
||||||
|
deliveredAcksCreditExtension += ackMessageCount;
|
||||||
|
// our work is done
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// some sort of real ack, rebalance deliveredAcksCreditExtension
|
||||||
|
if (deliveredAcksCreditExtension > 0) {
|
||||||
|
deliveredAcksCreditExtension -= ackMessageCount;
|
||||||
|
if (deliveredAcksCreditExtension >= 0) {
|
||||||
|
currentWindow.addAndGet(-ackMessageCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final MessageId startID, lastID;
|
final MessageId startID, lastID;
|
||||||
|
|
||||||
if (ack.getFirstMessageId() == null) {
|
if (ack.getFirstMessageId() == null) {
|
||||||
|
@ -309,59 +330,42 @@ public class AMQConsumer {
|
||||||
if (serverConsumer.getQueue().isNonDestructive()) {
|
if (serverConsumer.getQueue().isNonDestructive()) {
|
||||||
removeReferences = false;
|
removeReferences = false;
|
||||||
}
|
}
|
||||||
if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) {
|
|
||||||
removeReferences = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
|
final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
|
||||||
|
|
||||||
if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) {
|
if (!ackList.isEmpty()) {
|
||||||
if (deliveredAcks < ackList.size()) {
|
if (ack.isExpiredAck()) {
|
||||||
acquireCredit(ackList.size() - deliveredAcks);
|
|
||||||
deliveredAcks = 0;
|
|
||||||
} else {
|
|
||||||
deliveredAcks -= ackList.size();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (ack.isDeliveredAck()) {
|
|
||||||
this.deliveredAcks += ack.getMessageCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
acquireCredit(ack.getMessageCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (removeReferences) {
|
|
||||||
|
|
||||||
Transaction originalTX = session.getCoreSession().getCurrentTransaction();
|
|
||||||
Transaction transaction;
|
|
||||||
|
|
||||||
if (originalTX == null) {
|
|
||||||
transaction = session.getCoreSession().newTransaction();
|
|
||||||
} else {
|
|
||||||
transaction = originalTX;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ack.isIndividualAck() || ack.isStandardAck()) {
|
|
||||||
for (MessageReference ref : ackList) {
|
for (MessageReference ref : ackList) {
|
||||||
ref.acknowledge(transaction, serverConsumer);
|
ref.getQueue().expire(ref, serverConsumer);
|
||||||
}
|
}
|
||||||
} else if (ack.isPoisonAck()) {
|
} else if (removeReferences) {
|
||||||
for (MessageReference ref : ackList) {
|
|
||||||
Throwable poisonCause = ack.getPoisonCause();
|
Transaction originalTX = session.getCoreSession().getCurrentTransaction();
|
||||||
if (poisonCause != null) {
|
Transaction transaction;
|
||||||
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
|
|
||||||
|
if (originalTX == null) {
|
||||||
|
transaction = session.getCoreSession().newTransaction();
|
||||||
|
} else {
|
||||||
|
transaction = originalTX;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ack.isIndividualAck() || ack.isStandardAck()) {
|
||||||
|
for (MessageReference ref : ackList) {
|
||||||
|
ref.acknowledge(transaction, serverConsumer);
|
||||||
|
}
|
||||||
|
} else if (ack.isPoisonAck()) {
|
||||||
|
for (MessageReference ref : ackList) {
|
||||||
|
Throwable poisonCause = ack.getPoisonCause();
|
||||||
|
if (poisonCause != null) {
|
||||||
|
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
|
||||||
|
}
|
||||||
|
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
|
||||||
}
|
}
|
||||||
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (originalTX == null) {
|
if (originalTX == null) {
|
||||||
transaction.commit(true);
|
transaction.commit(true);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (ack.isExpiredAck()) {
|
|
||||||
for (MessageReference ref : ackList) {
|
|
||||||
ref.getQueue().expire(ref, serverConsumer);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
|
|
||||||
import org.apache.activemq.advisory.AdvisorySupport;
|
import org.apache.activemq.advisory.AdvisorySupport;
|
||||||
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
||||||
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -30,6 +29,8 @@ import javax.jms.TemporaryTopic;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
|
||||||
public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -44,12 +45,17 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void extraServerConfig(Configuration serverConfig) {
|
||||||
|
// ensure advisory addresses are visible
|
||||||
|
serverConfig.getAcceptorConfigurations().iterator().next().getExtraParams().put("suppressInternalManagementObjects", "false");
|
||||||
|
super.extraServerConfig(serverConfig);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTempTopicLeak() throws Exception {
|
public void testTempTopicLeak() throws Exception {
|
||||||
Connection connection = null;
|
|
||||||
|
|
||||||
try {
|
try (Connection connection = factory.createConnection()) {
|
||||||
connection = factory.createConnection();
|
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
@ -57,32 +63,32 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
||||||
TemporaryTopic temporaryTopic = session.createTemporaryTopic();
|
TemporaryTopic temporaryTopic = session.createTemporaryTopic();
|
||||||
temporaryTopic.delete();
|
temporaryTopic.delete();
|
||||||
|
|
||||||
Object[] queueResources = server.getManagementService().getResources(QueueControl.class);
|
AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempTopic");
|
||||||
|
Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
|
||||||
|
|
||||||
for (Object queueResource : queueResources) {
|
Wait.assertEquals(0, advisoryAddress::getMessageCount);
|
||||||
|
Wait.assertEquals(2, advisoryAddress::getRoutedMessageCount);
|
||||||
|
|
||||||
if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempTopic")) {
|
}
|
||||||
QueueControl queueControl = (QueueControl) queueResource;
|
}
|
||||||
Wait.waitFor(() -> queueControl.getMessageCount() == 0);
|
|
||||||
assertNotNull("addressControl for temp advisory", queueControl);
|
|
||||||
|
|
||||||
Wait.assertEquals(0, queueControl::getMessageCount);
|
private AddressControl assertNonNullAddressControl(String match) {
|
||||||
Wait.assertEquals(2, queueControl::getMessagesAdded);
|
AddressControl advisoryAddressControl = null;
|
||||||
}
|
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
|
||||||
}
|
|
||||||
} finally {
|
for (Object addressResource : addressResources) {
|
||||||
if (connection != null) {
|
if (((AddressControl) addressResource).getAddress().equals(match)) {
|
||||||
connection.close();
|
advisoryAddressControl = (AddressControl) addressResource;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
assertNotNull("addressControl for temp advisory", advisoryAddressControl);
|
||||||
|
return advisoryAddressControl;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTempQueueLeak() throws Exception {
|
public void testTempQueueLeak() throws Exception {
|
||||||
Connection connection = null;
|
|
||||||
|
|
||||||
try {
|
try (Connection connection = factory.createConnection()) {
|
||||||
connection = factory.createConnection();
|
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
@ -90,23 +96,12 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
||||||
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
|
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
|
||||||
temporaryQueue.delete();
|
temporaryQueue.delete();
|
||||||
|
|
||||||
Object[] queueResources = server.getManagementService().getResources(QueueControl.class);
|
AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
|
||||||
|
Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
|
||||||
|
|
||||||
for (Object queueResource : queueResources) {
|
Wait.assertEquals(0, advisoryAddress::getMessageCount);
|
||||||
|
Wait.assertEquals(2, advisoryAddress::getRoutedMessageCount);
|
||||||
|
|
||||||
if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
|
|
||||||
QueueControl queueControl = (QueueControl) queueResource;
|
|
||||||
Wait.waitFor(() -> queueControl.getMessageCount() == 0);
|
|
||||||
assertNotNull("addressControl for temp advisory", queueControl);
|
|
||||||
Wait.assertEquals(0, queueControl::getMessageCount);
|
|
||||||
Wait.assertEquals(2, queueControl::getMessagesAdded);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (connection != null) {
|
|
||||||
connection.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,20 +122,45 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
||||||
temporaryQueue.delete();
|
temporaryQueue.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
|
AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
|
||||||
|
|
||||||
for (Object addressResource : addressResources) {
|
Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
|
||||||
|
Wait.assertEquals(0, advisoryAddress::getMessageCount);
|
||||||
|
|
||||||
if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
|
} finally {
|
||||||
AddressControl addressControl = (AddressControl) addressResource;
|
for (Connection conn : connections) {
|
||||||
Wait.waitFor(() -> addressControl.getMessageCount() == 0);
|
if (conn != null) {
|
||||||
assertNotNull("addressControl for temp advisory", addressControl);
|
conn.close();
|
||||||
Wait.assertEquals(0, addressControl::getMessageCount);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLongLivedConnectionGetsAllPastPrefetch() throws Exception {
|
||||||
|
final Connection[] connections = new Connection[2];
|
||||||
|
|
||||||
|
final int numTempDestinations = 600; // such that 2x exceeds default 1k prefetch for advisory consumer
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < connections.length; i++) {
|
||||||
|
connections[i] = factory.createConnection();
|
||||||
|
connections[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
for (int i = 0; i < numTempDestinations; i++) {
|
||||||
|
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
|
||||||
|
temporaryQueue.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
|
||||||
|
Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
|
||||||
|
Wait.assertEquals(0, advisoryAddress::getMessageCount);
|
||||||
|
|
||||||
|
// there is an advisory for create and another for delete
|
||||||
|
assertEquals("all routed", numTempDestinations * 2, advisoryAddress.getRoutedMessageCount());
|
||||||
|
|
||||||
//sleep a bit to allow message count to go down.
|
|
||||||
} finally {
|
} finally {
|
||||||
for (Connection conn : connections) {
|
for (Connection conn : connections) {
|
||||||
if (conn != null) {
|
if (conn != null) {
|
||||||
|
|
Loading…
Reference in New Issue