diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 3ff1b84995..adcb72e4e3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -129,6 +129,10 @@ public interface Queue extends Bindable,CriticalComponent { void addConsumer(Consumer consumer) throws Exception; + void addLingerSession(String sessionId); + + void removeLingerSession(String sessionId); + void removeConsumer(Consumer consumer); int getConsumerCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 210fe89792..f940ec254a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -429,6 +429,10 @@ public interface ServerSession extends SecurityAuth { List getInTXMessagesForConsumer(long consumerId); + List getInTxLingerMessages(); + + void addLingerConsumer(ServerConsumer consumer); + String getValidatedUser(); SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; @@ -490,4 +494,6 @@ public interface ServerSession extends SecurityAuth { int getProducerCount(); int getDefaultConsumerWindowSize(SimpleString address); + + String toManagementString(); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 04ca5d4573..a9eab4fa00 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; import org.apache.activemq.artemis.core.server.management.ManagementService; @@ -101,6 +102,7 @@ import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; @@ -321,6 +323,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { */ private final Object directDeliveryGuard = new Object(); + private final ConcurrentHashSet lingerSessionIds = new ConcurrentHashSet<>(); + public String debug() { StringWriter str = new StringWriter(); PrintWriter out = new PrintWriter(str); @@ -1260,6 +1264,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } + @Override + public void addLingerSession(String sessionId) { + lingerSessionIds.add(sessionId); + } + + @Override + public void removeLingerSession(String sessionId) { + lingerSessionIds.remove(sessionId); + } + @Override public void removeConsumer(final Consumer consumer) { @@ -1585,6 +1599,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { mapReturn.put(holder.consumer.toManagementString(), msgs); } } + + for (String lingerSessionId : lingerSessionIds) { + ServerSession serverSession = server.getSessionByID(lingerSessionId); + List refs = serverSession == null ? null : serverSession.getInTxLingerMessages(); + if (refs != null && !refs.isEmpty()) { + mapReturn.put(serverSession.toManagementString(), refs); + } + } + return mapReturn; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index f0b6d340bd..d3cd425701 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -52,6 +52,8 @@ public class RefsOperation extends TransactionOperationAbstract { */ protected boolean ignoreRedeliveryCheck = false; + private String lingerSessionId = null; + public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) { this.queue = queue; this.reason = reason; @@ -97,6 +99,8 @@ public class RefsOperation extends TransactionOperationAbstract { List ackedRefs = new ArrayList<>(); for (MessageReference ref : refsToAck) { + clearLingerRef(ref); + ref.emptyConsumerID(); if (logger.isTraceEnabled()) { @@ -175,8 +179,10 @@ public class RefsOperation extends TransactionOperationAbstract { @Override public void afterCommit(final Transaction tx) { for (MessageReference ref : refsToAck) { + clearLingerRef(ref); + synchronized (ref.getQueue()) { - queue.postAcknowledge(ref, reason); + ref.getQueue().postAcknowledge(ref, reason); } } @@ -190,6 +196,12 @@ public class RefsOperation extends TransactionOperationAbstract { } } + private void clearLingerRef(MessageReference ref) { + if (!ref.hasConsumerId() && lingerSessionId != null) { + ref.getQueue().removeLingerSession(lingerSessionId); + } + } + private void decrementRefCount(MessageReference refmsg) { try { refmsg.getMessage().decrementRefCount(); @@ -228,4 +240,18 @@ public class RefsOperation extends TransactionOperationAbstract { return refsToAck; } + public synchronized List getLingerMessages() { + List list = new LinkedList<>(); + for (MessageReference ref : refsToAck) { + if (!ref.hasConsumerId() && lingerSessionId != null) { + list.add(ref); + } + } + + return list; + } + + public void setLingerSession(String lingerSessionId) { + this.lingerSessionId = lingerSessionId; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index ddba797cef..c668f6ed9a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.server.impl; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -571,6 +571,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { tx.rollback(); + addLingerRefs(); + if (!browseOnly) { TypedProperties props = new TypedProperties(); @@ -607,6 +609,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + private void addLingerRefs() throws Exception { + if (!browseOnly) { + List lingerRefs = session.getInTXMessagesForConsumer(this.id); + if (lingerRefs != null && !lingerRefs.isEmpty()) { + session.addLingerConsumer(this); + } + } + } + @Override public void removeItself() throws Exception { if (browseOnly) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 7c26028176..08f2d4e507 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -2113,6 +2113,41 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } + @Override + public List getInTxLingerMessages() { + Transaction transaction = tx; + if (transaction == null && callback != null) { + transaction = callback.getCurrentTransaction(); + } + RefsOperation operation = transaction == null ? null : (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION); + + return operation == null ? null : operation.getLingerMessages(); + } + + @Override + public void addLingerConsumer(ServerConsumer consumer) { + Transaction transaction = tx; + if (transaction == null && callback != null) { + transaction = callback.getCurrentTransaction(); + } + if (transaction != null) { + synchronized (transaction) { + // Transaction might be committed/rolledback, we need to synchronize and judge state + if (transaction.getState() != State.COMMITTED && transaction.getState() != State.ROLLEDBACK) { + RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION); + List refs = operation == null ? null : operation.getListOnConsumer(consumer.getID()); + if (refs != null && !refs.isEmpty()) { + for (MessageReference ref : refs) { + ref.emptyConsumerID(); + } + operation.setLingerSession(name); + consumer.getQueue().addLingerSession(name); + } + } + } + } + } + @Override public SimpleString removePrefix(SimpleString address) { if (prefixEnabled && address != null) { @@ -2183,4 +2218,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); return as.getDefaultConsumerWindowSize(); } + + @Override + public String toManagementString() { + return "ServerSession [id=" + getConnectionID() + ":" + getName() + "]"; + } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 5646905b4d..e266ef5bdd 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1010,6 +1010,16 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public void addLingerSession(String sessionId) { + + } + + @Override + public void removeLingerSession(String sessionId) { + + } + @Override public void removeConsumer(Consumer consumer) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java index 40b8333e70..cca8b1046d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java @@ -41,8 +41,12 @@ public class ReceiveTest extends ActiveMQTestBase { SimpleString addressA; + SimpleString addressB; + SimpleString queueA; + SimpleString queueB; + private ServerLocator locator; private ActiveMQServer server; @@ -54,6 +58,8 @@ public class ReceiveTest extends ActiveMQTestBase { addressA = RandomUtil.randomSimpleString(); queueA = RandomUtil.randomSimpleString(); + addressB = RandomUtil.randomSimpleString(); + queueB = RandomUtil.randomSimpleString(); locator = createInVMNonHALocator(); server = createServer(false); @@ -162,4 +168,37 @@ public class ReceiveTest extends ActiveMQTestBase { session.close(); sendSession.close(); } + + @Test + public void testMultiConsumersOnSession() throws Exception { + ClientSessionFactory cf = createSessionFactory(locator.setCallTimeout(10000000)); + ClientSession sendSession = cf.createSession(false, true, true); + ClientProducer cp1 = sendSession.createProducer(addressA); + ClientProducer cp2 = sendSession.createProducer(addressB); + + ClientSession session = cf.createSession(false, true, false); + session.createQueue(addressA, queueA, false); + session.createQueue(addressB, queueB, false); + + ClientConsumer cc1 = session.createConsumer(queueA); + ClientConsumer cc2 = session.createConsumer(queueB); + session.start(); + + cp1.send(sendSession.createMessage(false)); + cp2.send(sendSession.createMessage(false)); + Assert.assertNotNull(cc1.receive().acknowledge()); + Assert.assertNotNull(cc2.receive().acknowledge()); + session.commit(); + + final Queue queue1 = server.locateQueue(queueA); + final Queue queue2 = server.locateQueue(queueB); + + Wait.assertTrue(() -> queue1.getMessageCount() == 0, 500, 100); + Wait.assertTrue(() -> queue1.getMessagesAcknowledged() == 1, 500, 100); + Wait.assertTrue(() -> queue2.getMessageCount() == 0, 500, 100); + Wait.assertTrue(() -> queue2.getMessagesAcknowledged() == 1, 500, 100); + + session.close(); + sendSession.close(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 15b01b872a..32e52cc1d3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -638,6 +638,43 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testListDeliveringMessagesOnClosedConsumer() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + int intValue = RandomUtil.randomInt(); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + + Queue srvqueue = server.locateQueue(queue); + + QueueControl queueControl = createManagementControl(address, queue); + + ClientProducer producer = session.createProducer(address); + ClientMessage message = session.createMessage(durable); + message.putIntProperty(new SimpleString("key"), intValue); + producer.send(message); + producer.send(session.createMessage(durable)); + + ClientConsumer consumer = session.createConsumer(queue); + session.start(); + ClientMessage msgRec = consumer.receive(5000); + assertNotNull(msgRec); + assertEquals(msgRec.getIntProperty("key").intValue(), intValue); + assertEquals(1, srvqueue.getDeliveringCount()); + assertEquals(1, queueControl.listDeliveringMessages().size()); + + msgRec.acknowledge(); + consumer.close(); + assertEquals(1, srvqueue.getDeliveringCount()); + + System.out.println(queueControl.listDeliveringMessagesAsJSON()); + + Map[]> deliveringMap = queueControl.listDeliveringMessages(); + assertEquals(1, deliveringMap.size()); + + session.deleteQueue(queue); + } + @Test public void testListScheduledMessages() throws Exception { long delay = 2000; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 4cf5346f6f..7b7890fc39 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -354,6 +354,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } + @Override + public void addLingerSession(String sessionId) { + + } + + @Override + public void removeLingerSession(String sessionId) { + + } + @Override public void addRedistributor(final long delay) { // no-op