mirror of https://github.com/apache/activemq.git
Merge branch 'mattrpav-AMQ-9344'
This commit is contained in:
commit
43f1399697
|
@ -135,6 +135,7 @@ public class BrokerService implements Service {
|
|||
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
|
||||
public static final long DEFAULT_START_TIMEOUT = 600000L;
|
||||
public static final int MAX_SCHEDULER_REPEAT_ALLOWED = 1000;
|
||||
public static final int DEFAULT_MAX_UNCOMMITTED_COUNT = 0;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
|
||||
|
||||
|
@ -265,6 +266,8 @@ public class BrokerService implements Service {
|
|||
private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
|
||||
private final List<Runnable> preShutdownHooks = new CopyOnWriteArrayList<>();
|
||||
|
||||
private int maxUncommittedCount = DEFAULT_MAX_UNCOMMITTED_COUNT;
|
||||
|
||||
static {
|
||||
|
||||
try {
|
||||
|
@ -3298,4 +3301,13 @@ public class BrokerService implements Service {
|
|||
public void setMaxSchedulerRepeatAllowed(int maxSchedulerRepeatAllowed) {
|
||||
this.maxSchedulerRepeatAllowed = maxSchedulerRepeatAllowed;
|
||||
}
|
||||
|
||||
public int getMaxUncommittedCount() {
|
||||
return maxUncommittedCount;
|
||||
}
|
||||
|
||||
public void setMaxUncommittedCount(int maxUncommittedCount) {
|
||||
this.maxUncommittedCount = maxUncommittedCount;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import jakarta.jms.JMSException;
|
||||
import jakarta.jms.ResourceAllocationException;
|
||||
|
||||
import javax.transaction.xa.XAException;
|
||||
|
||||
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
|
||||
|
@ -146,6 +148,7 @@ public class TransactionBroker extends BrokerFilter {
|
|||
final ActiveMQDestination destination;
|
||||
final boolean messageSend;
|
||||
int opCount = 1;
|
||||
|
||||
public PreparedDestinationCompletion(final TransactionBroker transactionBroker, ActiveMQDestination destination, boolean messageSend) {
|
||||
this.transactionBroker = transactionBroker;
|
||||
this.destination = destination;
|
||||
|
@ -291,13 +294,39 @@ public class TransactionBroker extends BrokerFilter {
|
|||
transaction = getTransaction(context, message.getTransactionId(), false);
|
||||
}
|
||||
context.setTransaction(transaction);
|
||||
|
||||
try {
|
||||
// [AMQ-9344] Limit uncommitted transactions by count
|
||||
verifyUncommittedCount(producerExchange, transaction, message);
|
||||
next.send(producerExchange, message);
|
||||
} finally {
|
||||
context.setTransaction(originalTx);
|
||||
}
|
||||
}
|
||||
|
||||
protected void verifyUncommittedCount(ProducerBrokerExchange producerExchange, Transaction transaction, Message message) throws Exception {
|
||||
// maxUncommittedCount <= 0 disables
|
||||
int maxUncommittedCount = this.getBrokerService().getMaxUncommittedCount();
|
||||
if (maxUncommittedCount > 0 && transaction.size() >= maxUncommittedCount) {
|
||||
|
||||
try {
|
||||
// Rollback as we are throwing an error the client as throwing the error will cause
|
||||
// the client to reset to a new transaction so we need to clean up
|
||||
transaction.rollback();
|
||||
|
||||
// Send ResourceAllocationException which will translate to a JMSException
|
||||
final ResourceAllocationException e = new ResourceAllocationException(
|
||||
"Can not send message on transaction with id: '" + transaction.getTransactionId().toString()
|
||||
+ "', Transaction has reached the maximum allowed number of pending send operations before commit of '"
|
||||
+ maxUncommittedCount + "'", "42900");
|
||||
LOG.warn("ConnectionId:{} exceeded maxUncommittedCount:{} for destination:{} in transactionId:{}", (producerExchange.getConnectionContext() != null ? producerExchange.getConnectionContext().getConnectionId() : "<not set>"), maxUncommittedCount, message.getDestination().getQualifiedName(), transaction.getTransactionId().toString());
|
||||
throw e;
|
||||
} finally {
|
||||
producerExchange.getRegionDestination().getDestinationStatistics().getMaxUncommittedExceededCount().increment();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
|
||||
for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
|
||||
try {
|
||||
|
|
|
@ -104,6 +104,8 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
import jakarta.jms.ResourceAllocationException;
|
||||
|
||||
public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
|
||||
private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
|
||||
|
|
|
@ -541,4 +541,19 @@ public class BrokerView implements BrokerViewMBean {
|
|||
|
||||
return context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxUncommittedCount() {
|
||||
return brokerService.getMaxUncommittedCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxUncommittedCount(int maxUncommittedCount) {
|
||||
brokerService.setMaxUncommittedCount(maxUncommittedCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalMaxUncommittedExceededCount() {
|
||||
return safeGetBroker().getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -327,4 +327,12 @@ public interface BrokerViewMBean extends Service {
|
|||
@MBeanInfo("JMSJobScheduler")
|
||||
ObjectName getJMSJobScheduler();
|
||||
|
||||
@MBeanInfo(value="Returns the allowed max uncommitted count per transaction")
|
||||
int getMaxUncommittedCount();
|
||||
|
||||
@MBeanInfo(value="Temporarily set the allowed max uncommitted count per transaction")
|
||||
void setMaxUncommittedCount(int maxUncommittedCount);
|
||||
|
||||
@MBeanInfo(value="The total number of times that the max number of uncommitted count has been exceeded across all destinations")
|
||||
long getTotalMaxUncommittedExceededCount();
|
||||
}
|
||||
|
|
|
@ -594,4 +594,9 @@ public class DestinationView implements DestinationViewMBean {
|
|||
public boolean isSendDuplicateFromStoreToDLQ() {
|
||||
return destination.isSendDuplicateFromStoreToDLQ();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxUncommittedExceededCount() {
|
||||
return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -479,4 +479,6 @@ public interface DestinationViewMBean {
|
|||
@MBeanInfo("Total time (ms) messages have been blocked by flow control")
|
||||
long getTotalBlockedTime();
|
||||
|
||||
@MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
|
||||
long getMaxUncommittedExceededCount();
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public class DestinationStatistics extends StatsImpl {
|
|||
protected CountStatisticImpl blockedSends;
|
||||
protected TimeStatisticImpl blockedTime;
|
||||
protected SizeStatisticImpl messageSize;
|
||||
|
||||
protected CountStatisticImpl maxUncommittedExceededCount;
|
||||
|
||||
public DestinationStatistics() {
|
||||
|
||||
|
@ -67,6 +67,7 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control");
|
||||
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
|
||||
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
|
||||
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");
|
||||
addStatistic("enqueues", enqueues);
|
||||
addStatistic("dispatched", dispatched);
|
||||
addStatistic("dequeues", dequeues);
|
||||
|
@ -81,6 +82,7 @@ public class DestinationStatistics extends StatsImpl {
|
|||
addStatistic("blockedSends",blockedSends);
|
||||
addStatistic("blockedTime",blockedTime);
|
||||
addStatistic("messageSize",messageSize);
|
||||
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);
|
||||
}
|
||||
|
||||
public CountStatisticImpl getEnqueues() {
|
||||
|
@ -145,6 +147,10 @@ public class DestinationStatistics extends StatsImpl {
|
|||
return this.messageSize;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getMaxUncommittedExceededCount(){
|
||||
return this.maxUncommittedExceededCount;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
if (this.isDoReset()) {
|
||||
super.reset();
|
||||
|
@ -158,6 +164,7 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedSends.reset();
|
||||
blockedTime.reset();
|
||||
messageSize.reset();
|
||||
maxUncommittedExceededCount.reset();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -178,6 +185,7 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedSends.setEnabled(enabled);
|
||||
blockedTime.setEnabled(enabled);
|
||||
messageSize.setEnabled(enabled);
|
||||
maxUncommittedExceededCount.setEnabled(enabled);
|
||||
|
||||
}
|
||||
|
||||
|
@ -198,6 +206,7 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedSends.setParent(parent.blockedSends);
|
||||
blockedTime.setParent(parent.blockedTime);
|
||||
messageSize.setParent(parent.messageSize);
|
||||
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
|
||||
} else {
|
||||
enqueues.setParent(null);
|
||||
dispatched.setParent(null);
|
||||
|
@ -214,6 +223,7 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedSends.setParent(null);
|
||||
blockedTime.setParent(null);
|
||||
messageSize.setParent(null);
|
||||
maxUncommittedExceededCount.setParent(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.management.JMX;
|
||||
import javax.management.MBeanServer;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import jakarta.jms.Connection;
|
||||
import jakarta.jms.JMSException;
|
||||
import jakarta.jms.Message;
|
||||
import jakarta.jms.MessageProducer;
|
||||
import jakarta.jms.Queue;
|
||||
import jakarta.jms.ResourceAllocationException;
|
||||
import jakarta.jms.Session;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class MaxUncommittedCountExceededTest {
|
||||
|
||||
public static final String DEFAULT_JMX_DOMAIN_NAME = "org.apache.activemq";
|
||||
public static final String DEFAULT_JMX_BROKER_NAME = "localhost";
|
||||
|
||||
public static final String DEFAULT_JMS_USER = "admin";
|
||||
public static final String DEFAULT_JMS_PASS = "admin";
|
||||
|
||||
@Parameterized.Parameters(name="syncSend={0}, exceptionContains={1}")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{true, "Can not send message on transaction with id: "},
|
||||
{false, "has not been started."}
|
||||
});
|
||||
}
|
||||
|
||||
private final boolean syncSend;
|
||||
private final String exceptionContains;
|
||||
|
||||
public MaxUncommittedCountExceededTest(boolean syncSend, String exceptionContains) {
|
||||
this.syncSend = syncSend;
|
||||
this.exceptionContains = exceptionContains;
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory activemqConnectionFactory = null;
|
||||
protected BrokerService brokerService = null;
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
// Control session
|
||||
protected Connection connection = null;
|
||||
protected Session session = null;
|
||||
protected MessageProducer messageProducer = null;
|
||||
|
||||
protected String methodNameDestinationName = null;
|
||||
protected MBeanServer mbeanServer = null;
|
||||
protected QueueViewMBean queueViewMBean = null;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||
brokerService.setPersistent(true);
|
||||
brokerService.setUseJmx(true);
|
||||
brokerService.addConnector("tcp://localhost:0").setName("Default");
|
||||
brokerService.setBrokerName("localhost");
|
||||
brokerService.start();
|
||||
brokerService.waitUntilStarted(30_000);
|
||||
brokerService.deleteAllMessages();
|
||||
assertNotNull(brokerService);
|
||||
|
||||
activemqConnectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString());
|
||||
connection = activemqConnectionFactory.createConnection();
|
||||
connection.start();
|
||||
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
methodNameDestinationName = "AMQ.TX." + cleanParameterizedMethodName(testName.getMethodName().toUpperCase());
|
||||
Queue queue = session.createQueue(methodNameDestinationName);
|
||||
messageProducer = session.createProducer(queue);
|
||||
mbeanServer = ManagementFactory.getPlatformMBeanServer();
|
||||
brokerService.getAdminView().addQueue(methodNameDestinationName);
|
||||
queueViewMBean = getQueueViewMBean(new ActiveMQQueue(methodNameDestinationName));
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (Exception e) {
|
||||
} finally {
|
||||
connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
methodNameDestinationName = null;
|
||||
activemqConnectionFactory = null;
|
||||
if(brokerService != null) {
|
||||
brokerService.deleteAllMessages();
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
protected static String cleanParameterizedMethodName(String methodName) {
|
||||
// clean up parameterized method string:
|
||||
// TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, MESSAGETYPE=BYTES]
|
||||
// returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES
|
||||
|
||||
if (methodName == null || (!methodName.contains("[") && !methodName.contains("]"))) {
|
||||
return methodName;
|
||||
}
|
||||
|
||||
String[] step1 = methodName.split("\\[", 2);
|
||||
String[] step2 = step1[1].split("\\]", 2);
|
||||
String[] step3 = step2[0].split(",", 16);
|
||||
|
||||
return step1[0] + "." + step3[0].split("=", 2)[1] + "." + step3[1].split("=", 2)[1];
|
||||
}
|
||||
|
||||
protected QueueViewMBean getQueueViewMBean(ActiveMQDestination destination) throws Exception {
|
||||
return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), QueueViewMBean.class);
|
||||
}
|
||||
|
||||
protected void configureConnection(Connection connection, boolean syncSend) {
|
||||
if(syncSend) {
|
||||
ActiveMQConnection activemqConnection = (ActiveMQConnection)connection;
|
||||
activemqConnection.setAlwaysSyncSend(true);
|
||||
activemqConnection.setUseAsyncSend(false);
|
||||
activemqConnection.setProducerWindowSize(10);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUncommittedCountExceeded() throws Exception {
|
||||
assertEquals(Long.valueOf(0l), Long.valueOf(brokerService.getAdminView().getTotalMaxUncommittedExceededCount()));
|
||||
assertEquals(Long.valueOf(0l), Long.valueOf(queueViewMBean.getMaxUncommittedExceededCount()));
|
||||
|
||||
brokerService.setMaxUncommittedCount(10);
|
||||
boolean caught = false;
|
||||
JMSException caughtException = null;
|
||||
|
||||
configureConnection(connection, syncSend);
|
||||
|
||||
try {
|
||||
for(int i=0; i < 20; i++) {
|
||||
Message message = session.createBytesMessage();
|
||||
message.setIntProperty("IDX", i);
|
||||
messageProducer.send(message);
|
||||
}
|
||||
|
||||
if(!syncSend) {
|
||||
session.commit();
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
if(syncSend) {
|
||||
assertTrue(e instanceof ResourceAllocationException);
|
||||
}
|
||||
caught = true;
|
||||
caughtException = e;
|
||||
}
|
||||
|
||||
assertTrue(caught);
|
||||
assertNotNull(caughtException);
|
||||
assertTrue(caughtException.getMessage().contains(exceptionContains));
|
||||
assertEquals(Long.valueOf(1l), Long.valueOf(brokerService.getAdminView().getTotalMaxUncommittedExceededCount()));
|
||||
assertEquals(Long.valueOf(1l), Long.valueOf(queueViewMBean.getMaxUncommittedExceededCount()));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue