ARTEMIS-2044 Add onSendException, onMessageRouteException to ActiveMQServerMessagePlugin

This commit is contained in:
Carsten Lohmann 2018-08-22 10:06:30 +02:00 committed by Michael Andre Pearce
parent 46bc10eeaf
commit 95ec8ea433
6 changed files with 221 additions and 85 deletions

View File

@ -877,62 +877,69 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
logger.trace("Message after routed=" + message);
}
if (context.getQueueCount() == 0) {
// Send to DLA if appropriate
try {
if (context.getQueueCount() == 0) {
// Send to DLA if appropriate
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
if (sendToDLA) {
// Send to the DLA for the address
if (sendToDLA) {
// Send to the DLA for the address
SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
if (logger.isDebugEnabled()) {
logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
}
if (logger.isDebugEnabled()) {
logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
}
if (dlaAddress == null) {
result = RoutingStatus.NO_BINDINGS;
ActiveMQServerLogger.LOGGER.noDLA(address);
if (dlaAddress == null) {
result = RoutingStatus.NO_BINDINGS;
ActiveMQServerLogger.LOGGER.noDLA(address);
} else {
message.referenceOriginalMessage(message, null);
message.setAddress(dlaAddress);
message.reencode();
route(message, context.getTransaction(), false);
result = RoutingStatus.NO_BINDINGS_DLA;
}
} else {
message.referenceOriginalMessage(message, null);
result = RoutingStatus.NO_BINDINGS;
message.setAddress(dlaAddress);
if (logger.isDebugEnabled()) {
logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
}
message.reencode();
route(message, context.getTransaction(), false);
result = RoutingStatus.NO_BINDINGS_DLA;
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
}
}
} else {
result = RoutingStatus.NO_BINDINGS;
if (logger.isDebugEnabled()) {
logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
}
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
result = RoutingStatus.OK;
try {
processRoute(message, context, direct);
} catch (ActiveMQAddressFullException e) {
if (startedTX.get()) {
context.getTransaction().rollback();
} else if (context.getTransaction() != null) {
context.getTransaction().markAsRollbackOnly(e);
}
throw e;
}
}
} else {
result = RoutingStatus.OK;
try {
processRoute(message, context, direct);
} catch (ActiveMQAddressFullException e) {
if (startedTX.get()) {
context.getTransaction().rollback();
} else if (context.getTransaction() != null) {
context.getTransaction().markAsRollbackOnly(e);
}
throw e;
}
}
if (startedTX.get()) {
context.getTransaction().commit();
if (startedTX.get()) {
context.getTransaction().commit();
}
} catch (Exception e) {
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
}
throw e;
}
if (server.hasBrokerMessagePlugins()) {

View File

@ -1426,54 +1426,60 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue));
}
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
this.getRemotingConnection().fail(exception);
throw exception;
}
final RoutingStatus result;
//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage()) {
long id = storageManager.generateID();
// This will re-encode the message
message.setMessageID(id);
try {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
this.getRemotingConnection().fail(exception);
throw exception;
}
//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage()) {
long id = storageManager.generateID();
// This will re-encode the message
message.setMessageID(id);
}
SimpleString address = message.getAddressSimpleString();
if (defaultAddress == null && address != null) {
defaultAddress = address;
}
if (address == null) {
// We don't want to force a re-encode when the message gets sent to the consumer
message.setAddress(defaultAddress);
}
if (logger.isTraceEnabled()) {
logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
if (message.getAddress() == null) {
// This could happen with some tests that are ignoring messages
throw ActiveMQMessageBundle.BUNDLE.noAddress();
}
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
result = handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
} catch (Exception e) {
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e));
}
throw e;
}
SimpleString address = message.getAddressSimpleString();
if (defaultAddress == null && address != null) {
defaultAddress = address;
}
if (address == null) {
// We don't want to force a re-encode when the message gets sent to the consumer
message.setAddress(defaultAddress);
}
if (logger.isTraceEnabled()) {
logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
if (message.getAddress() == null) {
// This could happen with some tests that are ignoring messages
throw ActiveMQMessageBundle.BUNDLE.noAddress();
}
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
result = handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result));
}
return result;
}

View File

@ -65,6 +65,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
this.afterSend(tx, message, direct, noAutoCreateQueue, result);
}
/**
* When there was an exception sending the message
*
* @param session
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
* @param e the exception that occurred when sending the message
* @throws ActiveMQException
*/
default void onSendException(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
Exception e) throws ActiveMQException {
}
/**
* Before a message is sent
@ -128,6 +143,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
}
/**
* When there was an error routing the message
*
* @param message
* @param context
* @param direct
* @param rejectDuplicates
* @param e the exception that occurred during message routing
* @throws ActiveMQException
*/
default void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
Exception e) throws ActiveMQException {
}
/**
* Before a message is delivered to a client consumer
*

View File

@ -491,6 +491,32 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
}
}
@Override
public void onSendException(ServerSession session,
Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue,
Exception e) throws ActiveMQException {
if (logAll || logSendingEvents) {
if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) {
//details - debug level
LoggingActiveMQServerPluginLogger.LOGGER.onSendErrorDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
message, (session == null ? UNAVAILABLE : session.getName()),
tx, session, direct, noAutoCreateQueue);
}
if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) {
//info level log
LoggingActiveMQServerPluginLogger.LOGGER.onSendError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
(session == null ? UNAVAILABLE : session.getName()),
(session == null ? UNAVAILABLE : session.getConnectionID().toString()),
e);
}
}
}
/**
* Before a message is routed
*
@ -540,6 +566,26 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
}
}
@Override
public void onMessageRouteException(Message message,
RoutingContext context,
boolean direct,
boolean rejectDuplicates,
Exception e) throws ActiveMQException {
if (logAll || logSendingEvents) {
//details - debug level logging
LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteErrorDetails(message, context, direct, rejectDuplicates);
if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) {
//info level log
LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
e);
}
}
}
/**
* Before a message is delivered to a client consumer
*

View File

@ -141,6 +141,15 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
@Message(id = 841016, value = "criticalFailure called with criticalComponent: {0}", format = Message.Format.MESSAGE_FORMAT)
void criticalFailure(CriticalComponent components);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 841017, value = "error sending message with ID: {0}, session name: {1}, session connectionID: {2}," +
" exception: {3}", format = Message.Format.MESSAGE_FORMAT)
void onSendError(String messageID, String sessionName, String sessionConnectionID, Exception e);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 841018, value = "error routing message with ID: {0}, exception: {1}", format = Message.Format.MESSAGE_FORMAT)
void onMessageRouteError(String messageID, Exception e);
//DEBUG messages
@LogMessage(level = Logger.Level.DEBUG)
@ -258,4 +267,23 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
@Message(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {0}", format = Message.Format.MESSAGE_FORMAT)
void beforeDeployBridge(BridgeConfiguration config);
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 843016, value = "onSendError message ID: {0}, message {1}, session name: {2} with tx: {3}, session: {4}, direct: {5}," +
" noAutoCreateQueue: {6}", format = Message.Format.MESSAGE_FORMAT)
void onSendErrorDetails(String messageID,
org.apache.activemq.artemis.api.core.Message message,
String sessionName,
Transaction tx,
ServerSession session,
boolean direct,
boolean noAutoCreateQueue);
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 843017, value = "onMessageRouteError message: {0}, with context: {1}, direct: {2}, rejectDuplicates: {3}",
format = Message.Format.MESSAGE_FORMAT)
void onMessageRouteErrorDetails(org.apache.activemq.artemis.api.core.Message message,
RoutingContext context,
boolean direct,
boolean rejectDuplicates);
}

View File

@ -90,8 +90,10 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String MESSAGE_ACKED = "messageAcknowledged";
public static final String BEFORE_SEND = "beforeSend";
public static final String AFTER_SEND = "afterSend";
public static final String ON_SEND_EXCEPTION = "onSendException";
public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute";
public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute";
public static final String ON_MESSAGE_ROUTE_EXCEPTION = "onMessageRouteException";
public static final String BEFORE_DELIVER = "beforeDeliver";
public static final String AFTER_DELIVER = "afterDeliver";
public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
@ -304,6 +306,14 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(AFTER_SEND);
}
@Override
public void onSendException(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue, Exception e) {
Preconditions.checkNotNull(message);
Preconditions.checkNotNull(e);
methodCalled(ON_SEND_EXCEPTION);
}
@Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
Preconditions.checkNotNull(message);
@ -320,6 +330,15 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(AFTER_MESSAGE_ROUTE);
}
@Override
public void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
Exception e) {
Preconditions.checkNotNull(message);
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(e);
methodCalled(ON_MESSAGE_ROUTE_EXCEPTION);
}
@Override
public void beforeDeliver(ServerConsumer consumer, MessageReference reference) {
Preconditions.checkNotNull(reference);