ARTEMIS-2380 Fix delivering message in the case of consume close
This commit is contained in:
parent
8dd1501971
commit
4a61d2dc76
|
@ -129,6 +129,10 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
|
|
||||||
void addConsumer(Consumer consumer) throws Exception;
|
void addConsumer(Consumer consumer) throws Exception;
|
||||||
|
|
||||||
|
void addLingerSession(String sessionId);
|
||||||
|
|
||||||
|
void removeLingerSession(String sessionId);
|
||||||
|
|
||||||
void removeConsumer(Consumer consumer);
|
void removeConsumer(Consumer consumer);
|
||||||
|
|
||||||
int getConsumerCount();
|
int getConsumerCount();
|
||||||
|
|
|
@ -429,6 +429,10 @@ public interface ServerSession extends SecurityAuth {
|
||||||
|
|
||||||
List<MessageReference> getInTXMessagesForConsumer(long consumerId);
|
List<MessageReference> getInTXMessagesForConsumer(long consumerId);
|
||||||
|
|
||||||
|
List<MessageReference> getInTxLingerMessages();
|
||||||
|
|
||||||
|
void addLingerConsumer(ServerConsumer consumer);
|
||||||
|
|
||||||
String getValidatedUser();
|
String getValidatedUser();
|
||||||
|
|
||||||
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
|
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
|
||||||
|
@ -490,4 +494,6 @@ public interface ServerSession extends SecurityAuth {
|
||||||
int getProducerCount();
|
int getProducerCount();
|
||||||
|
|
||||||
int getDefaultConsumerWindowSize(SimpleString address);
|
int getDefaultConsumerWindowSize(SimpleString address);
|
||||||
|
|
||||||
|
String toManagementString();
|
||||||
}
|
}
|
|
@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
|
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
|
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
|
||||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||||
|
@ -101,6 +102,7 @@ import org.apache.activemq.artemis.utils.Env;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||||
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||||
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
|
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
|
||||||
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
|
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
|
||||||
|
@ -321,6 +323,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
*/
|
*/
|
||||||
private final Object directDeliveryGuard = new Object();
|
private final Object directDeliveryGuard = new Object();
|
||||||
|
|
||||||
|
private final ConcurrentHashSet<String> lingerSessionIds = new ConcurrentHashSet<>();
|
||||||
|
|
||||||
public String debug() {
|
public String debug() {
|
||||||
StringWriter str = new StringWriter();
|
StringWriter str = new StringWriter();
|
||||||
PrintWriter out = new PrintWriter(str);
|
PrintWriter out = new PrintWriter(str);
|
||||||
|
@ -1260,6 +1264,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addLingerSession(String sessionId) {
|
||||||
|
lingerSessionIds.add(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeLingerSession(String sessionId) {
|
||||||
|
lingerSessionIds.remove(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeConsumer(final Consumer consumer) {
|
public void removeConsumer(final Consumer consumer) {
|
||||||
|
|
||||||
|
@ -1585,6 +1599,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
mapReturn.put(holder.consumer.toManagementString(), msgs);
|
mapReturn.put(holder.consumer.toManagementString(), msgs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (String lingerSessionId : lingerSessionIds) {
|
||||||
|
ServerSession serverSession = server.getSessionByID(lingerSessionId);
|
||||||
|
List<MessageReference> refs = serverSession == null ? null : serverSession.getInTxLingerMessages();
|
||||||
|
if (refs != null && !refs.isEmpty()) {
|
||||||
|
mapReturn.put(serverSession.toManagementString(), refs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return mapReturn;
|
return mapReturn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,8 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
*/
|
*/
|
||||||
protected boolean ignoreRedeliveryCheck = false;
|
protected boolean ignoreRedeliveryCheck = false;
|
||||||
|
|
||||||
|
private String lingerSessionId = null;
|
||||||
|
|
||||||
public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) {
|
public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) {
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
this.reason = reason;
|
this.reason = reason;
|
||||||
|
@ -97,6 +99,8 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
List<MessageReference> ackedRefs = new ArrayList<>();
|
List<MessageReference> ackedRefs = new ArrayList<>();
|
||||||
|
|
||||||
for (MessageReference ref : refsToAck) {
|
for (MessageReference ref : refsToAck) {
|
||||||
|
clearLingerRef(ref);
|
||||||
|
|
||||||
ref.emptyConsumerID();
|
ref.emptyConsumerID();
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -175,8 +179,10 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
@Override
|
@Override
|
||||||
public void afterCommit(final Transaction tx) {
|
public void afterCommit(final Transaction tx) {
|
||||||
for (MessageReference ref : refsToAck) {
|
for (MessageReference ref : refsToAck) {
|
||||||
|
clearLingerRef(ref);
|
||||||
|
|
||||||
synchronized (ref.getQueue()) {
|
synchronized (ref.getQueue()) {
|
||||||
queue.postAcknowledge(ref, reason);
|
ref.getQueue().postAcknowledge(ref, reason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,6 +196,12 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void clearLingerRef(MessageReference ref) {
|
||||||
|
if (!ref.hasConsumerId() && lingerSessionId != null) {
|
||||||
|
ref.getQueue().removeLingerSession(lingerSessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void decrementRefCount(MessageReference refmsg) {
|
private void decrementRefCount(MessageReference refmsg) {
|
||||||
try {
|
try {
|
||||||
refmsg.getMessage().decrementRefCount();
|
refmsg.getMessage().decrementRefCount();
|
||||||
|
@ -228,4 +240,18 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
return refsToAck;
|
return refsToAck;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized List<MessageReference> getLingerMessages() {
|
||||||
|
List<MessageReference> list = new LinkedList<>();
|
||||||
|
for (MessageReference ref : refsToAck) {
|
||||||
|
if (!ref.hasConsumerId() && lingerSessionId != null) {
|
||||||
|
list.add(ref);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLingerSession(String lingerSessionId) {
|
||||||
|
this.lingerSessionId = lingerSessionId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.server.impl;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -571,6 +571,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
|
|
||||||
tx.rollback();
|
tx.rollback();
|
||||||
|
|
||||||
|
addLingerRefs();
|
||||||
|
|
||||||
if (!browseOnly) {
|
if (!browseOnly) {
|
||||||
TypedProperties props = new TypedProperties();
|
TypedProperties props = new TypedProperties();
|
||||||
|
|
||||||
|
@ -607,6 +609,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addLingerRefs() throws Exception {
|
||||||
|
if (!browseOnly) {
|
||||||
|
List<MessageReference> lingerRefs = session.getInTXMessagesForConsumer(this.id);
|
||||||
|
if (lingerRefs != null && !lingerRefs.isEmpty()) {
|
||||||
|
session.addLingerConsumer(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeItself() throws Exception {
|
public void removeItself() throws Exception {
|
||||||
if (browseOnly) {
|
if (browseOnly) {
|
||||||
|
|
|
@ -2113,6 +2113,41 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<MessageReference> getInTxLingerMessages() {
|
||||||
|
Transaction transaction = tx;
|
||||||
|
if (transaction == null && callback != null) {
|
||||||
|
transaction = callback.getCurrentTransaction();
|
||||||
|
}
|
||||||
|
RefsOperation operation = transaction == null ? null : (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
|
||||||
|
|
||||||
|
return operation == null ? null : operation.getLingerMessages();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addLingerConsumer(ServerConsumer consumer) {
|
||||||
|
Transaction transaction = tx;
|
||||||
|
if (transaction == null && callback != null) {
|
||||||
|
transaction = callback.getCurrentTransaction();
|
||||||
|
}
|
||||||
|
if (transaction != null) {
|
||||||
|
synchronized (transaction) {
|
||||||
|
// Transaction might be committed/rolledback, we need to synchronize and judge state
|
||||||
|
if (transaction.getState() != State.COMMITTED && transaction.getState() != State.ROLLEDBACK) {
|
||||||
|
RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
|
||||||
|
List<MessageReference> refs = operation == null ? null : operation.getListOnConsumer(consumer.getID());
|
||||||
|
if (refs != null && !refs.isEmpty()) {
|
||||||
|
for (MessageReference ref : refs) {
|
||||||
|
ref.emptyConsumerID();
|
||||||
|
}
|
||||||
|
operation.setLingerSession(name);
|
||||||
|
consumer.getQueue().addLingerSession(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SimpleString removePrefix(SimpleString address) {
|
public SimpleString removePrefix(SimpleString address) {
|
||||||
if (prefixEnabled && address != null) {
|
if (prefixEnabled && address != null) {
|
||||||
|
@ -2183,4 +2218,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
|
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||||
return as.getDefaultConsumerWindowSize();
|
return as.getDefaultConsumerWindowSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toManagementString() {
|
||||||
|
return "ServerSession [id=" + getConnectionID() + ":" + getName() + "]";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1010,6 +1010,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addLingerSession(String sessionId) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeLingerSession(String sessionId) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeConsumer(Consumer consumer) {
|
public void removeConsumer(Consumer consumer) {
|
||||||
|
|
||||||
|
|
|
@ -41,8 +41,12 @@ public class ReceiveTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
SimpleString addressA;
|
SimpleString addressA;
|
||||||
|
|
||||||
|
SimpleString addressB;
|
||||||
|
|
||||||
SimpleString queueA;
|
SimpleString queueA;
|
||||||
|
|
||||||
|
SimpleString queueB;
|
||||||
|
|
||||||
private ServerLocator locator;
|
private ServerLocator locator;
|
||||||
|
|
||||||
private ActiveMQServer server;
|
private ActiveMQServer server;
|
||||||
|
@ -54,6 +58,8 @@ public class ReceiveTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
addressA = RandomUtil.randomSimpleString();
|
addressA = RandomUtil.randomSimpleString();
|
||||||
queueA = RandomUtil.randomSimpleString();
|
queueA = RandomUtil.randomSimpleString();
|
||||||
|
addressB = RandomUtil.randomSimpleString();
|
||||||
|
queueB = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
locator = createInVMNonHALocator();
|
locator = createInVMNonHALocator();
|
||||||
server = createServer(false);
|
server = createServer(false);
|
||||||
|
@ -162,4 +168,37 @@ public class ReceiveTest extends ActiveMQTestBase {
|
||||||
session.close();
|
session.close();
|
||||||
sendSession.close();
|
sendSession.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiConsumersOnSession() throws Exception {
|
||||||
|
ClientSessionFactory cf = createSessionFactory(locator.setCallTimeout(10000000));
|
||||||
|
ClientSession sendSession = cf.createSession(false, true, true);
|
||||||
|
ClientProducer cp1 = sendSession.createProducer(addressA);
|
||||||
|
ClientProducer cp2 = sendSession.createProducer(addressB);
|
||||||
|
|
||||||
|
ClientSession session = cf.createSession(false, true, false);
|
||||||
|
session.createQueue(addressA, queueA, false);
|
||||||
|
session.createQueue(addressB, queueB, false);
|
||||||
|
|
||||||
|
ClientConsumer cc1 = session.createConsumer(queueA);
|
||||||
|
ClientConsumer cc2 = session.createConsumer(queueB);
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
cp1.send(sendSession.createMessage(false));
|
||||||
|
cp2.send(sendSession.createMessage(false));
|
||||||
|
Assert.assertNotNull(cc1.receive().acknowledge());
|
||||||
|
Assert.assertNotNull(cc2.receive().acknowledge());
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
final Queue queue1 = server.locateQueue(queueA);
|
||||||
|
final Queue queue2 = server.locateQueue(queueB);
|
||||||
|
|
||||||
|
Wait.assertTrue(() -> queue1.getMessageCount() == 0, 500, 100);
|
||||||
|
Wait.assertTrue(() -> queue1.getMessagesAcknowledged() == 1, 500, 100);
|
||||||
|
Wait.assertTrue(() -> queue2.getMessageCount() == 0, 500, 100);
|
||||||
|
Wait.assertTrue(() -> queue2.getMessagesAcknowledged() == 1, 500, 100);
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
sendSession.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -638,6 +638,43 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
session.deleteQueue(queue);
|
session.deleteQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListDeliveringMessagesOnClosedConsumer() throws Exception {
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
int intValue = RandomUtil.randomInt();
|
||||||
|
session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
|
||||||
|
|
||||||
|
Queue srvqueue = server.locateQueue(queue);
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
ClientMessage message = session.createMessage(durable);
|
||||||
|
message.putIntProperty(new SimpleString("key"), intValue);
|
||||||
|
producer.send(message);
|
||||||
|
producer.send(session.createMessage(durable));
|
||||||
|
|
||||||
|
ClientConsumer consumer = session.createConsumer(queue);
|
||||||
|
session.start();
|
||||||
|
ClientMessage msgRec = consumer.receive(5000);
|
||||||
|
assertNotNull(msgRec);
|
||||||
|
assertEquals(msgRec.getIntProperty("key").intValue(), intValue);
|
||||||
|
assertEquals(1, srvqueue.getDeliveringCount());
|
||||||
|
assertEquals(1, queueControl.listDeliveringMessages().size());
|
||||||
|
|
||||||
|
msgRec.acknowledge();
|
||||||
|
consumer.close();
|
||||||
|
assertEquals(1, srvqueue.getDeliveringCount());
|
||||||
|
|
||||||
|
System.out.println(queueControl.listDeliveringMessagesAsJSON());
|
||||||
|
|
||||||
|
Map<String, Map<String, Object>[]> deliveringMap = queueControl.listDeliveringMessages();
|
||||||
|
assertEquals(1, deliveringMap.size());
|
||||||
|
|
||||||
|
session.deleteQueue(queue);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListScheduledMessages() throws Exception {
|
public void testListScheduledMessages() throws Exception {
|
||||||
long delay = 2000;
|
long delay = 2000;
|
||||||
|
|
|
@ -354,6 +354,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addLingerSession(String sessionId) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeLingerSession(String sessionId) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addRedistributor(final long delay) {
|
public void addRedistributor(final long delay) {
|
||||||
// no-op
|
// no-op
|
||||||
|
|
Loading…
Reference in New Issue