diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 8311379f6e..d9d2770b62 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -135,7 +135,8 @@ public class BrokerService implements Service { public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; public static final long DEFAULT_START_TIMEOUT = 600000L; public static final int MAX_SCHEDULER_REPEAT_ALLOWED = 1000; - + public static final int DEFAULT_MAX_UNCOMMITTED_COUNT = 0; + private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); @SuppressWarnings("unused") @@ -265,6 +266,8 @@ public class BrokerService implements Service { private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; private final List preShutdownHooks = new CopyOnWriteArrayList<>(); + private int maxUncommittedCount = DEFAULT_MAX_UNCOMMITTED_COUNT; + static { try { @@ -3298,4 +3301,13 @@ public class BrokerService implements Service { public void setMaxSchedulerRepeatAllowed(int maxSchedulerRepeatAllowed) { this.maxSchedulerRepeatAllowed = maxSchedulerRepeatAllowed; } + + public int getMaxUncommittedCount() { + return maxUncommittedCount; + } + + public void setMaxUncommittedCount(int maxUncommittedCount) { + this.maxUncommittedCount = maxUncommittedCount; + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java index e0e70bb313..12e9f7742a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import jakarta.jms.JMSException; +import jakarta.jms.ResourceAllocationException; + import javax.transaction.xa.XAException; import org.apache.activemq.broker.jmx.ManagedRegionBroker; @@ -146,6 +148,7 @@ public class TransactionBroker extends BrokerFilter { final ActiveMQDestination destination; final boolean messageSend; int opCount = 1; + public PreparedDestinationCompletion(final TransactionBroker transactionBroker, ActiveMQDestination destination, boolean messageSend) { this.transactionBroker = transactionBroker; this.destination = destination; @@ -291,13 +294,39 @@ public class TransactionBroker extends BrokerFilter { transaction = getTransaction(context, message.getTransactionId(), false); } context.setTransaction(transaction); + try { + // [AMQ-9344] Limit uncommitted transactions by count + verifyUncommittedCount(producerExchange, transaction, message); next.send(producerExchange, message); } finally { context.setTransaction(originalTx); } } + protected void verifyUncommittedCount(ProducerBrokerExchange producerExchange, Transaction transaction, Message message) throws Exception { + // maxUncommittedCount <= 0 disables + int maxUncommittedCount = this.getBrokerService().getMaxUncommittedCount(); + if (maxUncommittedCount > 0 && transaction.size() >= maxUncommittedCount) { + + try { + // Rollback as we are throwing an error the client as throwing the error will cause + // the client to reset to a new transaction so we need to clean up + transaction.rollback(); + + // Send ResourceAllocationException which will translate to a JMSException + final ResourceAllocationException e = new ResourceAllocationException( + "Can not send message on transaction with id: '" + transaction.getTransactionId().toString() + + "', Transaction has reached the maximum allowed number of pending send operations before commit of '" + + maxUncommittedCount + "'", "42900"); + LOG.warn("ConnectionId:{} exceeded maxUncommittedCount:{} for destination:{} in transactionId:{}", (producerExchange.getConnectionContext() != null ? producerExchange.getConnectionContext().getConnectionId() : ""), maxUncommittedCount, message.getDestination().getQualifiedName(), transaction.getTransactionId().toString()); + throw e; + } finally { + producerExchange.getRegionDestination().getDestinationStatistics().getMaxUncommittedExceededCount().increment(); + } + } + } + public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { for (Iterator iter = context.getTransactions().values().iterator(); iter.hasNext();) { try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 74b1bc7541..cc745c48fc 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -104,6 +104,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import jakarta.jms.ResourceAllocationException; + public class TransportConnection implements Connection, Task, CommandVisitor { private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index 70a8083ca6..e8ec158dae 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -541,4 +541,19 @@ public class BrokerView implements BrokerViewMBean { return context; } + + @Override + public int getMaxUncommittedCount() { + return brokerService.getMaxUncommittedCount(); + } + + @Override + public void setMaxUncommittedCount(int maxUncommittedCount) { + brokerService.setMaxUncommittedCount(maxUncommittedCount); + } + + @Override + public long getTotalMaxUncommittedExceededCount() { + return safeGetBroker().getDestinationStatistics().getMaxUncommittedExceededCount().getCount(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index 9c65c3b51b..7584a71734 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -327,4 +327,12 @@ public interface BrokerViewMBean extends Service { @MBeanInfo("JMSJobScheduler") ObjectName getJMSJobScheduler(); + @MBeanInfo(value="Returns the allowed max uncommitted count per transaction") + int getMaxUncommittedCount(); + + @MBeanInfo(value="Temporarily set the allowed max uncommitted count per transaction") + void setMaxUncommittedCount(int maxUncommittedCount); + + @MBeanInfo(value="The total number of times that the max number of uncommitted count has been exceeded across all destinations") + long getTotalMaxUncommittedExceededCount(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index cf521d584c..8abcc67163 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -594,4 +594,9 @@ public class DestinationView implements DestinationViewMBean { public boolean isSendDuplicateFromStoreToDLQ() { return destination.isSendDuplicateFromStoreToDLQ(); } + + @Override + public long getMaxUncommittedExceededCount() { + return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index f80147d19b..45ed51b994 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -479,4 +479,6 @@ public interface DestinationViewMBean { @MBeanInfo("Total time (ms) messages have been blocked by flow control") long getTotalBlockedTime(); + @MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination") + long getMaxUncommittedExceededCount(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 88dd9118e3..9d30c622f3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -44,7 +44,7 @@ public class DestinationStatistics extends StatsImpl { protected CountStatisticImpl blockedSends; protected TimeStatisticImpl blockedTime; protected SizeStatisticImpl messageSize; - + protected CountStatisticImpl maxUncommittedExceededCount; public DestinationStatistics() { @@ -67,6 +67,7 @@ public class DestinationStatistics extends StatsImpl { blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control"); blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control"); messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination"); + maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded"); addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); @@ -81,6 +82,7 @@ public class DestinationStatistics extends StatsImpl { addStatistic("blockedSends",blockedSends); addStatistic("blockedTime",blockedTime); addStatistic("messageSize",messageSize); + addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount); } public CountStatisticImpl getEnqueues() { @@ -145,6 +147,10 @@ public class DestinationStatistics extends StatsImpl { return this.messageSize; } + public CountStatisticImpl getMaxUncommittedExceededCount(){ + return this.maxUncommittedExceededCount; + } + public void reset() { if (this.isDoReset()) { super.reset(); @@ -158,6 +164,7 @@ public class DestinationStatistics extends StatsImpl { blockedSends.reset(); blockedTime.reset(); messageSize.reset(); + maxUncommittedExceededCount.reset(); } } @@ -178,6 +185,7 @@ public class DestinationStatistics extends StatsImpl { blockedSends.setEnabled(enabled); blockedTime.setEnabled(enabled); messageSize.setEnabled(enabled); + maxUncommittedExceededCount.setEnabled(enabled); } @@ -198,6 +206,7 @@ public class DestinationStatistics extends StatsImpl { blockedSends.setParent(parent.blockedSends); blockedTime.setParent(parent.blockedTime); messageSize.setParent(parent.messageSize); + maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount); } else { enqueues.setParent(null); dispatched.setParent(null); @@ -214,6 +223,7 @@ public class DestinationStatistics extends StatsImpl { blockedSends.setParent(null); blockedTime.setParent(null); messageSize.setParent(null); + maxUncommittedExceededCount.setParent(null); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java new file mode 100644 index 0000000000..320ae4f118 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.lang.management.ManagementFactory; +import java.util.Arrays; +import java.util.Collection; + +import javax.management.JMX; +import javax.management.MBeanServer; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerMBeanSupport; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import jakarta.jms.Connection; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.MessageProducer; +import jakarta.jms.Queue; +import jakarta.jms.ResourceAllocationException; +import jakarta.jms.Session; + +@RunWith(value = Parameterized.class) +public class MaxUncommittedCountExceededTest { + + public static final String DEFAULT_JMX_DOMAIN_NAME = "org.apache.activemq"; + public static final String DEFAULT_JMX_BROKER_NAME = "localhost"; + + public static final String DEFAULT_JMS_USER = "admin"; + public static final String DEFAULT_JMS_PASS = "admin"; + + @Parameterized.Parameters(name="syncSend={0}, exceptionContains={1}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {true, "Can not send message on transaction with id: "}, + {false, "has not been started."} + }); + } + + private final boolean syncSend; + private final String exceptionContains; + + public MaxUncommittedCountExceededTest(boolean syncSend, String exceptionContains) { + this.syncSend = syncSend; + this.exceptionContains = exceptionContains; + } + + protected ActiveMQConnectionFactory activemqConnectionFactory = null; + protected BrokerService brokerService = null; + + @Rule + public TestName testName = new TestName(); + + // Control session + protected Connection connection = null; + protected Session session = null; + protected MessageProducer messageProducer = null; + + protected String methodNameDestinationName = null; + protected MBeanServer mbeanServer = null; + protected QueueViewMBean queueViewMBean = null; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setPersistent(true); + brokerService.setUseJmx(true); + brokerService.addConnector("tcp://localhost:0").setName("Default"); + brokerService.setBrokerName("localhost"); + brokerService.start(); + brokerService.waitUntilStarted(30_000); + brokerService.deleteAllMessages(); + assertNotNull(brokerService); + + activemqConnectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + connection = activemqConnectionFactory.createConnection(); + connection.start(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + methodNameDestinationName = "AMQ.TX." + cleanParameterizedMethodName(testName.getMethodName().toUpperCase()); + Queue queue = session.createQueue(methodNameDestinationName); + messageProducer = session.createProducer(queue); + mbeanServer = ManagementFactory.getPlatformMBeanServer(); + brokerService.getAdminView().addQueue(methodNameDestinationName); + queueViewMBean = getQueueViewMBean(new ActiveMQQueue(methodNameDestinationName)); + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + try { + connection.close(); + } catch (Exception e) { + } finally { + connection = null; + } + } + + methodNameDestinationName = null; + activemqConnectionFactory = null; + if(brokerService != null) { + brokerService.deleteAllMessages(); + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } + + protected static String cleanParameterizedMethodName(String methodName) { + // clean up parameterized method string: + // TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, MESSAGETYPE=BYTES] + // returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES + + if (methodName == null || (!methodName.contains("[") && !methodName.contains("]"))) { + return methodName; + } + + String[] step1 = methodName.split("\\[", 2); + String[] step2 = step1[1].split("\\]", 2); + String[] step3 = step2[0].split(",", 16); + + return step1[0] + "." + step3[0].split("=", 2)[1] + "." + step3[1].split("=", 2)[1]; + } + + protected QueueViewMBean getQueueViewMBean(ActiveMQDestination destination) throws Exception { + return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), QueueViewMBean.class); + } + + protected void configureConnection(Connection connection, boolean syncSend) { + if(syncSend) { + ActiveMQConnection activemqConnection = (ActiveMQConnection)connection; + activemqConnection.setAlwaysSyncSend(true); + activemqConnection.setUseAsyncSend(false); + activemqConnection.setProducerWindowSize(10); + } + } + + @Test + public void testUncommittedCountExceeded() throws Exception { + assertEquals(Long.valueOf(0l), Long.valueOf(brokerService.getAdminView().getTotalMaxUncommittedExceededCount())); + assertEquals(Long.valueOf(0l), Long.valueOf(queueViewMBean.getMaxUncommittedExceededCount())); + + brokerService.setMaxUncommittedCount(10); + boolean caught = false; + JMSException caughtException = null; + + configureConnection(connection, syncSend); + + try { + for(int i=0; i < 20; i++) { + Message message = session.createBytesMessage(); + message.setIntProperty("IDX", i); + messageProducer.send(message); + } + + if(!syncSend) { + session.commit(); + } + } catch (JMSException e) { + if(syncSend) { + assertTrue(e instanceof ResourceAllocationException); + } + caught = true; + caughtException = e; + } + + assertTrue(caught); + assertNotNull(caughtException); + assertTrue(caughtException.getMessage().contains(exceptionContains)); + assertEquals(Long.valueOf(1l), Long.valueOf(brokerService.getAdminView().getTotalMaxUncommittedExceededCount())); + assertEquals(Long.valueOf(1l), Long.valueOf(queueViewMBean.getMaxUncommittedExceededCount())); + } +}