From 688b894c6205058c24d225b340aea1d8a5599902 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 15 Jul 2022 23:54:40 -0500 Subject: [PATCH] ARTEMIS-3896 clarify logging for transactional ops Both audit logging and logging from the LoggingActiveMQServerPlugin are unclear as they relate to transactional sends and acks. Both essentially ignore the transaction which makes it appear that an operation has taken place when, in fact, it hasn't (e.g. a transactional ack is rolled back but the log indicates the ack went through). This commit fix this with the following changes: - Log details when a send or ack is added to a transaction. - Log details when the transaction is committed. - Log when the transaction is rolled back. - Include transaction details in the relevant DEBUG logs. - Simplify INFO level logging for sends & acks in LoggingActiveMQServerPlugin. Ensure details are in the DEBUG logs. Other changes: - Make capitalization more consistent in a handful of audit logs. --- .../activemq/artemis/logs/AuditLogger.java | 71 +++++++++----- .../artemis/core/server/impl/QueueImpl.java | 30 +++++- .../core/server/impl/ServerSessionImpl.java | 28 +++++- .../plugin/ActiveMQServerMessagePlugin.java | 17 ++++ .../impl/LoggingActiveMQServerPlugin.java | 98 +++++++++++++------ .../LoggingActiveMQServerPluginLogger.java | 27 ++--- 6 files changed, 198 insertions(+), 73 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index d654e9f813..5633997375 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2420,13 +2420,13 @@ public interface AuditLogger extends BasicLogger { * * */ //hot path log using a different logger - static void coreSendMessage(Subject user, String remoteAddress, String messageToString, Object context) { - MESSAGE_LOGGER.logCoreSendMessage(getCaller(user, remoteAddress), messageToString, context); + static void coreSendMessage(Subject user, String remoteAddress, String messageToString, Object context, String tx) { + MESSAGE_LOGGER.coreSendMessage(getCaller(user, remoteAddress), messageToString, context, tx); } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601500, value = "User {0} is sending a message {1}, with Context: {2}", format = Message.Format.MESSAGE_FORMAT) - void logCoreSendMessage(String user, String messageToString, Object context); + @Message(id = 601500, value = "User {0} sent a message {1}, context: {2}, transaction: {3}", format = Message.Format.MESSAGE_FORMAT) + void coreSendMessage(String user, String messageToString, Object context, String tx); //hot path log using a different logger static void coreConsumeMessage(Subject user, String remoteAddress, String queue, String message) { @@ -2438,13 +2438,13 @@ public interface AuditLogger extends BasicLogger { void consumeMessage(String user, String address, String message); //hot path log using a different logger - static void coreAcknowledgeMessage(Subject user, String remoteAddress, String queue, String message) { - MESSAGE_LOGGER.acknowledgeMessage(getCaller(user, remoteAddress), queue, message); + static void coreAcknowledgeMessage(Subject user, String remoteAddress, String queue, String message, String tx) { + MESSAGE_LOGGER.coreAcknowledgeMessage(getCaller(user, remoteAddress), queue, message, tx); } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601502, value = "User {0} is acknowledging a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT) - void acknowledgeMessage(String user, String queue, String message); + @Message(id = 601502, value = "User {0} acknowledged message from {1}: {2}, transaction: {3}", format = Message.Format.MESSAGE_FORMAT) + void coreAcknowledgeMessage(String user, String queue, String message, String tx); /* * This logger is focused on user interaction from the console or thru resource specific functions in the management layer/JMX @@ -2455,7 +2455,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601701, value = "User {0} successfully created Address: {1} with routing types {2}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601701, value = "User {0} successfully created address: {1} with routing types {2}", format = Message.Format.MESSAGE_FORMAT) void createAddressSuccess(String user, String name, String routingTypes); static void createAddressFailure(String name, String routingTypes) { @@ -2463,7 +2463,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601702, value = "User {0} failed to created Address: {1} with routing types {2}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601702, value = "User {0} failed to created address: {1} with routing types {2}", format = Message.Format.MESSAGE_FORMAT) void createAddressFailure(String user, String name, String routingTypes); static void updateAddressSuccess(String name, String routingTypes) { @@ -2471,7 +2471,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601703, value = "User {0} successfully updated Address: {1} with routing types {2}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601703, value = "User {0} successfully updated address: {1} with routing types {2}", format = Message.Format.MESSAGE_FORMAT) void updateAddressSuccess(String user, String name, String routingTypes); static void updateAddressFailure(String name, String routingTypes) { @@ -2479,7 +2479,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601704, value = "User {0} successfully updated Address: {1} with routing types {2}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601704, value = "User {0} successfully updated address: {1} with routing types {2}", format = Message.Format.MESSAGE_FORMAT) void updateAddressFailure(String user, String name, String routingTypes); static void deleteAddressSuccess(String name) { @@ -2487,7 +2487,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601705, value = "User {0} successfully deleted Address: {1}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601705, value = "User {0} successfully deleted address: {1}", format = Message.Format.MESSAGE_FORMAT) void deleteAddressSuccess(String user, String name); @@ -2496,7 +2496,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601706, value = "User {0} failed to deleted Address: {1}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601706, value = "User {0} failed to deleted address: {1}", format = Message.Format.MESSAGE_FORMAT) void deleteAddressFailure(String user, String name); static void createQueueSuccess(String name, String address, String routingType) { @@ -2504,7 +2504,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601707, value = "User {0} successfully created Queue: {1} on Address: {2} with routing type {3}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601707, value = "User {0} successfully created queue: {1} on address: {2} with routing type {3}", format = Message.Format.MESSAGE_FORMAT) void createQueueSuccess(String user, String name, String address, String routingType); static void createQueueFailure(String name, String address, String routingType) { @@ -2512,7 +2512,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601708, value = "User {0} failed to create Queue: {1} on Address: {2} with routing type {3}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601708, value = "User {0} failed to create queue: {1} on address: {2} with routing type {3}", format = Message.Format.MESSAGE_FORMAT) void createQueueFailure(String user, String name, String address, String routingType); static void updateQueueSuccess(String name, String routingType) { @@ -2520,7 +2520,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601709, value = "User {0} successfully updated Queue: {1} with routing type {2}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601709, value = "User {0} successfully updated queue: {1} with routing type {2}", format = Message.Format.MESSAGE_FORMAT) void updateQueueSuccess(String user, String name, String routingType); static void updateQueueFailure(String name, String routingType) { @@ -2528,7 +2528,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601710, value = "User {0} failed to update Queue: {1} with routing type {2}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601710, value = "User {0} failed to update queue: {1} with routing type {2}", format = Message.Format.MESSAGE_FORMAT) void updateQueueFailure(String user, String name, String routingType); @@ -2537,7 +2537,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601711, value = "User {0} successfully deleted Queue: {1}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601711, value = "User {0} successfully deleted queue: {1}", format = Message.Format.MESSAGE_FORMAT) void destroyQueueSuccess(String user, String name); static void destroyQueueFailure(String name) { @@ -2545,7 +2545,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601712, value = "User {0} failed to delete Queue: {1}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601712, value = "User {0} failed to delete queue: {1}", format = Message.Format.MESSAGE_FORMAT) void destroyQueueFailure(String user, String name); static void removeMessagesSuccess(int removed, String queue) { @@ -2553,7 +2553,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601713, value = "User {0} has removed {1} messages from Queue: {2}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601713, value = "User {0} has removed {1} messages from queue: {2}", format = Message.Format.MESSAGE_FORMAT) void removeMessagesSuccess(String user, int removed, String queue); static void removeMessagesFailure(String queue) { @@ -2561,7 +2561,7 @@ public interface AuditLogger extends BasicLogger { } @LogMessage(level = Logger.Level.INFO) - @Message(id = 601714, value = "User {0} failed to remove messages from Queue: {1}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 601714, value = "User {0} failed to remove messages from queue: {1}", format = Message.Format.MESSAGE_FORMAT) void removeMessagesFailure(String user, String queue); static void userSuccesfullyAuthenticatedInAudit(Subject subject, String remoteAddress) { @@ -2933,4 +2933,31 @@ public interface AuditLogger extends BasicLogger { @LogMessage(level = Logger.Level.INFO) @Message(id = 601758, value = "User {0} is calling schedulePageCleanup on address: {1}", format = Message.Format.MESSAGE_FORMAT) void schedulePageCleanup(String user, Object address); + + //hot path log using a different logger + static void addAckToTransaction(Subject user, String remoteAddress, String queue, String message, String tx) { + MESSAGE_LOGGER.addAckToTransaction(getCaller(user, remoteAddress), queue, message, tx); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601759, value = "User {0} added acknowledgement of a message from {1}: {2} to transaction: {3}", format = Message.Format.MESSAGE_FORMAT) + void addAckToTransaction(String user, String queue, String message, String tx); + + //hot path log using a different logger + static void addSendToTransaction(Subject user, String remoteAddress, String messageToString, String tx) { + MESSAGE_LOGGER.addSendToTransaction(getCaller(user, remoteAddress), messageToString, tx); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601760, value = "User {0} added a message send for: {1} to transaction: {2}", format = Message.Format.MESSAGE_FORMAT) + void addSendToTransaction(String user, String messageToString, String tx); + + //hot path log using a different logger + static void rolledBackTransaction(Subject user, String remoteAddress, String tx, String resource) { + MESSAGE_LOGGER.rolledBackTransaction(getCaller(user, remoteAddress), tx, resource); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601761, value = "User {0} rolled back transaction {1} involving {2}", format = Message.Format.MESSAGE_FORMAT) + void rolledBackTransaction(String user, String tx, String resource); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 8afa15f363..767db5b1f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server.impl; +import javax.security.auth.Subject; import java.io.PrintWriter; import java.io.StringWriter; import java.math.BigDecimal; @@ -1882,19 +1883,38 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (AuditLogger.isMessageLoggingEnabled()) { - ServerSession session = null; // it's possible for the consumer to be null (e.g. acking the message administratively) - if (consumer != null) { - session = server.getSessionByID(consumer.getSessionID()); + final ServerSession session = consumer != null ? server.getSessionByID(consumer.getSessionID()) : null; + final Subject subject = session == null ? null : session.getRemotingConnection().getAuditSubject(); + final String remoteAddress = session == null ? null : session.getRemotingConnection().getRemoteAddress(); + + if (transactional) { + AuditLogger.addAckToTransaction(subject, remoteAddress, getName().toString(), ref.getMessage().toString(), tx.toString()); + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + auditLogAck(subject, remoteAddress, ref, tx); + } + + @Override + public void afterRollback(Transaction tx) { + AuditLogger.rolledBackTransaction(subject, remoteAddress, tx.toString(), ref.toString()); + } + }); + } else { + auditLogAck(subject, remoteAddress, ref, tx); } - AuditLogger.coreAcknowledgeMessage(session == null ? null : session.getRemotingConnection().getAuditSubject(), session == null ? null : session.getRemotingConnection().getRemoteAddress(), getName().toString(), ref.getMessage().toString()); } if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); + server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(tx, ref, reason, consumer)); } } } + private void auditLogAck(Subject subject, String remoteAddress, MessageReference ref, Transaction tx) { + AuditLogger.coreAcknowledgeMessage(subject, remoteAddress, getName().toString(), ref.getMessage().toString(), tx == null ? null : tx.toString()); + } + @Override public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception { Message message = ref.getMessage(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 2aa1a86d3e..eefb0c2c8f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1812,10 +1812,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { message.setMessageID(id); } - if (AuditLogger.isMessageLoggingEnabled()) { - AuditLogger.coreSendMessage(remotingConnection.getAuditSubject(), remotingConnection.getRemoteAddress(), message.toString(), routingContext); - } - SimpleString address = message.getAddressSimpleString(); if (defaultAddress == null && address != null) { @@ -1844,6 +1840,25 @@ public class ServerSessionImpl implements ServerSession, FailureListener { result = doSend(tx, message, address, direct, noAutoCreateQueue, routingContext); } + if (AuditLogger.isMessageLoggingEnabled()) { + if (tx != null && !autoCommitSends) { + AuditLogger.addSendToTransaction(remotingConnection.getAuditSubject(), remotingConnection.getRemoteAddress(), message.toString(), tx.toString()); + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + auditLogSend(message, tx); + } + + @Override + public void afterRollback(Transaction tx) { + AuditLogger.rolledBackTransaction(remotingConnection.getAuditSubject(), remotingConnection.getRemoteAddress(), tx.toString(), message.toString()); + } + }); + } else { + auditLogSend(message, null); + } + } + } catch (Exception e) { if (server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e)); @@ -1851,11 +1866,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { throw e; } if (server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result)); + server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, autoCommitSends ? null : tx, message, direct, noAutoCreateQueue, result)); } return result; } + private void auditLogSend(Message message, Transaction tx) { + AuditLogger.coreSendMessage(remotingConnection.getAuditSubject(), remotingConnection.getRemoteAddress(), message.toString(), routingContext, tx == null ? null : tx.toString()); + } @Override public void requestProducerCredits(SimpleString address, final int credits) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java index a211b338c7..06e7378391 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java @@ -270,8 +270,25 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin { * @throws ActiveMQException * */ + @Deprecated default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException { //by default call the old method for backwards compatibility this.messageAcknowledged(ref, reason); } + + /** + * A message has been acknowledged + * + * @param tx The transaction associated with the ack + * @param ref The acked message + * @param reason The ack reason + * @param consumer the Consumer that acknowledged the message - this field is optional + * and can be null + * @throws ActiveMQException + * + */ + default void messageAcknowledged(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.messageAcknowledged(ref, reason, consumer); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java index 6455368f47..94dbe6ce8a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.utils.critical.CriticalComponent; @@ -455,7 +456,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial /** * After a message is sent * - * @param session the session that sends the message + * @param session * @param tx * @param message * @param direct @@ -470,26 +471,46 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial boolean direct, boolean noAutoCreateQueue, RoutingStatus result) throws ActiveMQException { - - if (logAll || logSendingEvents) { - + if (logAll || logDeliveringEvents) { if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) { - //details - debug level - LoggingActiveMQServerPluginLogger.LOGGER.afterSendDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), - message, (session == null ? UNAVAILABLE : session.getName()), - tx, session, direct, noAutoCreateQueue); + LoggingActiveMQServerPluginLogger.LOGGER.afterSendDetails(message, + result.toString(), + tx, + (session == null ? UNAVAILABLE : session.getName()), + (session == null ? UNAVAILABLE : session.getConnectionID().toString()), + direct, + noAutoCreateQueue); } + if (tx != null) { + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + logSend(tx, message, result); + } - if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) { - //info level log - LoggingActiveMQServerPluginLogger.LOGGER.afterSend((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), - (session == null ? UNAVAILABLE : session.getName()), - (session == null ? UNAVAILABLE : session.getConnectionID().toString()), - result); + @Override + public void afterRollback(Transaction tx) { + if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) { + LoggingActiveMQServerPluginLogger.LOGGER.rolledBackTransaction(tx, message.toString()); + } + } + }); + } else { + logSend(tx, message, result); } } } + private void logSend(Transaction tx, + Message message, + RoutingStatus result) { + if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) { + LoggingActiveMQServerPluginLogger.LOGGER.afterSend((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), + result, + (tx == null ? UNAVAILABLE : tx.toString())); + } + } + @Override public void onSendException(ServerSession session, Transaction tx, @@ -652,28 +673,49 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial /** * A message has been acknowledged * - * @param ref The acked message - * @param reason The ack reason + * @param tx The transaction for the ack + * @param ref The acked message + * @param reason The ack reason + * @param consumer The consumer acking the ref * @throws ActiveMQException */ @Override - public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException { + public void messageAcknowledged(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws ActiveMQException { if (logAll || logDeliveringEvents) { - - //details - debug logging - LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledgedDetails(ref, reason); - - if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) { + if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) { Message message = (ref == null ? null : ref.getMessage()); Queue queue = (ref == null ? null : ref.getQueue()); - // info level logging - LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), - (consumer == null ? UNAVAILABLE : consumer.getSessionID() != null ? consumer.getSessionID() : null), - (consumer == null ? UNAVAILABLE : Long.toString(consumer.getID())), - (queue == null ? UNAVAILABLE : queue.getName().toString()), - reason); + LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledgedDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), + (consumer == null ? UNAVAILABLE : consumer.getSessionID() != null ? consumer.getSessionID() : null), + (consumer == null ? UNAVAILABLE : Long.toString(consumer.getID())), + (queue == null ? UNAVAILABLE : queue.getName().toString()), + (tx == null ? UNAVAILABLE : tx.toString()), + reason); } + if (tx != null) { + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + logAck(tx, ref); + } + + @Override + public void afterRollback(Transaction tx) { + if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) { + LoggingActiveMQServerPluginLogger.LOGGER.rolledBackTransaction(tx, ref.toString()); + } + } + }); + } else { + logAck(tx, ref); + } + } + } + + private void logAck(Transaction tx, MessageReference ref) { + if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) { + LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged(ref, tx); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java index 40b521fc1e..38ada8ed2e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java @@ -104,8 +104,8 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { boolean autoDeleteAddress); @LogMessage(level = Logger.Level.INFO) - @Message(id = 841009, value = "sent message with ID: {0}, session name: {1}, session connectionID: {2}, result: {3}", format = Message.Format.MESSAGE_FORMAT) - void afterSend(String messageID, String sessionName, String sessionConnectionID, RoutingStatus result); + @Message(id = 841009, value = "sent message with ID: {0}, result: {1}, transaction: {2}", format = Message.Format.MESSAGE_FORMAT) + void afterSend(String messageID, RoutingStatus result, String tx); @LogMessage(level = Logger.Level.INFO) @Message(id = 841010, value = "routed message with ID: {0}, result: {1}", format = Message.Format.MESSAGE_FORMAT) @@ -129,9 +129,8 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { void messageExpired(MessageReference message, SimpleString messageExpiryAddress); @LogMessage(level = Logger.Level.INFO) - @Message(id = 841014, value = "acknowledged message ID: {0}, messageRef sessionID: {1}, with messageRef consumerID: {2}, messageRef QueueName: {3}," + - " with ackReason: {4}", format = Message.Format.MESSAGE_FORMAT) - void messageAcknowledged(String messageID, String sessionID, String consumerID, String queueName, AckReason reason); + @Message(id = 841014, value = "messageAcknowledged: {0}, with transaction: {2}", format = Message.Format.MESSAGE_FORMAT) + void messageAcknowledged(MessageReference ref, Transaction tx); @LogMessage(level = Logger.Level.INFO) @Message(id = 841015, value = "deployed bridge: {0}", format = Message.Format.MESSAGE_FORMAT) @@ -218,13 +217,12 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { boolean noAutoCreateQueue); @LogMessage(level = Logger.Level.DEBUG) - @Message(id = 843009, value = "message ID: {0}, message {1}, session name: {2} with tx: {3}, session: {4}, direct: {5}," + - " noAutoCreateQueue: {6}", format = Message.Format.MESSAGE_FORMAT) - void afterSendDetails(String messageID, - org.apache.activemq.artemis.api.core.Message message, - String sessionName, + @Message(id = 843009, value = "afterSend message: {0}, result: {1}, transaction: {2}, session: {3}, connection: {4}, direct: {5}, noAutoCreateQueue: {6}", format = Message.Format.MESSAGE_FORMAT) + void afterSendDetails(org.apache.activemq.artemis.api.core.Message message, + String result, Transaction tx, - ServerSession session, + String sessionName, + String connectionID, boolean direct, boolean noAutoCreateQueue); @@ -260,8 +258,8 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { ServerConsumer consumer); @LogMessage(level = Logger.Level.DEBUG) - @Message(id = 843014, value = "acknowledged message: {0}, with ackReason: {1}", format = Message.Format.MESSAGE_FORMAT) - void messageAcknowledgedDetails(MessageReference ref, AckReason reason); + @Message(id = 843014, value = "messageAcknowledged ID: {0}, sessionID: {1}, consumerID: {2}, queue: {3}, transaction: {4}, ackReason: {5}", format = Message.Format.MESSAGE_FORMAT) + void messageAcknowledgedDetails(String messageID, String sessionID, String consumerID, String queueName, String tx, AckReason reason); @LogMessage(level = Logger.Level.DEBUG) @Message(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {0}", format = Message.Format.MESSAGE_FORMAT) @@ -286,4 +284,7 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { boolean direct, boolean rejectDuplicates); + @LogMessage(level = Logger.Level.DEBUG) + @Message(id = 843020, value = "rolled back transaction {0} involving {1}", format = Message.Format.MESSAGE_FORMAT) + void rolledBackTransaction(Transaction tx, String resource); }