ARTEMIS-3400 add audit logging for message ack

Aside from adding audit logging for message acknowledgement this commit
also consolidates the two nearly identical acknowledge method
implementations in o.a.a.a.c.s.i.QueueImpl. This avoids duplicating
code for audit logging, plugin invocation, etc. There is no semantic
change.
This commit is contained in:
Justin Bertram 2021-07-23 12:54:01 -05:00 committed by Clebert Suconic
parent 2954829e3e
commit f554806ec3
4 changed files with 51 additions and 39 deletions

View File

@ -2437,6 +2437,15 @@ public interface AuditLogger extends BasicLogger {
@Message(id = 601501, value = "User {0} is consuming a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT)
void consumeMessage(String user, String address, String message);
//hot path log using a different logger
static void coreAcknowledgeMessage(Subject user, String remoteAddress, String queue, String message) {
MESSAGE_LOGGER.acknowledgeMessage(getCaller(user, remoteAddress), queue, message);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601502, value = "User {0} is acknowledging a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT)
void acknowledgeMessage(String user, String queue, String message);
/*
* This logger is focused on user interaction from the console or thru resource specific functions in the management layer/JMX
* */

View File

@ -101,6 +101,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.BooleanUtil;
@ -1825,32 +1826,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
if (nonDestructive && reason == AckReason.NORMAL) {
decDelivering(ref);
if (logger.isDebugEnabled()) {
logger.debug("acknowledge ignored nonDestructive=true and reason=NORMAL");
}
} else {
if (ref.isPaged()) {
pageSubscription.ack((PagedReference) ref);
postAcknowledge(ref, reason);
} else {
Message message = ref.getMessage();
boolean durableRef = message.isDurable() && isDurable();
if (durableRef) {
storageManager.storeAcknowledge(id, message.getMessageID());
}
postAcknowledge(ref, reason);
}
ackAttempts.incrementAndGet();
if (server != null && server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
}
}
acknowledge(null, ref, reason, consumer);
}
@Override
@ -1860,34 +1836,59 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
RefsOperation refsOperation = getRefsOperation(tx, reason);
boolean transactional = tx != null;
RefsOperation refsOperation = null;
if (transactional) {
refsOperation = getRefsOperation(tx, reason);
}
if (nonDestructive && reason == AckReason.NORMAL) {
refsOperation.addOnlyRefAck(ref);
if (transactional) {
refsOperation.addOnlyRefAck(ref);
} else {
decDelivering(ref);
}
if (logger.isDebugEnabled()) {
logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL");
}
} else {
if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref);
refsOperation.addAck(ref);
if (transactional) {
pageSubscription.ackTx(tx, (PagedReference) ref);
refsOperation.addAck(ref);
} else {
pageSubscription.ack((PagedReference) ref);
postAcknowledge(ref, reason);
}
} else {
Message message = ref.getMessage();
boolean durableRef = message.isDurable() && isDurable();
if (durableRef) {
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
tx.setContainsPersistent();
if (transactional) {
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
tx.setContainsPersistent();
} else {
storageManager.storeAcknowledge(id, message.getMessageID());
}
}
if (transactional) {
ackAttempts.incrementAndGet();
refsOperation.addAck(ref);
} else {
postAcknowledge(ref, reason);
}
ackAttempts.incrementAndGet();
refsOperation.addAck(ref);
}
if (!transactional) {
ackAttempts.incrementAndGet();
}
if (AuditLogger.isMessageLoggingEnabled()) {
ServerSession session = server.getSessionByID(consumer.getSessionID());
AuditLogger.coreAcknowledgeMessage(session.getRemotingConnection().getAuditSubject(), session.getRemotingConnection().getRemoteAddress(), getName().toString(), ref.getMessage().toString());
}
if (server != null && server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
}

View File

@ -85,5 +85,6 @@ public class AuditLoggerAMQPMutualSSLTest extends AuditLoggerTestBase {
checkAuditLogRecord(true, "AMQ601500: User myUser(producers)@", "is sending a message AMQPStandardMessage");
checkAuditLogRecord(true, "AMQ601265: User myUser(producers)@", "is creating a core consumer");
checkAuditLogRecord(true, "AMQ601501: User myUser(producers)@", "is consuming a message from exampleQueue");
checkAuditLogRecord(true, "AMQ601502: User myUser(producers)@", "is acknowledging a message from exampleQueue: AMQPStandardMessage");
}
}

View File

@ -171,9 +171,10 @@ public class AuditLoggerTest extends AuditLoggerTestBase {
Assert.assertNotNull(clientMessage);
clientMessage = consumer.receive(5000);
Assert.assertNotNull(clientMessage);
checkAuditLogRecord(true, "is consuming a message from");
} finally {
connection.close();
}
checkAuditLogRecord(true, "is consuming a message from");
checkAuditLogRecord(true, "is acknowledging a message from");
}
}