diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 9a3e84478f..598c32b46b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -877,62 +877,69 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding logger.trace("Message after routed=" + message); } - if (context.getQueueCount() == 0) { - // Send to DLA if appropriate + try { + if (context.getQueueCount() == 0) { + // Send to DLA if appropriate - AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); + AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); - boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute(); + boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute(); - if (sendToDLA) { - // Send to the DLA for the address + if (sendToDLA) { + // Send to the DLA for the address - SimpleString dlaAddress = addressSettings.getDeadLetterAddress(); + SimpleString dlaAddress = addressSettings.getDeadLetterAddress(); - if (logger.isDebugEnabled()) { - logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message); - } + if (logger.isDebugEnabled()) { + logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message); + } - if (dlaAddress == null) { - result = RoutingStatus.NO_BINDINGS; - ActiveMQServerLogger.LOGGER.noDLA(address); + if (dlaAddress == null) { + result = RoutingStatus.NO_BINDINGS; + ActiveMQServerLogger.LOGGER.noDLA(address); + } else { + message.referenceOriginalMessage(message, null); + + message.setAddress(dlaAddress); + + message.reencode(); + + route(message, context.getTransaction(), false); + result = RoutingStatus.NO_BINDINGS_DLA; + } } else { - message.referenceOriginalMessage(message, null); + result = RoutingStatus.NO_BINDINGS; - message.setAddress(dlaAddress); + if (logger.isDebugEnabled()) { + logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address); + } - message.reencode(); - - route(message, context.getTransaction(), false); - result = RoutingStatus.NO_BINDINGS_DLA; + if (message.isLargeMessage()) { + ((LargeServerMessage) message).deleteFile(); + } } } else { - result = RoutingStatus.NO_BINDINGS; - - if (logger.isDebugEnabled()) { - logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address); - } - - if (message.isLargeMessage()) { - ((LargeServerMessage) message).deleteFile(); + result = RoutingStatus.OK; + try { + processRoute(message, context, direct); + } catch (ActiveMQAddressFullException e) { + if (startedTX.get()) { + context.getTransaction().rollback(); + } else if (context.getTransaction() != null) { + context.getTransaction().markAsRollbackOnly(e); + } + throw e; } } - } else { - result = RoutingStatus.OK; - try { - processRoute(message, context, direct); - } catch (ActiveMQAddressFullException e) { - if (startedTX.get()) { - context.getTransaction().rollback(); - } else if (context.getTransaction() != null) { - context.getTransaction().markAsRollbackOnly(e); - } - throw e; - } - } - if (startedTX.get()) { - context.getTransaction().commit(); + if (startedTX.get()) { + context.getTransaction().commit(); + } + } catch (Exception e) { + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e)); + } + throw e; } if (server.hasBrokerMessagePlugins()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 4910e6621b..d868a2fb91 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1426,54 +1426,60 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue)); } - // If the protocol doesn't support flow control, we have no choice other than fail the communication - if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { - ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); - this.getRemotingConnection().fail(exception); - throw exception; - } - final RoutingStatus result; - //large message may come from StompSession directly, in which - //case the id header already generated. - if (!message.isLargeMessage()) { - long id = storageManager.generateID(); - // This will re-encode the message - message.setMessageID(id); + try { + // If the protocol doesn't support flow control, we have no choice other than fail the communication + if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { + ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); + this.getRemotingConnection().fail(exception); + throw exception; + } + + //large message may come from StompSession directly, in which + //case the id header already generated. + if (!message.isLargeMessage()) { + long id = storageManager.generateID(); + // This will re-encode the message + message.setMessageID(id); + } + + SimpleString address = message.getAddressSimpleString(); + + if (defaultAddress == null && address != null) { + defaultAddress = address; + } + + if (address == null) { + // We don't want to force a re-encode when the message gets sent to the consumer + message.setAddress(defaultAddress); + } + + if (logger.isTraceEnabled()) { + logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); + } + + if (message.getAddress() == null) { + // This could happen with some tests that are ignoring messages + throw ActiveMQMessageBundle.BUNDLE.noAddress(); + } + + if (message.getAddressSimpleString().equals(managementAddress)) { + // It's a management message + + result = handleManagementMessage(tx, message, direct); + } else { + result = doSend(tx, message, address, direct, noAutoCreateQueue); + } + + } catch (Exception e) { + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e)); + } + throw e; } - - SimpleString address = message.getAddressSimpleString(); - - if (defaultAddress == null && address != null) { - defaultAddress = address; - } - - if (address == null) { - // We don't want to force a re-encode when the message gets sent to the consumer - message.setAddress(defaultAddress); - } - - if (logger.isTraceEnabled()) { - logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); - } - - if (message.getAddress() == null) { - // This could happen with some tests that are ignoring messages - throw ActiveMQMessageBundle.BUNDLE.noAddress(); - } - - if (message.getAddressSimpleString().equals(managementAddress)) { - // It's a management message - - result = handleManagementMessage(tx, message, direct); - } else { - result = doSend(tx, message, address, direct, noAutoCreateQueue); - } - if (server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result)); } - return result; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java index aef0970a4f..404e8a4f9c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java @@ -65,6 +65,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin { this.afterSend(tx, message, direct, noAutoCreateQueue, result); } + /** + * When there was an exception sending the message + * + * @param session + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + * @param e the exception that occurred when sending the message + * @throws ActiveMQException + */ + default void onSendException(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, + Exception e) throws ActiveMQException { + + } /** * Before a message is sent @@ -128,6 +143,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin { } + /** + * When there was an error routing the message + * + * @param message + * @param context + * @param direct + * @param rejectDuplicates + * @param e the exception that occurred during message routing + * @throws ActiveMQException + */ + default void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + Exception e) throws ActiveMQException { + + } + /** * Before a message is delivered to a client consumer * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java index ff23b59cb5..3483472ce0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java @@ -491,6 +491,32 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial } } + @Override + public void onSendException(ServerSession session, + Transaction tx, + Message message, + boolean direct, + boolean noAutoCreateQueue, + Exception e) throws ActiveMQException { + if (logAll || logSendingEvents) { + + if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) { + //details - debug level + LoggingActiveMQServerPluginLogger.LOGGER.onSendErrorDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), + message, (session == null ? UNAVAILABLE : session.getName()), + tx, session, direct, noAutoCreateQueue); + } + + if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) { + //info level log + LoggingActiveMQServerPluginLogger.LOGGER.onSendError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), + (session == null ? UNAVAILABLE : session.getName()), + (session == null ? UNAVAILABLE : session.getConnectionID().toString()), + e); + } + } + } + /** * Before a message is routed * @@ -540,6 +566,26 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial } } + @Override + public void onMessageRouteException(Message message, + RoutingContext context, + boolean direct, + boolean rejectDuplicates, + Exception e) throws ActiveMQException { + if (logAll || logSendingEvents) { + + //details - debug level logging + LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteErrorDetails(message, context, direct, rejectDuplicates); + + if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) { + //info level log + LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), + e); + } + + } + } + /** * Before a message is delivered to a client consumer * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java index f519dd0e0e..fa697ba624 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java @@ -141,6 +141,15 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { @Message(id = 841016, value = "criticalFailure called with criticalComponent: {0}", format = Message.Format.MESSAGE_FORMAT) void criticalFailure(CriticalComponent components); + @LogMessage(level = Logger.Level.INFO) + @Message(id = 841017, value = "error sending message with ID: {0}, session name: {1}, session connectionID: {2}," + + " exception: {3}", format = Message.Format.MESSAGE_FORMAT) + void onSendError(String messageID, String sessionName, String sessionConnectionID, Exception e); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 841018, value = "error routing message with ID: {0}, exception: {1}", format = Message.Format.MESSAGE_FORMAT) + void onMessageRouteError(String messageID, Exception e); + //DEBUG messages @LogMessage(level = Logger.Level.DEBUG) @@ -258,4 +267,23 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { @Message(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {0}", format = Message.Format.MESSAGE_FORMAT) void beforeDeployBridge(BridgeConfiguration config); + @LogMessage(level = Logger.Level.DEBUG) + @Message(id = 843016, value = "onSendError message ID: {0}, message {1}, session name: {2} with tx: {3}, session: {4}, direct: {5}," + + " noAutoCreateQueue: {6}", format = Message.Format.MESSAGE_FORMAT) + void onSendErrorDetails(String messageID, + org.apache.activemq.artemis.api.core.Message message, + String sessionName, + Transaction tx, + ServerSession session, + boolean direct, + boolean noAutoCreateQueue); + + @LogMessage(level = Logger.Level.DEBUG) + @Message(id = 843017, value = "onMessageRouteError message: {0}, with context: {1}, direct: {2}, rejectDuplicates: {3}", + format = Message.Format.MESSAGE_FORMAT) + void onMessageRouteErrorDetails(org.apache.activemq.artemis.api.core.Message message, + RoutingContext context, + boolean direct, + boolean rejectDuplicates); + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java index 9c24505946..0d802cf073 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -90,8 +90,10 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { public static final String MESSAGE_ACKED = "messageAcknowledged"; public static final String BEFORE_SEND = "beforeSend"; public static final String AFTER_SEND = "afterSend"; + public static final String ON_SEND_EXCEPTION = "onSendException"; public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute"; public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute"; + public static final String ON_MESSAGE_ROUTE_EXCEPTION = "onMessageRouteException"; public static final String BEFORE_DELIVER = "beforeDeliver"; public static final String AFTER_DELIVER = "afterDeliver"; public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge"; @@ -304,6 +306,14 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { methodCalled(AFTER_SEND); } + @Override + public void onSendException(ServerSession session, Transaction tx, Message message, boolean direct, + boolean noAutoCreateQueue, Exception e) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(e); + methodCalled(ON_SEND_EXCEPTION); + } + @Override public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { Preconditions.checkNotNull(message); @@ -320,6 +330,15 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { methodCalled(AFTER_MESSAGE_ROUTE); } + @Override + public void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + Exception e) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(context); + Preconditions.checkNotNull(e); + methodCalled(ON_MESSAGE_ROUTE_EXCEPTION); + } + @Override public void beforeDeliver(ServerConsumer consumer, MessageReference reference) { Preconditions.checkNotNull(reference);