diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 11fd9d497a..85ef064128 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2419,13 +2419,13 @@ public interface AuditLogger extends BasicLogger { * * */ //hot path log using a different logger - static void coreSendMessage(String user, Object context) { - MESSAGE_LOGGER.sendMessage(getCaller(user), context); + static void coreSendMessage(String user, String messageToString, Object context) { + MESSAGE_LOGGER.logCoreSendMessage(getCaller(user), messageToString, context); } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601500, value = "User {0} is sending a core message with Context: {1}", format = Message.Format.MESSAGE_FORMAT) - void sendMessage(String user, Object context); + @Message(id = 601500, value = "User {0} is sending a message {1}, with Context: {2}", format = Message.Format.MESSAGE_FORMAT) + void logCoreSendMessage(String user, String messageToString, Object context); //hot path log using a different logger static void coreConsumeMessage(String queue) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index c89ba5c6f4..8dc5e6ccf9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1638,15 +1638,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public String toString() { - /* return "AMQPMessage [durable=" + isDurable() + + return "AMQPMessage [durable=" + isDurable() + ", messageID=" + getMessageID() + ", address=" + getAddress() + ", size=" + getEncodeSize() + - ", applicationProperties=" + applicationProperties + + ", applicationProperties=" + getApplicationPropertiesMap(false) + ", properties=" + properties + ", extraProperties = " + getExtraProperties() + - "]"; */ - return super.toString(); + "]"; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 9da5344bb7..454681ac47 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1744,7 +1744,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { boolean noAutoCreateQueue, RoutingContext routingContext) throws Exception { if (AuditLogger.isMessageEnabled()) { - AuditLogger.coreSendMessage(getUsername(), routingContext); + AuditLogger.coreSendMessage(getUsername(), messageParameter.toString(), routingContext); } final Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager); diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java index 34029599e2..dd77614cb9 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java @@ -21,8 +21,6 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.client.ClientConsumer; -import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; @@ -33,10 +31,17 @@ import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.management.MBeanServerConnection; import javax.management.MBeanServerInvocationHandler; import javax.management.remote.JMXConnector; @@ -131,7 +136,7 @@ public class AuditLoggerTest extends SmokeTestBase { checkAuditLogRecord(true, "gets security check failure:", "guest does not have permission='DELETE_NON_DURABLE_QUEUE'"); //hot patch not in log - checkAuditLogRecord(true, "is sending a core message"); + checkAuditLogRecord(true, "is sending a message"); } protected JMXConnector getJmxConnector() throws MalformedURLException { @@ -159,7 +164,16 @@ public class AuditLoggerTest extends SmokeTestBase { } @Test - public void testAuditHotLog() throws Exception { + public void testAuditHotLogCore() throws Exception { + internalSend("CORE"); + } + + @Test + public void testAuditHotLogAMQP() throws Exception { + internalSend("AMQP"); + } + + public void internalSend(String protocol) throws Exception { JMXConnector jmxConnector = getJmxConnector(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); String brokerName = "0.0.0.0"; // configured e.g. in broker.xml element @@ -172,25 +186,41 @@ public class AuditLoggerTest extends SmokeTestBase { Assert.assertEquals(0, addressControl.getQueueNames().length); session.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST)); Assert.assertEquals(1, addressControl.getQueueNames().length); - String uniqueStr = Base64.encodeBytes(UUID.randomUUID().toString().getBytes()); + String uniqueStr = RandomUtil.randomString(); - ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(true)); - producer.send(session.createMessage(true)); - // addressControl.sendMessage(null, Message.BYTES_TYPE, uniqueStr, false, null, null); + session.close(); - Wait.waitFor(() -> addressControl.getMessageCount() == 2); - Assert.assertEquals(2, addressControl.getMessageCount()); + ConnectionFactory factory = createConnectionFactory(protocol, "tcp://localhost:61616"); + Connection connection = factory.createConnection(); + try { + Session session = connection.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + TextMessage message = session.createTextMessage("msg1"); + message.setStringProperty("str", uniqueStr); + producer.send(message); - checkAuditLogRecord(true, "sending a core message"); + message = session.createTextMessage("msg2"); + message.setStringProperty("str", "Hello2"); + producer.send(message); + // addressControl.sendMessage(null, Message.BYTES_TYPE, uniqueStr, false, null, null); - ClientConsumer consumer = session.createConsumer(address); - session.start(); - ClientMessage clientMessage = consumer.receiveImmediate(); - Assert.assertNotNull(clientMessage); - clientMessage = consumer.receiveImmediate(); - Assert.assertNotNull(clientMessage); - checkAuditLogRecord(true, "is consuming a message from"); + Wait.waitFor(() -> addressControl.getMessageCount() == 2); + Assert.assertEquals(2, addressControl.getMessageCount()); + + checkAuditLogRecord(true, "sending a message"); + checkAuditLogRecord(true, uniqueStr); + checkAuditLogRecord(true, "Hello2"); + + connection.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(address.toString())); + javax.jms.Message clientMessage = consumer.receive(5000); + Assert.assertNotNull(clientMessage); + clientMessage = consumer.receive(5000); + Assert.assertNotNull(clientMessage); + checkAuditLogRecord(true, "is consuming a message from"); + } finally { + connection.close(); + } } //check the audit log has a line that contains all the values @@ -223,4 +253,21 @@ public class AuditLoggerTest extends SmokeTestBase { } } } + + public static ConnectionFactory createConnectionFactory(String protocol, String uri) { + if (protocol.toUpperCase().equals("OPENWIRE")) { + return new org.apache.activemq.ActiveMQConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("AMQP")) { + + if (uri.startsWith("tcp://")) { + // replacing tcp:// by amqp:// + uri = "amqp" + uri.substring(3); + } + return new JmsConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) { + return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri); + } else { + throw new IllegalStateException("Unkown:" + protocol); + } + } }