This commit is contained in:
Clebert Suconic 2020-08-27 10:16:03 -04:00
commit e19af8ee45
4 changed files with 74 additions and 28 deletions

View File

@ -2419,13 +2419,13 @@ public interface AuditLogger extends BasicLogger {
*
* */
//hot path log using a different logger
static void coreSendMessage(String user, Object context) {
MESSAGE_LOGGER.sendMessage(getCaller(user), context);
static void coreSendMessage(String user, String messageToString, Object context) {
MESSAGE_LOGGER.logCoreSendMessage(getCaller(user), messageToString, context);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601500, value = "User {0} is sending a core message with Context: {1}", format = Message.Format.MESSAGE_FORMAT)
void sendMessage(String user, Object context);
@Message(id = 601500, value = "User {0} is sending a message {1}, with Context: {2}", format = Message.Format.MESSAGE_FORMAT)
void logCoreSendMessage(String user, String messageToString, Object context);
//hot path log using a different logger
static void coreConsumeMessage(String queue) {

View File

@ -1638,15 +1638,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override
public String toString() {
/* return "AMQPMessage [durable=" + isDurable() +
return "AMQPMessage [durable=" + isDurable() +
", messageID=" + getMessageID() +
", address=" + getAddress() +
", size=" + getEncodeSize() +
", applicationProperties=" + applicationProperties +
", applicationProperties=" + getApplicationPropertiesMap(false) +
", properties=" + properties +
", extraProperties = " + getExtraProperties() +
"]"; */
return super.toString();
"]";
}
@Override

View File

@ -1744,7 +1744,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception {
if (AuditLogger.isMessageEnabled()) {
AuditLogger.coreSendMessage(getUsername(), routingContext);
AuditLogger.coreSendMessage(getUsername(), messageParameter.toString(), routingContext);
}
final Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);

View File

@ -21,8 +21,6 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@ -33,10 +31,17 @@ import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.remote.JMXConnector;
@ -131,7 +136,7 @@ public class AuditLoggerTest extends SmokeTestBase {
checkAuditLogRecord(true, "gets security check failure:", "guest does not have permission='DELETE_NON_DURABLE_QUEUE'");
//hot patch not in log
checkAuditLogRecord(true, "is sending a core message");
checkAuditLogRecord(true, "is sending a message");
}
protected JMXConnector getJmxConnector() throws MalformedURLException {
@ -159,7 +164,16 @@ public class AuditLoggerTest extends SmokeTestBase {
}
@Test
public void testAuditHotLog() throws Exception {
public void testAuditHotLogCore() throws Exception {
internalSend("CORE");
}
@Test
public void testAuditHotLogAMQP() throws Exception {
internalSend("AMQP");
}
public void internalSend(String protocol) throws Exception {
JMXConnector jmxConnector = getJmxConnector();
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
String brokerName = "0.0.0.0"; // configured e.g. in broker.xml <broker-name> element
@ -172,25 +186,41 @@ public class AuditLoggerTest extends SmokeTestBase {
Assert.assertEquals(0, addressControl.getQueueNames().length);
session.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST));
Assert.assertEquals(1, addressControl.getQueueNames().length);
String uniqueStr = Base64.encodeBytes(UUID.randomUUID().toString().getBytes());
String uniqueStr = RandomUtil.randomString();
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(true));
producer.send(session.createMessage(true));
// addressControl.sendMessage(null, Message.BYTES_TYPE, uniqueStr, false, null, null);
session.close();
Wait.waitFor(() -> addressControl.getMessageCount() == 2);
Assert.assertEquals(2, addressControl.getMessageCount());
ConnectionFactory factory = createConnectionFactory(protocol, "tcp://localhost:61616");
Connection connection = factory.createConnection();
try {
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
TextMessage message = session.createTextMessage("msg1");
message.setStringProperty("str", uniqueStr);
producer.send(message);
checkAuditLogRecord(true, "sending a core message");
message = session.createTextMessage("msg2");
message.setStringProperty("str", "Hello2");
producer.send(message);
// addressControl.sendMessage(null, Message.BYTES_TYPE, uniqueStr, false, null, null);
ClientConsumer consumer = session.createConsumer(address);
session.start();
ClientMessage clientMessage = consumer.receiveImmediate();
Assert.assertNotNull(clientMessage);
clientMessage = consumer.receiveImmediate();
Assert.assertNotNull(clientMessage);
checkAuditLogRecord(true, "is consuming a message from");
Wait.waitFor(() -> addressControl.getMessageCount() == 2);
Assert.assertEquals(2, addressControl.getMessageCount());
checkAuditLogRecord(true, "sending a message");
checkAuditLogRecord(true, uniqueStr);
checkAuditLogRecord(true, "Hello2");
connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(address.toString()));
javax.jms.Message clientMessage = consumer.receive(5000);
Assert.assertNotNull(clientMessage);
clientMessage = consumer.receive(5000);
Assert.assertNotNull(clientMessage);
checkAuditLogRecord(true, "is consuming a message from");
} finally {
connection.close();
}
}
//check the audit log has a line that contains all the values
@ -223,4 +253,21 @@ public class AuditLoggerTest extends SmokeTestBase {
}
}
}
public static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("AMQP")) {
if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
} else {
throw new IllegalStateException("Unkown:" + protocol);
}
}
}