ARTEMIS-4172 sending large msg via core skips plugins & audit log

This commit is contained in:
Justin Bertram 2023-02-15 16:14:34 -06:00 committed by clebertsuconic
parent 82fc42987a
commit fb169bc4af
3 changed files with 37 additions and 7 deletions
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core
tests
integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin
smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging

View File

@ -1107,7 +1107,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;
try {
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), null, false, producers.get(senderID), false);
session.send(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), false, producers.get(senderID), false);
} catch (Exception e) {
message.deleteFile();
throw e;

View File

@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -125,9 +126,17 @@ public class CorePluginTest extends JMSTestBase {
queue = createQueue("queue1");
}
@Test
public void testSendReceive() throws Exception {
internalTestSendReceive(64);
}
@Test
public void testSendReceiveLarge() throws Exception {
internalTestSendReceive(1024 * 1024);
}
private void internalTestSendReceive(int messageSize) throws Exception {
final AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> {
assertEquals(AckReason.NORMAL, reason);
assertNotNull(consumer);
@ -142,7 +151,12 @@ public class CorePluginTest extends JMSTestBase {
MessageProducer prod = sess.createProducer(queue);
MessageConsumer cons = sess.createConsumer(queue);
TextMessage msg1 = sess.createTextMessage("test");
byte[] msgs = new byte[messageSize];
for (int i = 0; i < msgs.length; i++) {
msgs[i] = RandomUtil.randomByte();
}
TextMessage msg1 = sess.createTextMessage(new String(msgs));
prod.send(msg1);
TextMessage received1 = (TextMessage)cons.receive(1000);
assertNotNull(received1);

View File

@ -119,15 +119,25 @@ public class AuditLoggerTest extends AuditLoggerTestBase {
@Test
public void testAuditHotLogCore() throws Exception {
internalSend("CORE");
internalSend("CORE", 64);
}
@Test
public void testAuditHotLogAMQP() throws Exception {
internalSend("AMQP");
internalSend("AMQP", 64);
}
public void internalSend(String protocol) throws Exception {
@Test
public void testAuditHotLogCoreLarge() throws Exception {
internalSend("CORE", 1024 * 1024);
}
@Test
public void testAuditHotLogAMQPLarge() throws Exception {
internalSend("AMQP", 1024 * 1024);
}
public void internalSend(String protocol, int messageSize) throws Exception {
JMXConnector jmxConnector = getJmxConnector();
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
String brokerName = "0.0.0.0"; // configured e.g. in broker.xml <broker-name> element
@ -149,7 +159,13 @@ public class AuditLoggerTest extends AuditLoggerTestBase {
try {
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
TextMessage message = session.createTextMessage("msg1");
byte[] msgs = new byte[messageSize];
for (int i = 0; i < msgs.length; i++) {
msgs[i] = RandomUtil.randomByte();
}
TextMessage message = session.createTextMessage(new String(msgs));
message.setStringProperty("str", uniqueStr);
producer.send(message);