From c78d87de9b9de8bf2aebed610a9c5dac2e9ac839 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 18 Oct 2024 11:56:59 -0400 Subject: [PATCH] ARTEMIS-5106 Mark any AMQP TXN as rollback on send exceptions If an exception is thrown in the AMQP send path and there is an active transaction we should mark that as rollback only so the client will see an error when it tries to commit a transaction that had a failed send. --- .../amqp/broker/AMQPSessionCallback.java | 48 ++-- .../plugin/BeforeSendPluginTest.java | 220 ++++++++++++++++++ 2 files changed, 246 insertions(+), 22 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/BeforeSendPluginTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index b6d59ec4ce..819b3311bb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -23,6 +23,7 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.AutoCreateResult; @@ -565,14 +566,7 @@ public class AMQPSessionCallback implements SessionCallback { sessionExecutor.execute(() -> inSessionSend(context, transaction, message, delivery, receiver, routingContext)); } } catch (Exception e) { - if (message.isLargeMessage()) { - try { - ((LargeServerMessage) message).deleteFile(); - } catch (Exception e1) { - logger.warn("Error while deleting undelivered large AMQP message: {}", e.getMessage()); - } - } - + onSendFailed(message, transaction, e); throw e; } finally { resetContext(oldcontext); @@ -605,11 +599,11 @@ public class AMQPSessionCallback implements SessionCallback { } private void inSessionSend(final ProtonServerReceiverContext context, - final Transaction transaction, - final Message message, - final Delivery delivery, - final Receiver receiver, - final RoutingContext routingContext) { + final Transaction transaction, + final Message message, + final Delivery delivery, + final Receiver receiver, + final RoutingContext routingContext) { OperationContext oldContext = recoverContext(); try { if (invokeIncoming(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) { @@ -643,20 +637,30 @@ public class AMQPSessionCallback implements SessionCallback { } } catch (Exception e) { logger.warn(e.getMessage(), e); - - if (message.isLargeMessage()) { - try { - ((LargeServerMessage) message).deleteFile(); - } catch (Exception e1) { - logger.warn("Error while deleting undelivered large AMQP message: {}", e.getMessage()); - } - } - + onSendFailed(message, transaction, e); context.deliveryFailed(delivery, receiver, e); } finally { resetContext(oldContext); } + } + private void onSendFailed(Message message, Transaction transaction, Exception cause) { + if (message.isLargeMessage()) { + try { + ((LargeServerMessage) message).deleteFile(); + } catch (Exception e1) { + logger.warn("Error while deleting undelivered large AMQP message: {}", cause.getMessage()); + } + } + + if (transaction != null) { + if (cause instanceof ActiveMQException) { + transaction.markAsRollbackOnly((ActiveMQException) cause); + } else { + transaction.markAsRollbackOnly( + new ActiveMQInternalErrorException("Delivery failure triggered TXN to be marked as rollback only", cause)); + } + } } private void sendError(int errorCode, String errorMessage, Receiver receiver) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/BeforeSendPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/BeforeSendPluginTest.java new file mode 100644 index 0000000000..340150d3a1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/BeforeSendPluginTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BeforeSendPluginTest extends JMSTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int BROKER_PLUGIN_MESSAGE_SIZE_LIMIT = 10_000; + + private static final int MIN_LARGE_MESSAGE_SIZE = 16384; + + private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE_SIZE + 1000; + private static final int NON_LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE_SIZE - 1000; + + @Override + protected Configuration createDefaultConfig(boolean netty) throws Exception { + Configuration config = super.createDefaultConfig(netty); + config.getAcceptorConfigurations().clear(); + + HashMap params = new HashMap<>(); + params.put(TransportConstants.PORT_PROP_NAME, 61616); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, "CORE,AMQP,OPENWIRE"); + params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE); + + TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "netty-acceptor", new HashMap<>()); + + config.addAcceptorConfiguration(transportConfiguration); + + config.registerBrokerPlugin(new BeforeSendPlugin(BROKER_PLUGIN_MESSAGE_SIZE_LIMIT)); + + return config; + } + + @Override + public boolean usePersistence() { + return true; + } + + @Test + public void testCORESend() throws Exception { + internalTestSendReceive("CORE", NON_LARGE_MESSAGE_SIZE, false); + } + + @Test + public void testCORESendLarge() throws Exception { + internalTestSendReceive("CORE", LARGE_MESSAGE_SIZE, false); + } + + @Test + public void testCORESendTransactional() throws Exception { + internalTestSendReceive("CORE", NON_LARGE_MESSAGE_SIZE, true); + } + + @Test + public void testCORESendLargeTransactional() throws Exception { + internalTestSendReceive("CORE", LARGE_MESSAGE_SIZE, true); + } + + @Test + public void testAMQPSend() throws Exception { + internalTestSendReceive("AMQP", NON_LARGE_MESSAGE_SIZE, false); + } + + @Test + public void testAMQPSendLarge() throws Exception { + internalTestSendReceive("AMQP", LARGE_MESSAGE_SIZE, false); + } + + @Test + public void testAMQPSendTransactional() throws Exception { + internalTestSendReceive("AMQP", NON_LARGE_MESSAGE_SIZE, true); + } + + @Test + public void testAMQPSendLargeTransactional() throws Exception { + internalTestSendReceive("AMQP", LARGE_MESSAGE_SIZE, true); + } + + @Test + public void testOpenwireSend() throws Exception { + internalTestSendReceive("OPENWIRE", NON_LARGE_MESSAGE_SIZE, false); + } + + @Test + public void testOpenwireSendLarge() throws Exception { + internalTestSendReceive("OPENWIRE", LARGE_MESSAGE_SIZE, false); + } + + @Test + public void testOpenwireSendTransactional() throws Exception { + internalTestSendReceive("OPENWIRE", NON_LARGE_MESSAGE_SIZE, true); + } + + @Test + public void testOpenwireSendLargeTransactional() throws Exception { + internalTestSendReceive("OPENWIRE", LARGE_MESSAGE_SIZE, true); + } + + private void internalTestSendReceive(String protocol, int messageSize, boolean transacted) throws Exception { + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + server.createQueue(QueueConfiguration.of(getTestMethodName()).setAddress(getTestMethodName()).setRoutingType(RoutingType.ANYCAST)); + + try (Connection connection = connectionFactory.createConnection()) { + final Session session; + + if (transacted) { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + final Queue queue = session.createQueue(getTestMethodName()); + final MessageProducer producer = session.createProducer(queue); + + final byte[] payload = new byte[messageSize]; + final Random random = new Random(System.currentTimeMillis()); + + random.nextBytes(payload); + + boolean hadExceptionOnSend = false; + + final TextMessage sentMessage = session.createTextMessage(new String(payload)); + + try { + producer.send(sentMessage); + } catch (Exception e) { + logger.debug("Caught exception on producer send: ", e); + hadExceptionOnSend = true; + } + + if (!transacted) { + assertTrue(hadExceptionOnSend); + } + + assertEquals(0, server.locateQueue(getTestMethodName()).getMessageCount()); + + if (transacted) { + boolean hadExceptionOnCommit = false; + try { + session.commit(); + } catch (Exception e) { + logger.debug("Caught exception on commit transaction: ", e); + hadExceptionOnCommit = true; + } + + assertTrue(hadExceptionOnCommit); + assertEquals(0, server.locateQueue(getTestMethodName()).getMessageCount()); + } + + connection.close(); + } + + validateNoFilesOnLargeDir(); + } + + private class BeforeSendPlugin implements ActiveMQServerPlugin { + + private final long maxMessageSize; + + BeforeSendPlugin(long maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + @Override + public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { + final long messageSize = message.getWholeMessageSize(); + + if (messageSize > maxMessageSize) { + throw new ActiveMQIOErrorException("Message of size " + messageSize + " exceeded size limit of " + maxMessageSize); + } + } + } +}