From fb169bc4af15dcad6deb7c050c8177576562e8d6 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 15 Feb 2023 16:14:34 -0600 Subject: [PATCH] ARTEMIS-4172 sending large msg via core skips plugins & audit log --- .../core/ServerSessionPacketHandler.java | 2 +- .../integration/plugin/CorePluginTest.java | 18 ++++++++++++-- .../tests/smoke/logging/AuditLoggerTest.java | 24 +++++++++++++++---- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index a466321243..a1dae880d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -1107,7 +1107,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { currentLargeMessage.setStorageManager(storageManager); currentLargeMessage = null; try { - session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), null, false, producers.get(senderID), false); + session.send(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), false, producers.get(senderID), false); } catch (Exception e) { message.deleteFile(); throw e; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java index a97899cbd2..314492bddb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java @@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -125,9 +126,17 @@ public class CorePluginTest extends JMSTestBase { queue = createQueue("queue1"); } - @Test public void testSendReceive() throws Exception { + internalTestSendReceive(64); + } + + @Test + public void testSendReceiveLarge() throws Exception { + internalTestSendReceive(1024 * 1024); + } + + private void internalTestSendReceive(int messageSize) throws Exception { final AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> { assertEquals(AckReason.NORMAL, reason); assertNotNull(consumer); @@ -142,7 +151,12 @@ public class CorePluginTest extends JMSTestBase { MessageProducer prod = sess.createProducer(queue); MessageConsumer cons = sess.createConsumer(queue); - TextMessage msg1 = sess.createTextMessage("test"); + byte[] msgs = new byte[messageSize]; + for (int i = 0; i < msgs.length; i++) { + msgs[i] = RandomUtil.randomByte(); + } + + TextMessage msg1 = sess.createTextMessage(new String(msgs)); prod.send(msg1); TextMessage received1 = (TextMessage)cons.receive(1000); assertNotNull(received1); diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java index 3eb1b79109..116b95def3 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java @@ -119,15 +119,25 @@ public class AuditLoggerTest extends AuditLoggerTestBase { @Test public void testAuditHotLogCore() throws Exception { - internalSend("CORE"); + internalSend("CORE", 64); } @Test public void testAuditHotLogAMQP() throws Exception { - internalSend("AMQP"); + internalSend("AMQP", 64); } - public void internalSend(String protocol) throws Exception { + @Test + public void testAuditHotLogCoreLarge() throws Exception { + internalSend("CORE", 1024 * 1024); + } + + @Test + public void testAuditHotLogAMQPLarge() throws Exception { + internalSend("AMQP", 1024 * 1024); + } + + public void internalSend(String protocol, int messageSize) throws Exception { JMXConnector jmxConnector = getJmxConnector(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); String brokerName = "0.0.0.0"; // configured e.g. in broker.xml element @@ -149,7 +159,13 @@ public class AuditLoggerTest extends AuditLoggerTestBase { try { Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createQueue(address.toString())); - TextMessage message = session.createTextMessage("msg1"); + + byte[] msgs = new byte[messageSize]; + for (int i = 0; i < msgs.length; i++) { + msgs[i] = RandomUtil.randomByte(); + } + + TextMessage message = session.createTextMessage(new String(msgs)); message.setStringProperty("str", uniqueStr); producer.send(message);