This commit is contained in:
Michael Andre Pearce 2018-08-24 21:35:34 +01:00
commit 21c6886151
6 changed files with 221 additions and 85 deletions

View File

@ -877,6 +877,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
logger.trace("Message after routed=" + message); logger.trace("Message after routed=" + message);
} }
try {
if (context.getQueueCount() == 0) { if (context.getQueueCount() == 0) {
// Send to DLA if appropriate // Send to DLA if appropriate
@ -934,6 +935,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (startedTX.get()) { if (startedTX.get()) {
context.getTransaction().commit(); context.getTransaction().commit();
} }
} catch (Exception e) {
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
}
throw e;
}
if (server.hasBrokerMessagePlugins()) { if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result)); server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result));

View File

@ -1426,6 +1426,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue)); server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue));
} }
final RoutingStatus result;
try {
// If the protocol doesn't support flow control, we have no choice other than fail the communication // If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
@ -1433,7 +1435,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw exception; throw exception;
} }
final RoutingStatus result;
//large message may come from StompSession directly, in which //large message may come from StompSession directly, in which
//case the id header already generated. //case the id header already generated.
if (!message.isLargeMessage()) { if (!message.isLargeMessage()) {
@ -1470,10 +1471,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
result = doSend(tx, message, address, direct, noAutoCreateQueue); result = doSend(tx, message, address, direct, noAutoCreateQueue);
} }
} catch (Exception e) {
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e));
}
throw e;
}
if (server.hasBrokerMessagePlugins()) { if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result)); server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result));
} }
return result; return result;
} }

View File

@ -65,6 +65,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
this.afterSend(tx, message, direct, noAutoCreateQueue, result); this.afterSend(tx, message, direct, noAutoCreateQueue, result);
} }
/**
* When there was an exception sending the message
*
* @param session
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
* @param e the exception that occurred when sending the message
* @throws ActiveMQException
*/
default void onSendException(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
Exception e) throws ActiveMQException {
}
/** /**
* Before a message is sent * Before a message is sent
@ -128,6 +143,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
} }
/**
* When there was an error routing the message
*
* @param message
* @param context
* @param direct
* @param rejectDuplicates
* @param e the exception that occurred during message routing
* @throws ActiveMQException
*/
default void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
Exception e) throws ActiveMQException {
}
/** /**
* Before a message is delivered to a client consumer * Before a message is delivered to a client consumer
* *

View File

@ -491,6 +491,32 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
} }
} }
@Override
public void onSendException(ServerSession session,
Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue,
Exception e) throws ActiveMQException {
if (logAll || logSendingEvents) {
if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) {
//details - debug level
LoggingActiveMQServerPluginLogger.LOGGER.onSendErrorDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
message, (session == null ? UNAVAILABLE : session.getName()),
tx, session, direct, noAutoCreateQueue);
}
if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) {
//info level log
LoggingActiveMQServerPluginLogger.LOGGER.onSendError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
(session == null ? UNAVAILABLE : session.getName()),
(session == null ? UNAVAILABLE : session.getConnectionID().toString()),
e);
}
}
}
/** /**
* Before a message is routed * Before a message is routed
* *
@ -540,6 +566,26 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
} }
} }
@Override
public void onMessageRouteException(Message message,
RoutingContext context,
boolean direct,
boolean rejectDuplicates,
Exception e) throws ActiveMQException {
if (logAll || logSendingEvents) {
//details - debug level logging
LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteErrorDetails(message, context, direct, rejectDuplicates);
if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) {
//info level log
LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
e);
}
}
}
/** /**
* Before a message is delivered to a client consumer * Before a message is delivered to a client consumer
* *

View File

@ -141,6 +141,15 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
@Message(id = 841016, value = "criticalFailure called with criticalComponent: {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 841016, value = "criticalFailure called with criticalComponent: {0}", format = Message.Format.MESSAGE_FORMAT)
void criticalFailure(CriticalComponent components); void criticalFailure(CriticalComponent components);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 841017, value = "error sending message with ID: {0}, session name: {1}, session connectionID: {2}," +
" exception: {3}", format = Message.Format.MESSAGE_FORMAT)
void onSendError(String messageID, String sessionName, String sessionConnectionID, Exception e);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 841018, value = "error routing message with ID: {0}, exception: {1}", format = Message.Format.MESSAGE_FORMAT)
void onMessageRouteError(String messageID, Exception e);
//DEBUG messages //DEBUG messages
@LogMessage(level = Logger.Level.DEBUG) @LogMessage(level = Logger.Level.DEBUG)
@ -258,4 +267,23 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
@Message(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {0}", format = Message.Format.MESSAGE_FORMAT)
void beforeDeployBridge(BridgeConfiguration config); void beforeDeployBridge(BridgeConfiguration config);
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 843016, value = "onSendError message ID: {0}, message {1}, session name: {2} with tx: {3}, session: {4}, direct: {5}," +
" noAutoCreateQueue: {6}", format = Message.Format.MESSAGE_FORMAT)
void onSendErrorDetails(String messageID,
org.apache.activemq.artemis.api.core.Message message,
String sessionName,
Transaction tx,
ServerSession session,
boolean direct,
boolean noAutoCreateQueue);
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 843017, value = "onMessageRouteError message: {0}, with context: {1}, direct: {2}, rejectDuplicates: {3}",
format = Message.Format.MESSAGE_FORMAT)
void onMessageRouteErrorDetails(org.apache.activemq.artemis.api.core.Message message,
RoutingContext context,
boolean direct,
boolean rejectDuplicates);
} }

View File

@ -90,8 +90,10 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String MESSAGE_ACKED = "messageAcknowledged"; public static final String MESSAGE_ACKED = "messageAcknowledged";
public static final String BEFORE_SEND = "beforeSend"; public static final String BEFORE_SEND = "beforeSend";
public static final String AFTER_SEND = "afterSend"; public static final String AFTER_SEND = "afterSend";
public static final String ON_SEND_EXCEPTION = "onSendException";
public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute"; public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute";
public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute"; public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute";
public static final String ON_MESSAGE_ROUTE_EXCEPTION = "onMessageRouteException";
public static final String BEFORE_DELIVER = "beforeDeliver"; public static final String BEFORE_DELIVER = "beforeDeliver";
public static final String AFTER_DELIVER = "afterDeliver"; public static final String AFTER_DELIVER = "afterDeliver";
public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge"; public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
@ -304,6 +306,14 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(AFTER_SEND); methodCalled(AFTER_SEND);
} }
@Override
public void onSendException(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue, Exception e) {
Preconditions.checkNotNull(message);
Preconditions.checkNotNull(e);
methodCalled(ON_SEND_EXCEPTION);
}
@Override @Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
Preconditions.checkNotNull(message); Preconditions.checkNotNull(message);
@ -320,6 +330,15 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(AFTER_MESSAGE_ROUTE); methodCalled(AFTER_MESSAGE_ROUTE);
} }
@Override
public void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
Exception e) {
Preconditions.checkNotNull(message);
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(e);
methodCalled(ON_MESSAGE_ROUTE_EXCEPTION);
}
@Override @Override
public void beforeDeliver(ServerConsumer consumer, MessageReference reference) { public void beforeDeliver(ServerConsumer consumer, MessageReference reference) {
Preconditions.checkNotNull(reference); Preconditions.checkNotNull(reference);