diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index b027c27c29..365ce0120a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.postoffice.impl; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -32,7 +33,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; @@ -1079,34 +1079,46 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding * if a DLA still not found, it should then use previous semantics. * */ private RoutingStatus route(final Message message, - final RoutingContext context, - final boolean direct, - boolean rejectDuplicates, - final Binding bindingMove, boolean sendToDLA) throws Exception { + final RoutingContext context, + final boolean direct, + final boolean rejectDuplicates, + final Binding bindingMove, + final boolean sendToDLA) throws Exception { - - RoutingStatus result; // Sanity check if (message.getRefCount() > 0) { throw new IllegalStateException("Message cannot be routed more than once"); } final SimpleString address = context.getAddress(message); - - AtomicBoolean startedTX = new AtomicBoolean(false); - - applyExpiryDelay(message, address); - - if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) { - return RoutingStatus.DUPLICATED_ID; + final AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); + if (settings != null) { + applyExpiryDelay(message, settings); } + final boolean startedTX; + if (context.isDuplicateDetection()) { + final DuplicateCheckResult duplicateCheckResult = checkDuplicateID(message, context, rejectDuplicates); + switch (duplicateCheckResult) { + + case DuplicateNotStartedTX: + return RoutingStatus.DUPLICATED_ID; + case NoDuplicateStartedTX: + startedTX = true; + break; + case NoDuplicateNotStartedTX: + startedTX = false; + //nop + break; + default: + throw new IllegalStateException("Unexpected value: " + duplicateCheckResult); + } + } else { + startedTX = false; + } message.clearInternalProperties(); - - Bindings bindings = addressManager.getBindingsForRoutingAddress(address); - - AddressInfo addressInfo = addressManager.getAddressInfo(address); - + Bindings bindings; + final AddressInfo addressInfo = addressManager.getAddressInfo(address); if (bindingMove != null) { context.clear(); context.setReusable(false); @@ -1114,7 +1126,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (addressInfo != null) { addressInfo.incrementRoutedMessageCount(); } - } else if (bindings != null) { + } else if ((bindings = addressManager.getBindingsForRoutingAddress(address)) != null) { bindings.route(message, context); if (addressInfo != null) { addressInfo.incrementRoutedMessageCount(); @@ -1126,7 +1138,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS) if (logger.isDebugEnabled()) { - logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message); + logger.debugf("Couldn't find any bindings for address=%s on message=%s", message, address, message); } } @@ -1135,62 +1147,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } if (logger.isTraceEnabled()) { - logger.trace("Message after routed=" + message + "\n" + context.toString()); + logger.tracef("Message after routed=%s\n%s", message, context); } try { + final RoutingStatus status; if (context.getQueueCount() == 0) { - // Send to DLA if appropriate - - AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); - - - if (sendToDLA) { - // it's already been through here once, giving up now - sendToDLA = false; - } else { - sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE; - } - - if (sendToDLA) { - // Send to the DLA for the address - - SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null; - - if (logger.isDebugEnabled()) { - logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message); - } - - if (dlaAddress == null) { - result = RoutingStatus.NO_BINDINGS; - ActiveMQServerLogger.LOGGER.noDLA(address); - } else { - message.referenceOriginalMessage(message, null); - - message.setAddress(dlaAddress); - - message.reencode(); - - route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, sendToDLA); - result = RoutingStatus.NO_BINDINGS_DLA; - } - } else { - result = RoutingStatus.NO_BINDINGS; - - if (logger.isDebugEnabled()) { - logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address); - } - - if (message.isLargeMessage()) { - ((LargeServerMessage) message).deleteFile(); - } - } + status = maybeSendToDLA(message, context, address, sendToDLA); } else { - result = RoutingStatus.OK; + status = RoutingStatus.OK; try { processRoute(message, context, direct); } catch (ActiveMQAddressFullException e) { - if (startedTX.get()) { + if (startedTX) { context.getTransaction().rollback(); } else if (context.getTransaction() != null) { context.getTransaction().markAsRollbackOnly(e); @@ -1198,45 +1167,83 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding throw e; } } - - if (startedTX.get()) { + if (startedTX) { context.getTransaction().commit(); } + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status)); + } + return status; } catch (Exception e) { if (server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e)); } throw e; } + } - if (server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result)); + private RoutingStatus maybeSendToDLA(final Message message, + final RoutingContext context, + final SimpleString address, + final boolean sendToDLAHint) throws Exception { + final RoutingStatus status; + final AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); + final boolean sendToDLA; + if (sendToDLAHint) { + // it's already been through here once, giving up now + sendToDLA = false; + } else { + sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE; } + if (sendToDLA) { + // Send to the DLA for the address + final SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null; + if (logger.isDebugEnabled()) { + logger.debugf("sending message to dla address = %s, message=%s", dlaAddress, message); + } + if (dlaAddress == null) { + status = RoutingStatus.NO_BINDINGS; + ActiveMQServerLogger.LOGGER.noDLA(address); + } else { + message.referenceOriginalMessage(message, null); - return result; + message.setAddress(dlaAddress); + + message.reencode(); + + route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true); + status = RoutingStatus.NO_BINDINGS_DLA; + } + } else { + status = RoutingStatus.NO_BINDINGS; + if (logger.isDebugEnabled()) { + logger.debugf("Message %s is not going anywhere as it didn't have a binding on address:%s", message, address); + } + if (message.isLargeMessage()) { + ((LargeServerMessage) message).deleteFile(); + } + } + return status; } // HORNETQ-1029 - private void applyExpiryDelay(Message message, SimpleString address) { - AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); - if (settings != null) { - long expirationOverride = settings.getExpiryDelay(); + private static void applyExpiryDelay(Message message, AddressSettings settings) { + long expirationOverride = settings.getExpiryDelay(); - // A -1 means don't do anything - if (expirationOverride >= 0) { - // only override the expiration on messages where the expiration hasn't been set by the user - if (message.getExpiration() == 0) { - message.setExpiration(System.currentTimeMillis() + expirationOverride); - } - } else { - long minExpiration = settings.getMinExpiryDelay(); - long maxExpiration = settings.getMaxExpiryDelay(); + // A -1 means don't do anything + if (expirationOverride >= 0) { + // only override the expiration on messages where the expiration hasn't been set by the user + if (message.getExpiration() == 0) { + message.setExpiration(System.currentTimeMillis() + expirationOverride); + } + } else { + long minExpiration = settings.getMinExpiryDelay(); + long maxExpiration = settings.getMaxExpiryDelay(); - if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) { - message.setExpiration(System.currentTimeMillis() + maxExpiration); - } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) { - message.setExpiration(System.currentTimeMillis() + minExpiration); - } + if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) { + message.setExpiration(System.currentTimeMillis() + maxExpiration); + } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) { + message.setExpiration(System.currentTimeMillis() + minExpiration); } } } @@ -1489,20 +1496,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public void processRoute(final Message message, final RoutingContext context, final boolean direct) throws Exception { - final List refs = new ArrayList<>(); + final ArrayList refs = new ArrayList<>(); - Transaction tx = context.getTransaction(); + final Transaction tx = context.getTransaction(); - Long deliveryTime = null; + final Long deliveryTime; if (message.hasScheduledDeliveryTime()) { deliveryTime = message.getScheduledDeliveryTime(); + } else { + deliveryTime = null; } - - PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString()); + final SimpleString messageAddress = message.getAddressSimpleString(); + final PagingStore owningStore = pagingManager.getPageStore(messageAddress); message.setOwner(owningStore); for (Map.Entry entry : context.getContexListing().entrySet()) { - PagingStore store; - if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) { + final PagingStore store; + if (entry.getKey().equals(messageAddress)) { store = owningStore; } else { store = pagingManager.getPageStore(entry.getKey()); @@ -1518,68 +1527,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding continue; } - for (Queue queue : entry.getValue().getNonDurableQueues()) { - MessageReference reference = MessageReference.Factory.createReference(message, queue); - - if (deliveryTime != null) { - reference.setScheduledDeliveryTime(deliveryTime); - } - refs.add(reference); - - queue.refUp(reference); + final List nonDurableQueues = entry.getValue().getNonDurableQueues(); + if (!nonDurableQueues.isEmpty()) { + refs.ensureCapacity(nonDurableQueues.size()); + nonDurableQueues.forEach(queue -> { + final MessageReference reference = MessageReference.Factory.createReference(message, queue); + if (deliveryTime != null) { + reference.setScheduledDeliveryTime(deliveryTime); + } + refs.add(reference); + queue.refUp(reference); + }); } - Iterator iter = entry.getValue().getDurableQueues().iterator(); - - while (iter.hasNext()) { - Queue queue = iter.next(); - - MessageReference reference = MessageReference.Factory.createReference(message, queue); - - if (context.isAlreadyAcked(context.getAddress(message), queue)) { - reference.setAlreadyAcked(); - if (tx != null) { - queue.acknowledge(tx, reference); - } - } - - if (deliveryTime != null) { - reference.setScheduledDeliveryTime(deliveryTime); - } - refs.add(reference); - queue.refUp(reference); - - if (message.isDurable()) { - int durableRefCount = queue.durableUp(message); - - if (durableRefCount == 1) { - if (tx != null) { - storageManager.storeMessageTransactional(tx.getID(), message); - } else { - storageManager.storeMessage(message); - } - - if (message.isLargeMessage()) { - confirmLargeMessageSend(tx, message); - } - } - - if (tx != null) { - storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID()); - - tx.setContainsPersistent(); - } else { - storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext()); - } - - if (deliveryTime != null && deliveryTime > 0) { - if (tx != null) { - storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); - } else { - storageManager.updateScheduledDeliveryTime(reference); - } - } - } + final List durableQueues = entry.getValue().getDurableQueues(); + if (!durableQueues.isEmpty()) { + processRouteToDurableQueues(message, context, deliveryTime, tx, durableQueues, refs); } } @@ -1608,6 +1571,59 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + private void processRouteToDurableQueues(final Message message, + final RoutingContext context, + final Long deliveryTime, + final Transaction tx, + final List durableQueues, + final ArrayList refs) throws Exception { + final int durableQueuesCount = durableQueues.size(); + refs.ensureCapacity(durableQueuesCount); + final Iterator iter = durableQueues.iterator(); + for (int i = 0; i < durableQueuesCount; i++) { + final Queue queue = iter.next(); + final MessageReference reference = MessageReference.Factory.createReference(message, queue); + if (context.isAlreadyAcked(message, queue)) { + reference.setAlreadyAcked(); + if (tx != null) { + queue.acknowledge(tx, reference); + } + } + if (deliveryTime != null) { + reference.setScheduledDeliveryTime(deliveryTime); + } + refs.add(reference); + queue.refUp(reference); + if (message.isDurable()) { + final int durableRefCount = queue.durableUp(message); + if (durableRefCount == 1) { + if (tx != null) { + storageManager.storeMessageTransactional(tx.getID(), message); + } else { + storageManager.storeMessage(message); + } + if (message.isLargeMessage()) { + confirmLargeMessageSend(tx, message); + } + } + if (tx != null) { + storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID()); + tx.setContainsPersistent(); + } else { + final boolean last = i == (durableQueuesCount - 1); + storageManager.storeReference(queue.getID(), message.getMessageID(), last); + } + if (deliveryTime != null && deliveryTime > 0) { + if (tx != null) { + storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); + } else { + storageManager.updateScheduledDeliveryTime(reference); + } + } + } + } + } + /** * @param tx * @param message @@ -1671,72 +1687,76 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private boolean checkDuplicateID(final Message message, - final RoutingContext context, - boolean rejectDuplicates, - AtomicBoolean startedTX) throws Exception { + private enum DuplicateCheckResult { + DuplicateNotStartedTX, NoDuplicateStartedTX, NoDuplicateNotStartedTX + } + + private DuplicateCheckResult checkDuplicateID(final Message message, + final RoutingContext context, + final boolean rejectDuplicates) throws Exception { // Check the DuplicateCache for the Bridge first - - Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID); + final Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID); if (bridgeDup != null) { - // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one - byte[] bridgeDupBytes = (byte[]) bridgeDup; - - DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString())); - - if (context.getTransaction() == null) { - context.setTransaction(new TransactionImpl(storageManager)); - startedTX.set(true); - } - - if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) { - context.getTransaction().rollback(); - startedTX.set(false); - message.usageDown(); // this will cause large message delete - return false; - } - } else { - // if used BridgeDuplicate, it's not going to use the regular duplicate - // since this will would break redistribution (re-setting the duplicateId) - byte[] duplicateIDBytes = message.getDuplicateIDBytes(); - - DuplicateIDCache cache = null; - - boolean isDuplicate = false; - - if (duplicateIDBytes != null) { - cache = getDuplicateIDCache(context.getAddress(message)); - - isDuplicate = cache.contains(duplicateIDBytes); - - if (rejectDuplicates && isDuplicate) { - ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message); - - String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString(); - - if (context.getTransaction() != null) { - context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage)); - } - - message.usageDown(); // this will cause large message delete - - return false; - } - } - - if (cache != null && !isDuplicate) { - if (context.getTransaction() == null) { - // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this - context.setTransaction(new TransactionImpl(storageManager)); - - startedTX.set(true); - } - - cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get()); - } + return checkBridgeDuplicateID(message, context, (byte[]) bridgeDup); } + // if used BridgeDuplicate, it's not going to use the regular duplicate + // since this will would break redistribution (re-setting the duplicateId) + final byte[] duplicateIDBytes = message.getDuplicateIDBytes(); + if (duplicateIDBytes == null) { + return DuplicateCheckResult.NoDuplicateNotStartedTX; + } + return checkNotBridgeDuplicateID(message, context, rejectDuplicates, duplicateIDBytes); + } - return true; + private DuplicateCheckResult checkNotBridgeDuplicateID(final Message message, + final RoutingContext context, + final boolean rejectDuplicates, + final byte[] duplicateIDBytes) throws Exception { + assert duplicateIDBytes != null && Arrays.equals(message.getDuplicateIDBytes(), duplicateIDBytes); + final DuplicateIDCache cache = getDuplicateIDCache(context.getAddress(message)); + final boolean isDuplicate = cache.contains(duplicateIDBytes); + if (rejectDuplicates && isDuplicate) { + ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message); + if (context.getTransaction() != null) { + final String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message; + context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage)); + } + message.usageDown(); // this will cause large message delete + return DuplicateCheckResult.DuplicateNotStartedTX; + } + if (isDuplicate) { + assert !rejectDuplicates; + return DuplicateCheckResult.NoDuplicateNotStartedTX; + } + final boolean startedTX; + if (context.getTransaction() == null) { + // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this + context.setTransaction(new TransactionImpl(storageManager)); + startedTX = true; + } else { + startedTX = false; + } + cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX); + return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX; + } + + private DuplicateCheckResult checkBridgeDuplicateID(final Message message, + final RoutingContext context, + final byte[] bridgeDupBytes) throws Exception { + assert bridgeDupBytes != null; + boolean startedTX = false; + if (context.getTransaction() == null) { + context.setTransaction(new TransactionImpl(storageManager)); + startedTX = true; + } + // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one + final DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString())); + if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) { + context.getTransaction().rollback(); + message.usageDown(); // this will cause large message delete + return DuplicateCheckResult.DuplicateNotStartedTX; + } + return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX; } /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index 1b2b665f13..a6e11a9622 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -68,7 +68,7 @@ public interface RoutingContext { void addQueueWithAck(SimpleString address, Queue queue); - boolean isAlreadyAcked(SimpleString address, Queue queue); + boolean isAlreadyAcked(Message message, Queue queue); void setAddress(SimpleString address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index 3cd88695b0..7a9929498b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -192,8 +192,8 @@ public class RoutingContextImpl implements RoutingContext { } @Override - public boolean isAlreadyAcked(SimpleString address, Queue queue) { - RouteContextList listing = map.get(address); + public boolean isAlreadyAcked(Message message, Queue queue) { + final RouteContextList listing = map.get(getAddress(message)); return listing == null ? false : listing.isAlreadyAcked(queue); }