From f554806ec315cb8b4d0844b12674a825e08d44a3 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 23 Jul 2021 12:54:01 -0500 Subject: [PATCH] ARTEMIS-3400 add audit logging for message ack Aside from adding audit logging for message acknowledgement this commit also consolidates the two nearly identical acknowledge method implementations in o.a.a.a.c.s.i.QueueImpl. This avoids duplicating code for audit logging, plugin invocation, etc. There is no semantic change. --- .../activemq/artemis/logs/AuditLogger.java | 9 +++ .../artemis/core/server/impl/QueueImpl.java | 77 ++++++++++--------- .../logging/AuditLoggerAMQPMutualSSLTest.java | 1 + .../tests/smoke/logging/AuditLoggerTest.java | 3 +- 4 files changed, 51 insertions(+), 39 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 9a41027262..94a879cde3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2437,6 +2437,15 @@ public interface AuditLogger extends BasicLogger { @Message(id = 601501, value = "User {0} is consuming a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT) void consumeMessage(String user, String address, String message); + //hot path log using a different logger + static void coreAcknowledgeMessage(Subject user, String remoteAddress, String queue, String message) { + MESSAGE_LOGGER.acknowledgeMessage(getCaller(user, remoteAddress), queue, message); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601502, value = "User {0} is acknowledging a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT) + void acknowledgeMessage(String user, String queue, String message); + /* * This logger is focused on user interaction from the console or thru resource specific functions in the management layer/JMX * */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 37af7e4259..300bed6a5d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -101,6 +101,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.BooleanUtil; @@ -1825,32 +1826,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception { - if (nonDestructive && reason == AckReason.NORMAL) { - decDelivering(ref); - if (logger.isDebugEnabled()) { - logger.debug("acknowledge ignored nonDestructive=true and reason=NORMAL"); - } - } else { - if (ref.isPaged()) { - pageSubscription.ack((PagedReference) ref); - postAcknowledge(ref, reason); - } else { - Message message = ref.getMessage(); - - boolean durableRef = message.isDurable() && isDurable(); - - if (durableRef) { - storageManager.storeAcknowledge(id, message.getMessageID()); - } - postAcknowledge(ref, reason); - } - - ackAttempts.incrementAndGet(); - - if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); - } - } + acknowledge(null, ref, reason, consumer); } @Override @@ -1860,34 +1836,59 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception { - RefsOperation refsOperation = getRefsOperation(tx, reason); + boolean transactional = tx != null; + RefsOperation refsOperation = null; + if (transactional) { + refsOperation = getRefsOperation(tx, reason); + } if (nonDestructive && reason == AckReason.NORMAL) { - refsOperation.addOnlyRefAck(ref); + if (transactional) { + refsOperation.addOnlyRefAck(ref); + } else { + decDelivering(ref); + } if (logger.isDebugEnabled()) { logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL"); } } else { if (ref.isPaged()) { - pageSubscription.ackTx(tx, (PagedReference) ref); - - refsOperation.addAck(ref); + if (transactional) { + pageSubscription.ackTx(tx, (PagedReference) ref); + refsOperation.addAck(ref); + } else { + pageSubscription.ack((PagedReference) ref); + postAcknowledge(ref, reason); + } } else { Message message = ref.getMessage(); boolean durableRef = message.isDurable() && isDurable(); if (durableRef) { - storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID()); - - tx.setContainsPersistent(); + if (transactional) { + storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID()); + tx.setContainsPersistent(); + } else { + storageManager.storeAcknowledge(id, message.getMessageID()); + } + } + if (transactional) { + ackAttempts.incrementAndGet(); + refsOperation.addAck(ref); + } else { + postAcknowledge(ref, reason); } - - ackAttempts.incrementAndGet(); - - refsOperation.addAck(ref); } + if (!transactional) { + ackAttempts.incrementAndGet(); + } + + if (AuditLogger.isMessageLoggingEnabled()) { + ServerSession session = server.getSessionByID(consumer.getSessionID()); + AuditLogger.coreAcknowledgeMessage(session.getRemotingConnection().getAuditSubject(), session.getRemotingConnection().getRemoteAddress(), getName().toString(), ref.getMessage().toString()); + } if (server != null && server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java index 64f9d195e6..f21680a0d5 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java @@ -85,5 +85,6 @@ public class AuditLoggerAMQPMutualSSLTest extends AuditLoggerTestBase { checkAuditLogRecord(true, "AMQ601500: User myUser(producers)@", "is sending a message AMQPStandardMessage"); checkAuditLogRecord(true, "AMQ601265: User myUser(producers)@", "is creating a core consumer"); checkAuditLogRecord(true, "AMQ601501: User myUser(producers)@", "is consuming a message from exampleQueue"); + checkAuditLogRecord(true, "AMQ601502: User myUser(producers)@", "is acknowledging a message from exampleQueue: AMQPStandardMessage"); } } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java index 4cd33b985d..8e94d86e25 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java @@ -171,9 +171,10 @@ public class AuditLoggerTest extends AuditLoggerTestBase { Assert.assertNotNull(clientMessage); clientMessage = consumer.receive(5000); Assert.assertNotNull(clientMessage); - checkAuditLogRecord(true, "is consuming a message from"); } finally { connection.close(); } + checkAuditLogRecord(true, "is consuming a message from"); + checkAuditLogRecord(true, "is acknowledging a message from"); } }