[AMQ-9344] Add configurable limit to uncommitted transaction

This commit is contained in:
Matt Pavlovich 2024-01-11 13:12:59 -06:00
parent a2bf5b8adb
commit 50baf95bff
No known key found for this signature in database
9 changed files with 287 additions and 2 deletions

View File

@ -135,7 +135,8 @@ public class BrokerService implements Service {
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
public static final long DEFAULT_START_TIMEOUT = 600000L;
public static final int MAX_SCHEDULER_REPEAT_ALLOWED = 1000;
public static final int DEFAULT_MAX_UNCOMMITTED_COUNT = 0;
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
@SuppressWarnings("unused")
@ -265,6 +266,8 @@ public class BrokerService implements Service {
private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
private final List<Runnable> preShutdownHooks = new CopyOnWriteArrayList<>();
private int maxUncommittedCount = DEFAULT_MAX_UNCOMMITTED_COUNT;
static {
try {
@ -3298,4 +3301,13 @@ public class BrokerService implements Service {
public void setMaxSchedulerRepeatAllowed(int maxSchedulerRepeatAllowed) {
this.maxSchedulerRepeatAllowed = maxSchedulerRepeatAllowed;
}
public int getMaxUncommittedCount() {
return maxUncommittedCount;
}
public void setMaxUncommittedCount(int maxUncommittedCount) {
this.maxUncommittedCount = maxUncommittedCount;
}
}

View File

@ -25,6 +25,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import jakarta.jms.JMSException;
import jakarta.jms.ResourceAllocationException;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
@ -146,6 +148,7 @@ public class TransactionBroker extends BrokerFilter {
final ActiveMQDestination destination;
final boolean messageSend;
int opCount = 1;
public PreparedDestinationCompletion(final TransactionBroker transactionBroker, ActiveMQDestination destination, boolean messageSend) {
this.transactionBroker = transactionBroker;
this.destination = destination;
@ -291,13 +294,39 @@ public class TransactionBroker extends BrokerFilter {
transaction = getTransaction(context, message.getTransactionId(), false);
}
context.setTransaction(transaction);
try {
// [AMQ-9344] Limit uncommitted transactions by count
verifyUncommittedCount(producerExchange, transaction, message);
next.send(producerExchange, message);
} finally {
context.setTransaction(originalTx);
}
}
protected void verifyUncommittedCount(ProducerBrokerExchange producerExchange, Transaction transaction, Message message) throws Exception {
// maxUncommittedCount <= 0 disables
int maxUncommittedCount = this.getBrokerService().getMaxUncommittedCount();
if (maxUncommittedCount > 0 && transaction.size() >= maxUncommittedCount) {
try {
// Rollback as we are throwing an error the client as throwing the error will cause
// the client to reset to a new transaction so we need to clean up
transaction.rollback();
// Send ResourceAllocationException which will translate to a JMSException
final ResourceAllocationException e = new ResourceAllocationException(
"Can not send message on transaction with id: '" + transaction.getTransactionId().toString()
+ "', Transaction has reached the maximum allowed number of pending send operations before commit of '"
+ maxUncommittedCount + "'", "42900");
LOG.warn("ConnectionId:{} exceeded maxUncommittedCount:{} for destination:{} in transactionId:{}", (producerExchange.getConnectionContext() != null ? producerExchange.getConnectionContext().getConnectionId() : "<not set>"), maxUncommittedCount, message.getDestination().getQualifiedName(), transaction.getTransactionId().toString());
throw e;
} finally {
producerExchange.getRegionDestination().getDestinationStatistics().getMaxUncommittedExceededCount().increment();
}
}
}
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
try {

View File

@ -104,6 +104,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import jakarta.jms.ResourceAllocationException;
public class TransportConnection implements Connection, Task, CommandVisitor {
private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");

View File

@ -541,4 +541,19 @@ public class BrokerView implements BrokerViewMBean {
return context;
}
@Override
public int getMaxUncommittedCount() {
return brokerService.getMaxUncommittedCount();
}
@Override
public void setMaxUncommittedCount(int maxUncommittedCount) {
brokerService.setMaxUncommittedCount(maxUncommittedCount);
}
@Override
public long getTotalMaxUncommittedExceededCount() {
return safeGetBroker().getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
}
}

View File

@ -327,4 +327,12 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("JMSJobScheduler")
ObjectName getJMSJobScheduler();
@MBeanInfo(value="Returns the allowed max uncommitted count per transaction")
int getMaxUncommittedCount();
@MBeanInfo(value="Temporarily set the allowed max uncommitted count per transaction")
void setMaxUncommittedCount(int maxUncommittedCount);
@MBeanInfo(value="The total number of times that the max number of uncommitted count has been exceeded across all destinations")
long getTotalMaxUncommittedExceededCount();
}

View File

@ -594,4 +594,9 @@ public class DestinationView implements DestinationViewMBean {
public boolean isSendDuplicateFromStoreToDLQ() {
return destination.isSendDuplicateFromStoreToDLQ();
}
@Override
public long getMaxUncommittedExceededCount() {
return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
}
}

View File

@ -479,4 +479,6 @@ public interface DestinationViewMBean {
@MBeanInfo("Total time (ms) messages have been blocked by flow control")
long getTotalBlockedTime();
@MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
long getMaxUncommittedExceededCount();
}

View File

@ -44,7 +44,7 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl blockedSends;
protected TimeStatisticImpl blockedTime;
protected SizeStatisticImpl messageSize;
protected CountStatisticImpl maxUncommittedExceededCount;
public DestinationStatistics() {
@ -67,6 +67,7 @@ public class DestinationStatistics extends StatsImpl {
blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control");
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
@ -81,6 +82,7 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("blockedSends",blockedSends);
addStatistic("blockedTime",blockedTime);
addStatistic("messageSize",messageSize);
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);
}
public CountStatisticImpl getEnqueues() {
@ -145,6 +147,10 @@ public class DestinationStatistics extends StatsImpl {
return this.messageSize;
}
public CountStatisticImpl getMaxUncommittedExceededCount(){
return this.maxUncommittedExceededCount;
}
public void reset() {
if (this.isDoReset()) {
super.reset();
@ -158,6 +164,7 @@ public class DestinationStatistics extends StatsImpl {
blockedSends.reset();
blockedTime.reset();
messageSize.reset();
maxUncommittedExceededCount.reset();
}
}
@ -178,6 +185,7 @@ public class DestinationStatistics extends StatsImpl {
blockedSends.setEnabled(enabled);
blockedTime.setEnabled(enabled);
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);
}
@ -198,6 +206,7 @@ public class DestinationStatistics extends StatsImpl {
blockedSends.setParent(parent.blockedSends);
blockedTime.setParent(parent.blockedTime);
messageSize.setParent(parent.messageSize);
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
} else {
enqueues.setParent(null);
dispatched.setParent(null);
@ -214,6 +223,7 @@ public class DestinationStatistics extends StatsImpl {
blockedSends.setParent(null);
blockedTime.setParent(null);
messageSize.setParent(null);
maxUncommittedExceededCount.setParent(null);
}
}

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collection;
import javax.management.JMX;
import javax.management.MBeanServer;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.ResourceAllocationException;
import jakarta.jms.Session;
@RunWith(value = Parameterized.class)
public class MaxUncommittedCountExceededTest {
public static final String DEFAULT_JMX_DOMAIN_NAME = "org.apache.activemq";
public static final String DEFAULT_JMX_BROKER_NAME = "localhost";
public static final String DEFAULT_JMS_USER = "admin";
public static final String DEFAULT_JMS_PASS = "admin";
@Parameterized.Parameters(name="syncSend={0}, exceptionContains={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{true, "Can not send message on transaction with id: "},
{false, "has not been started."}
});
}
private final boolean syncSend;
private final String exceptionContains;
public MaxUncommittedCountExceededTest(boolean syncSend, String exceptionContains) {
this.syncSend = syncSend;
this.exceptionContains = exceptionContains;
}
protected ActiveMQConnectionFactory activemqConnectionFactory = null;
protected BrokerService brokerService = null;
@Rule
public TestName testName = new TestName();
// Control session
protected Connection connection = null;
protected Session session = null;
protected MessageProducer messageProducer = null;
protected String methodNameDestinationName = null;
protected MBeanServer mbeanServer = null;
protected QueueViewMBean queueViewMBean = null;
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setPersistent(true);
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:0").setName("Default");
brokerService.setBrokerName("localhost");
brokerService.start();
brokerService.waitUntilStarted(30_000);
brokerService.deleteAllMessages();
assertNotNull(brokerService);
activemqConnectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString());
connection = activemqConnectionFactory.createConnection();
connection.start();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
methodNameDestinationName = "AMQ.TX." + cleanParameterizedMethodName(testName.getMethodName().toUpperCase());
Queue queue = session.createQueue(methodNameDestinationName);
messageProducer = session.createProducer(queue);
mbeanServer = ManagementFactory.getPlatformMBeanServer();
brokerService.getAdminView().addQueue(methodNameDestinationName);
queueViewMBean = getQueueViewMBean(new ActiveMQQueue(methodNameDestinationName));
}
@After
public void tearDown() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
} finally {
connection = null;
}
}
methodNameDestinationName = null;
activemqConnectionFactory = null;
if(brokerService != null) {
brokerService.deleteAllMessages();
brokerService.stop();
brokerService.waitUntilStopped();
}
}
protected static String cleanParameterizedMethodName(String methodName) {
// clean up parameterized method string:
// TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, MESSAGETYPE=BYTES]
// returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES
if (methodName == null || (!methodName.contains("[") && !methodName.contains("]"))) {
return methodName;
}
String[] step1 = methodName.split("\\[", 2);
String[] step2 = step1[1].split("\\]", 2);
String[] step3 = step2[0].split(",", 16);
return step1[0] + "." + step3[0].split("=", 2)[1] + "." + step3[1].split("=", 2)[1];
}
protected QueueViewMBean getQueueViewMBean(ActiveMQDestination destination) throws Exception {
return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), QueueViewMBean.class);
}
protected void configureConnection(Connection connection, boolean syncSend) {
if(syncSend) {
ActiveMQConnection activemqConnection = (ActiveMQConnection)connection;
activemqConnection.setAlwaysSyncSend(true);
activemqConnection.setUseAsyncSend(false);
activemqConnection.setProducerWindowSize(10);
}
}
@Test
public void testUncommittedCountExceeded() throws Exception {
assertEquals(Long.valueOf(0l), Long.valueOf(brokerService.getAdminView().getTotalMaxUncommittedExceededCount()));
assertEquals(Long.valueOf(0l), Long.valueOf(queueViewMBean.getMaxUncommittedExceededCount()));
brokerService.setMaxUncommittedCount(10);
boolean caught = false;
JMSException caughtException = null;
configureConnection(connection, syncSend);
try {
for(int i=0; i < 20; i++) {
Message message = session.createBytesMessage();
message.setIntProperty("IDX", i);
messageProducer.send(message);
}
if(!syncSend) {
session.commit();
}
} catch (JMSException e) {
if(syncSend) {
assertTrue(e instanceof ResourceAllocationException);
}
caught = true;
caughtException = e;
}
assertTrue(caught);
assertNotNull(caughtException);
assertTrue(caughtException.getMessage().contains(exceptionContains));
assertEquals(Long.valueOf(1l), Long.valueOf(brokerService.getAdminView().getTotalMaxUncommittedExceededCount()));
assertEquals(Long.valueOf(1l), Long.valueOf(queueViewMBean.getMaxUncommittedExceededCount()));
}
}