This commit is contained in:
Wei Yang 2019-09-04 11:02:19 +08:00
commit 9c1cbf3dc9
10 changed files with 208 additions and 2 deletions

View File

@ -129,6 +129,10 @@ public interface Queue extends Bindable,CriticalComponent {
void addConsumer(Consumer consumer) throws Exception;
void addLingerSession(String sessionId);
void removeLingerSession(String sessionId);
void removeConsumer(Consumer consumer);
int getConsumerCount();

View File

@ -429,6 +429,10 @@ public interface ServerSession extends SecurityAuth {
List<MessageReference> getInTXMessagesForConsumer(long consumerId);
List<MessageReference> getInTxLingerMessages();
void addLingerConsumer(ServerConsumer consumer);
String getValidatedUser();
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
@ -490,4 +494,6 @@ public interface ServerSession extends SecurityAuth {
int getProducerCount();
int getDefaultConsumerWindowSize(SimpleString address);
String toManagementString();
}

View File

@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@ -101,6 +102,7 @@ import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
@ -321,6 +323,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
*/
private final Object directDeliveryGuard = new Object();
private final ConcurrentHashSet<String> lingerSessionIds = new ConcurrentHashSet<>();
public String debug() {
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
@ -1260,6 +1264,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public void addLingerSession(String sessionId) {
lingerSessionIds.add(sessionId);
}
@Override
public void removeLingerSession(String sessionId) {
lingerSessionIds.remove(sessionId);
}
@Override
public void removeConsumer(final Consumer consumer) {
@ -1585,6 +1599,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
mapReturn.put(holder.consumer.toManagementString(), msgs);
}
}
for (String lingerSessionId : lingerSessionIds) {
ServerSession serverSession = server.getSessionByID(lingerSessionId);
List<MessageReference> refs = serverSession == null ? null : serverSession.getInTxLingerMessages();
if (refs != null && !refs.isEmpty()) {
mapReturn.put(serverSession.toManagementString(), refs);
}
}
return mapReturn;
}

View File

@ -52,6 +52,8 @@ public class RefsOperation extends TransactionOperationAbstract {
*/
protected boolean ignoreRedeliveryCheck = false;
private String lingerSessionId = null;
public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) {
this.queue = queue;
this.reason = reason;
@ -97,6 +99,8 @@ public class RefsOperation extends TransactionOperationAbstract {
List<MessageReference> ackedRefs = new ArrayList<>();
for (MessageReference ref : refsToAck) {
clearLingerRef(ref);
ref.emptyConsumerID();
if (logger.isTraceEnabled()) {
@ -175,8 +179,10 @@ public class RefsOperation extends TransactionOperationAbstract {
@Override
public void afterCommit(final Transaction tx) {
for (MessageReference ref : refsToAck) {
clearLingerRef(ref);
synchronized (ref.getQueue()) {
queue.postAcknowledge(ref, reason);
ref.getQueue().postAcknowledge(ref, reason);
}
}
@ -190,6 +196,12 @@ public class RefsOperation extends TransactionOperationAbstract {
}
}
private void clearLingerRef(MessageReference ref) {
if (!ref.hasConsumerId() && lingerSessionId != null) {
ref.getQueue().removeLingerSession(lingerSessionId);
}
}
private void decrementRefCount(MessageReference refmsg) {
try {
refmsg.getMessage().decrementRefCount();
@ -228,4 +240,18 @@ public class RefsOperation extends TransactionOperationAbstract {
return refsToAck;
}
public synchronized List<MessageReference> getLingerMessages() {
List<MessageReference> list = new LinkedList<>();
for (MessageReference ref : refsToAck) {
if (!ref.hasConsumerId() && lingerSessionId != null) {
list.add(ref);
}
}
return list;
}
public void setLingerSession(String lingerSessionId) {
this.lingerSessionId = lingerSessionId;
}
}

View File

@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.server.impl;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
@ -571,6 +571,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
tx.rollback();
addLingerRefs();
if (!browseOnly) {
TypedProperties props = new TypedProperties();
@ -607,6 +609,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
private void addLingerRefs() throws Exception {
if (!browseOnly) {
List<MessageReference> lingerRefs = session.getInTXMessagesForConsumer(this.id);
if (lingerRefs != null && !lingerRefs.isEmpty()) {
session.addLingerConsumer(this);
}
}
}
@Override
public void removeItself() throws Exception {
if (browseOnly) {

View File

@ -2113,6 +2113,41 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
@Override
public List<MessageReference> getInTxLingerMessages() {
Transaction transaction = tx;
if (transaction == null && callback != null) {
transaction = callback.getCurrentTransaction();
}
RefsOperation operation = transaction == null ? null : (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
return operation == null ? null : operation.getLingerMessages();
}
@Override
public void addLingerConsumer(ServerConsumer consumer) {
Transaction transaction = tx;
if (transaction == null && callback != null) {
transaction = callback.getCurrentTransaction();
}
if (transaction != null) {
synchronized (transaction) {
// Transaction might be committed/rolledback, we need to synchronize and judge state
if (transaction.getState() != State.COMMITTED && transaction.getState() != State.ROLLEDBACK) {
RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
List<MessageReference> refs = operation == null ? null : operation.getListOnConsumer(consumer.getID());
if (refs != null && !refs.isEmpty()) {
for (MessageReference ref : refs) {
ref.emptyConsumerID();
}
operation.setLingerSession(name);
consumer.getQueue().addLingerSession(name);
}
}
}
}
}
@Override
public SimpleString removePrefix(SimpleString address) {
if (prefixEnabled && address != null) {
@ -2183,4 +2218,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
return as.getDefaultConsumerWindowSize();
}
@Override
public String toManagementString() {
return "ServerSession [id=" + getConnectionID() + ":" + getName() + "]";
}
}

View File

@ -1010,6 +1010,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void addLingerSession(String sessionId) {
}
@Override
public void removeLingerSession(String sessionId) {
}
@Override
public void removeConsumer(Consumer consumer) {

View File

@ -41,8 +41,12 @@ public class ReceiveTest extends ActiveMQTestBase {
SimpleString addressA;
SimpleString addressB;
SimpleString queueA;
SimpleString queueB;
private ServerLocator locator;
private ActiveMQServer server;
@ -54,6 +58,8 @@ public class ReceiveTest extends ActiveMQTestBase {
addressA = RandomUtil.randomSimpleString();
queueA = RandomUtil.randomSimpleString();
addressB = RandomUtil.randomSimpleString();
queueB = RandomUtil.randomSimpleString();
locator = createInVMNonHALocator();
server = createServer(false);
@ -162,4 +168,37 @@ public class ReceiveTest extends ActiveMQTestBase {
session.close();
sendSession.close();
}
@Test
public void testMultiConsumersOnSession() throws Exception {
ClientSessionFactory cf = createSessionFactory(locator.setCallTimeout(10000000));
ClientSession sendSession = cf.createSession(false, true, true);
ClientProducer cp1 = sendSession.createProducer(addressA);
ClientProducer cp2 = sendSession.createProducer(addressB);
ClientSession session = cf.createSession(false, true, false);
session.createQueue(addressA, queueA, false);
session.createQueue(addressB, queueB, false);
ClientConsumer cc1 = session.createConsumer(queueA);
ClientConsumer cc2 = session.createConsumer(queueB);
session.start();
cp1.send(sendSession.createMessage(false));
cp2.send(sendSession.createMessage(false));
Assert.assertNotNull(cc1.receive().acknowledge());
Assert.assertNotNull(cc2.receive().acknowledge());
session.commit();
final Queue queue1 = server.locateQueue(queueA);
final Queue queue2 = server.locateQueue(queueB);
Wait.assertTrue(() -> queue1.getMessageCount() == 0, 500, 100);
Wait.assertTrue(() -> queue1.getMessagesAcknowledged() == 1, 500, 100);
Wait.assertTrue(() -> queue2.getMessageCount() == 0, 500, 100);
Wait.assertTrue(() -> queue2.getMessagesAcknowledged() == 1, 500, 100);
session.close();
sendSession.close();
}
}

View File

@ -638,6 +638,43 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testListDeliveringMessagesOnClosedConsumer() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
int intValue = RandomUtil.randomInt();
session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
Queue srvqueue = server.locateQueue(queue);
QueueControl queueControl = createManagementControl(address, queue);
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(durable);
message.putIntProperty(new SimpleString("key"), intValue);
producer.send(message);
producer.send(session.createMessage(durable));
ClientConsumer consumer = session.createConsumer(queue);
session.start();
ClientMessage msgRec = consumer.receive(5000);
assertNotNull(msgRec);
assertEquals(msgRec.getIntProperty("key").intValue(), intValue);
assertEquals(1, srvqueue.getDeliveringCount());
assertEquals(1, queueControl.listDeliveringMessages().size());
msgRec.acknowledge();
consumer.close();
assertEquals(1, srvqueue.getDeliveringCount());
System.out.println(queueControl.listDeliveringMessagesAsJSON());
Map<String, Map<String, Object>[]> deliveringMap = queueControl.listDeliveringMessages();
assertEquals(1, deliveringMap.size());
session.deleteQueue(queue);
}
@Test
public void testListScheduledMessages() throws Exception {
long delay = 2000;

View File

@ -354,6 +354,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void addLingerSession(String sessionId) {
}
@Override
public void removeLingerSession(String sessionId) {
}
@Override
public void addRedistributor(final long delay) {
// no-op