ARTEMIS-2902 - expose at queue control messages held in a prepared tx

https://issues.apache.org/jira/browse/ARTEMIS-2902
This commit is contained in:
Andy Taylor 2020-09-15 12:01:23 +01:00 committed by Clebert Suconic
parent 1dd901a8e7
commit c29a8cda5c
5 changed files with 191 additions and 0 deletions

View File

@ -2414,6 +2414,14 @@ public interface AuditLogger extends BasicLogger {
@Message(id = 601513, value = "User {0} is getting file property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getFile(String user, Object source, Object... args);
static void getPreparedTransactionMessageCount(Object source) {
LOGGER.getPreparedTransactionMessageCount(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601514, value = "User {0} is getting preparedTransactionMessageCount property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getPreparedTransactionMessageCount(String user, Object source, Object... args);
/*
* This logger is for message production and consumption and is on the hot path so enabled independently
*

View File

@ -718,4 +718,10 @@ public interface QueueControl {
*/
@Attribute(desc = "Get the header key to notify a consumer of a group change")
String getGroupFirstKey();
/**
* Will return the number of messages stuck in prepared transactions
*/
@Attribute(desc = "return how many messages are stuck in prepared transactions")
int getPreparedTransactionMessageCount();
}

View File

@ -22,6 +22,7 @@ import javax.json.JsonObjectBuilder;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.openmbean.CompositeData;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@ -50,8 +51,12 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.utils.JsonLoader;
@ -1854,6 +1859,48 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public int getPreparedTransactionMessageCount() {
if (AuditLogger.isEnabled()) {
AuditLogger.getPreparedTransactionMessageCount(queue);
}
checkStarted();
clearIO();
try {
int count = 0;
ResourceManager resourceManager = server.getResourceManager();
if (resourceManager != null) {
List<Xid> preparedTransactions = resourceManager.getPreparedTransactions();
for (Xid preparedTransaction : preparedTransactions) {
Transaction transaction = resourceManager.getTransaction(preparedTransaction);
if (transaction != null) {
List<TransactionOperation> allOperations = transaction.getAllOperations();
for (TransactionOperation operation : allOperations) {
if (operation instanceof RefsOperation) {
RefsOperation refsOperation = (RefsOperation) operation;
List<MessageReference> references = refsOperation.getReferencesToAcknowledge();
for (MessageReference reference : references) {
if (reference != null && reference.getQueue().getName().equals(queue.getName())) {
count++;
}
}
}
}
}
}
}
return count;
} finally {
blockOnIO();
}
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -23,6 +23,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularDataSupport;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -109,6 +110,126 @@ public class QueueControlTest extends ManagementTestBase {
this.durable = durable;
}
@Test
public void testGetPreparedTransactionMessageCount() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
ClientProducer producer = session.createProducer(address);
for (int i = 0; i < 10; i++) {
producer.send(session.createMessage(true));
}
producer.close();
ClientSession xaSession = locator.createSessionFactory().createXASession();
ClientConsumer consumer = xaSession.createConsumer(queue);
Xid xid = newXID();
xaSession.start(xid, XAResource.TMNOFLAGS);
xaSession.start();
for (int i = 0; i < 10; i++) {
ClientMessage receive = consumer.receive();
receive.acknowledge();
}
xaSession.end(xid, XAResource.TMSUCCESS);
xaSession.prepare(xid);
QueueControl queueControl = createManagementControl(address, queue);
int count = queueControl.getPreparedTransactionMessageCount();
Assert.assertEquals(10, count);
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testGetPreparedTransactionMessageCountDifferentQueues() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString address2 = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString queue2 = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
session.createQueue(new QueueConfiguration(queue2).setAddress(address2).setDurable(durable));
ClientProducer producer = session.createProducer(address);
ClientProducer producer2 = session.createProducer(address2);
for (int i = 0; i < 10; i++) {
producer.send(session.createMessage(true));
producer2.send(session.createMessage(true));
}
producer.close();
producer2.close();
ClientSession xaSession = locator.createSessionFactory().createXASession();
ClientConsumer consumer = xaSession.createConsumer(queue);
ClientConsumer consumer2 = xaSession.createConsumer(queue2);
Xid xid = newXID();
xaSession.start(xid, XAResource.TMNOFLAGS);
xaSession.start();
for (int i = 0; i < 10; i++) {
ClientMessage receive = consumer.receive();
receive.acknowledge();
receive = consumer2.receive();
receive.acknowledge();
}
xaSession.end(xid, XAResource.TMSUCCESS);
xaSession.prepare(xid);
QueueControl queueControl = createManagementControl(address, queue);
int count = queueControl.getPreparedTransactionMessageCount();
Assert.assertEquals(10, count);
consumer.close();
consumer2.close();
session.deleteQueue(queue);
session.deleteQueue(queue2);
}
@Test
public void testGetPreparedTransactionMessageCountNoTX() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
QueueControl queueControl = createManagementControl(address, queue);
int count = queueControl.getPreparedTransactionMessageCount();
Assert.assertEquals(0, count);
session.deleteQueue(queue);
}
@Test
public void testAttributes() throws Exception {

View File

@ -49,6 +49,15 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
}
@Override
public int getPreparedTransactionMessageCount() {
try {
return (Integer) proxy.invokeOperation("getPreparedTransactionMessageCount");
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public void resetAllGroups() {
try {