ARTEMIS-3219 Improve message routing
This commit is contained in:
@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -32,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@ -1081,32 +1081,44 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
private RoutingStatus route(final Message message,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates,
final Binding bindingMove, boolean sendToDLA) throws Exception {
final boolean rejectDuplicates,
final Binding bindingMove,
final boolean sendToDLA) throws Exception {
RoutingStatus result;
// Sanity check
if (message.getRefCount() > 0) {
throw new IllegalStateException("Message cannot be routed more than once");
final SimpleString address = context.getAddress(message);
AtomicBoolean startedTX = new AtomicBoolean(false);
applyExpiryDelay(message, address);
if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
return RoutingStatus.DUPLICATED_ID;
final AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
if (settings != null) {
applyExpiryDelay(message, settings);
final boolean startedTX;
if (context.isDuplicateDetection()) {
final DuplicateCheckResult duplicateCheckResult = checkDuplicateID(message, context, rejectDuplicates);
switch (duplicateCheckResult) {
case DuplicateNotStartedTX:
return RoutingStatus.DUPLICATED_ID;
case NoDuplicateStartedTX:
startedTX = true;
case NoDuplicateNotStartedTX:
startedTX = false;
throw new IllegalStateException("Unexpected value: " + duplicateCheckResult);
} else {
startedTX = false;
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
AddressInfo addressInfo = addressManager.getAddressInfo(address);
Bindings bindings;
final AddressInfo addressInfo = addressManager.getAddressInfo(address);
if (bindingMove != null) {
@ -1114,7 +1126,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (addressInfo != null) {
} else if (bindings != null) {
} else if ((bindings = addressManager.getBindingsForRoutingAddress(address)) != null) {
bindings.route(message, context);
if (addressInfo != null) {
@ -1126,7 +1138,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
if (logger.isDebugEnabled()) {
logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message);
logger.debugf("Couldn't find any bindings for address=%s on message=%s", message, address, message);
@ -1135,34 +1147,62 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (logger.isTraceEnabled()) {
logger.trace("Message after routed=" + message + "\n" + context.toString());
logger.tracef("Message after routed=%s\n%s", message, context);
try {
final RoutingStatus status;
if (context.getQueueCount() == 0) {
// Send to DLA if appropriate
status = maybeSendToDLA(message, context, address, sendToDLA);
} else {
status = RoutingStatus.OK;
try {
processRoute(message, context, direct);
} catch (ActiveMQAddressFullException e) {
if (startedTX) {
} else if (context.getTransaction() != null) {
throw e;
if (startedTX) {
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status));
return status;
} catch (Exception e) {
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
throw e;
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
if (sendToDLA) {
private RoutingStatus maybeSendToDLA(final Message message,
final RoutingContext context,
final SimpleString address,
final boolean sendToDLAHint) throws Exception {
final RoutingStatus status;
final AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
final boolean sendToDLA;
if (sendToDLAHint) {
// it's already been through here once, giving up now
sendToDLA = false;
} else {
sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
if (sendToDLA) {
// Send to the DLA for the address
SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null;
final SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null;
if (logger.isDebugEnabled()) {
logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
logger.debugf("sending message to dla address = %s, message=%s", dlaAddress, message);
if (dlaAddress == null) {
result = RoutingStatus.NO_BINDINGS;
status = RoutingStatus.NO_BINDINGS;
} else {
message.referenceOriginalMessage(message, null);
@ -1171,55 +1211,23 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, sendToDLA);
result = RoutingStatus.NO_BINDINGS_DLA;
route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true);
status = RoutingStatus.NO_BINDINGS_DLA;
} else {
result = RoutingStatus.NO_BINDINGS;
status = RoutingStatus.NO_BINDINGS;
if (logger.isDebugEnabled()) {
logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
logger.debugf("Message %s is not going anywhere as it didn't have a binding on address:%s", message, address);
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
} else {
result = RoutingStatus.OK;
try {
processRoute(message, context, direct);
} catch (ActiveMQAddressFullException e) {
if (startedTX.get()) {
} else if (context.getTransaction() != null) {
throw e;
if (startedTX.get()) {
} catch (Exception e) {
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
throw e;
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result));
return result;
return status;
// HORNETQ-1029
private void applyExpiryDelay(Message message, SimpleString address) {
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
if (settings != null) {
private static void applyExpiryDelay(Message message, AddressSettings settings) {
long expirationOverride = settings.getExpiryDelay();
// A -1 <expiry-delay> means don't do anything
@ -1239,7 +1247,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
@ -1489,20 +1496,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public void processRoute(final Message message,
final RoutingContext context,
final boolean direct) throws Exception {
final List<MessageReference> refs = new ArrayList<>();
final ArrayList<MessageReference> refs = new ArrayList<>();
Transaction tx = context.getTransaction();
final Transaction tx = context.getTransaction();
Long deliveryTime = null;
final Long deliveryTime;
if (message.hasScheduledDeliveryTime()) {
deliveryTime = message.getScheduledDeliveryTime();
} else {
deliveryTime = null;
PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString());
final SimpleString messageAddress = message.getAddressSimpleString();
final PagingStore owningStore = pagingManager.getPageStore(messageAddress);
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store;
if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) {
final PagingStore store;
if (entry.getKey().equals(messageAddress)) {
store = owningStore;
} else {
store = pagingManager.getPageStore(entry.getKey());
@ -1518,68 +1527,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
for (Queue queue : entry.getValue().getNonDurableQueues()) {
MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (deliveryTime != null) {
Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
while (iter.hasNext()) {
Queue queue =;
MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (context.isAlreadyAcked(context.getAddress(message), queue)) {
if (tx != null) {
queue.acknowledge(tx, reference);
final List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
if (!nonDurableQueues.isEmpty()) {
nonDurableQueues.forEach(queue -> {
final MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (deliveryTime != null) {
if (message.isDurable()) {
int durableRefCount = queue.durableUp(message);
if (durableRefCount == 1) {
if (tx != null) {
storageManager.storeMessageTransactional(tx.getID(), message);
} else {
if (message.isLargeMessage()) {
confirmLargeMessageSend(tx, message);
if (tx != null) {
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
} else {
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
if (deliveryTime != null && deliveryTime > 0) {
if (tx != null) {
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
} else {
final List<Queue> durableQueues = entry.getValue().getDurableQueues();
if (!durableQueues.isEmpty()) {
processRouteToDurableQueues(message, context, deliveryTime, tx, durableQueues, refs);
@ -1608,6 +1571,59 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
private void processRouteToDurableQueues(final Message message,
final RoutingContext context,
final Long deliveryTime,
final Transaction tx,
final List<Queue> durableQueues,
final ArrayList<MessageReference> refs) throws Exception {
final int durableQueuesCount = durableQueues.size();
final Iterator<Queue> iter = durableQueues.iterator();
for (int i = 0; i < durableQueuesCount; i++) {
final Queue queue =;
final MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (context.isAlreadyAcked(message, queue)) {
if (tx != null) {
queue.acknowledge(tx, reference);
if (deliveryTime != null) {
if (message.isDurable()) {
final int durableRefCount = queue.durableUp(message);
if (durableRefCount == 1) {
if (tx != null) {
storageManager.storeMessageTransactional(tx.getID(), message);
} else {
if (message.isLargeMessage()) {
confirmLargeMessageSend(tx, message);
if (tx != null) {
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
} else {
final boolean last = i == (durableQueuesCount - 1);
storageManager.storeReference(queue.getID(), message.getMessageID(), last);
if (deliveryTime != null && deliveryTime > 0) {
if (tx != null) {
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
} else {
* @param tx
* @param message
@ -1671,72 +1687,76 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
private boolean checkDuplicateID(final Message message,
private enum DuplicateCheckResult {
DuplicateNotStartedTX, NoDuplicateStartedTX, NoDuplicateNotStartedTX
private DuplicateCheckResult checkDuplicateID(final Message message,
final RoutingContext context,
boolean rejectDuplicates,
AtomicBoolean startedTX) throws Exception {
final boolean rejectDuplicates) throws Exception {
// Check the DuplicateCache for the Bridge first
Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
final Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
if (bridgeDup != null) {
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
byte[] bridgeDupBytes = (byte[]) bridgeDup;
DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager));
return checkBridgeDuplicateID(message, context, (byte[]) bridgeDup);
if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
message.usageDown(); // this will cause large message delete
return false;
} else {
// if used BridgeDuplicate, it's not going to use the regular duplicate
// since this will would break redistribution (re-setting the duplicateId)
byte[] duplicateIDBytes = message.getDuplicateIDBytes();
DuplicateIDCache cache = null;
boolean isDuplicate = false;
if (duplicateIDBytes != null) {
cache = getDuplicateIDCache(context.getAddress(message));
isDuplicate = cache.contains(duplicateIDBytes);
final byte[] duplicateIDBytes = message.getDuplicateIDBytes();
if (duplicateIDBytes == null) {
return DuplicateCheckResult.NoDuplicateNotStartedTX;
return checkNotBridgeDuplicateID(message, context, rejectDuplicates, duplicateIDBytes);
private DuplicateCheckResult checkNotBridgeDuplicateID(final Message message,
final RoutingContext context,
final boolean rejectDuplicates,
final byte[] duplicateIDBytes) throws Exception {
assert duplicateIDBytes != null && Arrays.equals(message.getDuplicateIDBytes(), duplicateIDBytes);
final DuplicateIDCache cache = getDuplicateIDCache(context.getAddress(message));
final boolean isDuplicate = cache.contains(duplicateIDBytes);
if (rejectDuplicates && isDuplicate) {
String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString();
if (context.getTransaction() != null) {
final String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message;
context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage));
message.usageDown(); // this will cause large message delete
return false;
return DuplicateCheckResult.DuplicateNotStartedTX;
if (isDuplicate) {
assert !rejectDuplicates;
return DuplicateCheckResult.NoDuplicateNotStartedTX;
if (cache != null && !isDuplicate) {
final boolean startedTX;
if (context.getTransaction() == null) {
// We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
context.setTransaction(new TransactionImpl(storageManager));
startedTX = true;
} else {
startedTX = false;
cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX);
return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX;
cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get());
private DuplicateCheckResult checkBridgeDuplicateID(final Message message,
final RoutingContext context,
final byte[] bridgeDupBytes) throws Exception {
assert bridgeDupBytes != null;
boolean startedTX = false;
if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager));
startedTX = true;
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
final DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
message.usageDown(); // this will cause large message delete
return DuplicateCheckResult.DuplicateNotStartedTX;
return true;
return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX;
@ -68,7 +68,7 @@ public interface RoutingContext {
void addQueueWithAck(SimpleString address, Queue queue);
boolean isAlreadyAcked(SimpleString address, Queue queue);
boolean isAlreadyAcked(Message message, Queue queue);
void setAddress(SimpleString address);
@ -192,8 +192,8 @@ public class RoutingContextImpl implements RoutingContext {
public boolean isAlreadyAcked(SimpleString address, Queue queue) {
RouteContextList listing = map.get(address);
public boolean isAlreadyAcked(Message message, Queue queue) {
final RouteContextList listing = map.get(getAddress(message));
return listing == null ? false : listing.isAlreadyAcked(queue);
Reference in New Issue