diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 5c13c6cf68..4cc45b1593 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -25,12 +25,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -54,9 +53,7 @@ public final class BindingsImpl implements Bindings { // This is public as we use on test assertions public static final int MAX_GROUP_RETRY = 10; - private final ConcurrentMap> routingNameBindingMap = new ConcurrentHashMap<>(); - - private final Map routingNamePositions = new ConcurrentHashMap<>(); + private final CopyOnWriteBindings routingNameBindingMap = new CopyOnWriteBindings(); private final Map bindingsIdMap = new ConcurrentHashMap<>(); @@ -113,6 +110,8 @@ public final class BindingsImpl implements Bindings { } } + + @Override public void addBinding(final Binding binding) { try { @@ -122,30 +121,14 @@ public final class BindingsImpl implements Bindings { if (binding.isExclusive()) { exclusiveBindings.add(binding); } else { - SimpleString routingName = binding.getRoutingName(); - - List bindings = routingNameBindingMap.get(routingName); - - if (bindings == null) { - bindings = new CopyOnWriteArrayList<>(); - - List oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings); - - if (oldBindings != null) { - bindings = oldBindings; - } - } - - if (!bindings.contains(binding)) { - bindings.add(binding); - } + routingNameBindingMap.addBindingIfAbsent(binding); } bindingsIdMap.put(binding.getID(), binding); bindingsNameMap.put(binding.getUniqueName(), binding); if (binding instanceof RemoteQueueBinding) { - setMessageLoadBalancingType(((RemoteQueueBinding)binding).getMessageLoadBalancingType()); + setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType()); } if (logger.isTraceEnabled()) { logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings()); @@ -174,17 +157,7 @@ public final class BindingsImpl implements Bindings { if (binding.isExclusive()) { exclusiveBindings.remove(binding); } else { - SimpleString routingName = binding.getRoutingName(); - - List bindings = routingNameBindingMap.get(routingName); - - if (bindings != null) { - bindings.remove(binding); - - if (bindings.isEmpty()) { - routingNameBindingMap.remove(routingName); - } - } + routingNameBindingMap.removeBinding(binding); } bindingsIdMap.remove(binding.getID()); @@ -208,78 +181,56 @@ public final class BindingsImpl implements Bindings { public boolean redistribute(final Message message, final Queue originatingQueue, final RoutingContext context) throws Exception { - if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { + final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType; + if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) { return false; } if (logger.isTraceEnabled()) { - logger.trace("Redistributing message " + message); + logger.tracef("Redistributing message %s", message); } - SimpleString routingName = originatingQueue.getName(); + final SimpleString routingName = originatingQueue.getName(); - List bindings = routingNameBindingMap.get(routingName); + final Pair bindingsAndPosition = routingNameBindingMap.getBindings(routingName); - if (bindings == null) { + if (bindingsAndPosition == null) { // The value can become null if it's concurrently removed while we're iterating - this is expected // ConcurrentHashMap behaviour! return false; } - Integer ipos = routingNamePositions.get(routingName); + final Binding[] bindings = bindingsAndPosition.getA(); - int pos = ipos != null ? ipos.intValue() : 0; + final CopyOnWriteBindings.BindingIndex bindingIndex = bindingsAndPosition.getB(); - int length = bindings.size(); + assert bindings.length > 0; - int startPos = pos; + final int bindingsCount = bindings.length; - Binding theBinding = null; + int nextPosition = bindingIndex.getIndex(); - // TODO - combine this with similar logic in route() - while (true) { - Binding binding; - try { - binding = bindings.get(pos); - } catch (IndexOutOfBoundsException e) { - // This can occur if binding is removed while in route - if (!bindings.isEmpty()) { - pos = 0; - startPos = 0; - length = bindings.size(); - - continue; - } else { - break; - } - } - - pos = incrementPos(pos, length); - - Filter filter = binding.getFilter(); - - boolean highPrior = binding.isHighAcceptPriority(message); + if (nextPosition >= bindingsCount) { + nextPosition = 0; + } + Binding nextBinding = null; + for (int i = 0; i < bindingsCount; i++) { + final Binding binding = bindings[nextPosition]; + nextPosition = moveNextPosition(nextPosition, bindingsCount); + final Filter filter = binding.getFilter(); + final boolean highPrior = binding.isHighAcceptPriority(message); if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) { - theBinding = binding; - - break; - } - - if (pos == startPos) { + nextBinding = binding; break; } } - - routingNamePositions.put(routingName, pos); - - if (theBinding != null) { - theBinding.route(message, context); - - return true; - } else { + if (nextBinding == null) { return false; } + bindingIndex.setIndex(nextPosition); + nextBinding.route(message, context); + return true; } @Override @@ -290,8 +241,8 @@ public final class BindingsImpl implements Bindings { private void route(final Message message, final RoutingContext context, final boolean groupRouting) throws Exception { - int currentVersion = version.get(); - boolean reusableContext = context.isReusable(message, currentVersion); + final int currentVersion = version.get(); + final boolean reusableContext = context.isReusable(message, currentVersion); if (!reusableContext) { context.clear(); @@ -300,54 +251,33 @@ public final class BindingsImpl implements Bindings { /* This is a special treatment for scaled-down messages involving SnF queues. * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property */ - byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS); + final byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS); if (ids != null) { - ByteBuffer buffer = ByteBuffer.wrap(ids); - while (buffer.hasRemaining()) { - long id = buffer.getLong(); - for (Map.Entry entry : bindingsIdMap.entrySet()) { - if (entry.getValue() instanceof RemoteQueueBinding) { - RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue(); - if (remoteQueueBinding.getRemoteQueueID() == id) { - message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); - } - } - } - } + handleScaledDownMessage(message, ids); } - boolean routed = false; - - boolean hasExclusives = false; - - for (Binding binding : exclusiveBindings) { - if (!hasExclusives) { - context.clear().setReusable(false); - hasExclusives = true; - } - - if (binding.getFilter() == null || binding.getFilter().match(message)) { - binding.getBindable().route(message, context); - routed = true; - } + final boolean routed; + // despite the double check can lead to some race, this can save allocating an iterator for an empty set + if (!exclusiveBindings.isEmpty()) { + routed = routeToExclusiveBindings(message, context); + } else { + routed = false; } if (!routed) { // Remove the ids now, in order to avoid double check - ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS); + final byte[] routeToIds = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS); - // Fetch the groupId now, in order to avoid double checking - SimpleString groupId = message.getGroupID(); - - if (ids != null) { + SimpleString groupId; + if (routeToIds != null) { context.clear().setReusable(false); - routeFromCluster(message, context, ids); - } else if (groupingHandler != null && groupRouting && groupId != null) { + routeFromCluster(message, context, routeToIds); + } else if (groupRouting && groupingHandler != null && (groupId = message.getGroupID()) != null) { context.clear().setReusable(false); routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0); } else if (CompositeAddress.isFullyQualified(message.getAddress())) { context.clear().setReusable(false); - Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString())); + final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString())); if (theBinding != null) { theBinding.route(message, context); } @@ -361,35 +291,58 @@ public final class BindingsImpl implements Bindings { } } - private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception { + private boolean routeToExclusiveBindings(final Message message, final RoutingContext context) throws Exception { + boolean hasExclusives = false; + boolean routed = false; + for (Binding binding : exclusiveBindings) { + if (!hasExclusives) { + context.clear().setReusable(false); + hasExclusives = true; + } + final Filter filter = binding.getFilter(); + if (filter == null || filter.match(message)) { + binding.getBindable().route(message, context); + routed = true; + } + } + return routed; + } + + private void handleScaledDownMessage(final Message message, final byte[] ids) { + ByteBuffer buffer = ByteBuffer.wrap(ids); + while (buffer.hasRemaining()) { + long id = buffer.getLong(); + for (Map.Entry entry : bindingsIdMap.entrySet()) { + if (entry.getValue() instanceof RemoteQueueBinding) { + RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue(); + if (remoteQueueBinding.getRemoteQueueID() == id) { + message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); + } + } + } + } + } + + private void simpleRouting(final Message message, + final RoutingContext context, + final int currentVersion) throws Exception { if (logger.isTraceEnabled()) { - logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context); + logger.tracef("Routing message %s on binding=%s current context::$s", message, this, context); } - for (Map.Entry> entry : routingNameBindingMap.entrySet()) { - SimpleString routingName = entry.getKey(); - - List bindings = entry.getValue(); - - if (bindings == null) { - // The value can become null if it's concurrently removed while we're iterating - this is expected - // ConcurrentHashMap behaviour! - continue; - } - - Binding theBinding = getNextBinding(message, routingName, bindings); - - if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) { + routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> { + final Binding nextBinding = getNextBinding(message, bindings, nextPosition); + if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) { context.setReusable(true, currentVersion); } else { // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it context.setReusable(false, currentVersion); } - if (theBinding != null) { - theBinding.route(message, context); + if (nextBinding != null) { + nextBinding.route(message, context); } - } + }); } @Override @@ -406,99 +359,65 @@ public final class BindingsImpl implements Bindings { * (depending if you are using multi-thread), and not lose messages. */ private Binding getNextBinding(final Message message, - final SimpleString routingName, - final List bindings) { - Integer ipos = routingNamePositions.get(routingName); + final Binding[] bindings, + final CopyOnWriteBindings.BindingIndex bindingIndex) { + int nextPosition = bindingIndex.getIndex(); - int pos = ipos != null ? ipos : 0; + final int bindingsCount = bindings.length; - int length = bindings.size(); - - int startPos = pos; - - Binding theBinding = null; + if (nextPosition >= bindingsCount) { + nextPosition = 0; + } + Binding nextBinding = null; int lastLowPriorityBinding = -1; + // snapshot this, to save loading it on each iteration + final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType; - while (true) { - Binding binding; - try { - binding = bindings.get(pos); - } catch (IndexOutOfBoundsException e) { - // This can occur if binding is removed while in route - if (!bindings.isEmpty()) { - pos = 0; - startPos = 0; - length = bindings.size(); - - continue; - } else { - break; - } - } - - if (matchBinding(message, binding)) { + for (int i = 0; i < bindingsCount; i++) { + final Binding binding = bindings[nextPosition]; + if (matchBinding(message, binding, loadBalancingType)) { // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an // unnecessary overhead) - if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) { - theBinding = binding; - - pos = incrementPos(pos, length); - + if (bindingsCount == 1 || (binding.isConnected() && (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) { + nextBinding = binding; + nextPosition = moveNextPosition(nextPosition, bindingsCount); break; - } else { - //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers, - // the localQueue should always have the priority over the secondary bindings - if (lastLowPriorityBinding == -1 || messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) { - lastLowPriorityBinding = pos; - } + } + //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers, + // the localQueue should always have the priority over the secondary bindings + if (lastLowPriorityBinding == -1 || loadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) { + lastLowPriorityBinding = nextPosition; } } - - pos = incrementPos(pos, length); - - if (pos == startPos) { - - // if no bindings were found, we will apply a secondary level on the routing logic - if (lastLowPriorityBinding != -1) { - try { - theBinding = bindings.get(lastLowPriorityBinding); - } catch (IndexOutOfBoundsException e) { - // This can occur if binding is removed while in route - if (!bindings.isEmpty()) { - pos = 0; - - lastLowPriorityBinding = -1; - - continue; - } else { - break; - } - } - - pos = incrementPos(lastLowPriorityBinding, length); - } - break; + nextPosition = moveNextPosition(nextPosition, bindingsCount); + } + if (nextBinding == null) { + // if no bindings were found, we will apply a secondary level on the routing logic + if (lastLowPriorityBinding != -1) { + nextBinding = bindings[lastLowPriorityBinding]; + nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount); } } - if (pos != startPos) { - routingNamePositions.put(routingName, pos); + if (nextBinding != null) { + bindingIndex.setIndex(nextPosition); } - return theBinding; + return nextBinding; } - private boolean matchBinding(Message message, Binding binding) { - if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) { + private static boolean matchBinding(final Message message, + final Binding binding, + final MessageLoadBalancingType loadBalancingType) { + if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) { return false; } - Filter filter = binding.getFilter(); + final Filter filter = binding.getFilter(); if (filter == null || filter.match(message)) { return true; - } else { - return false; } + return false; } private void routeUsingStrictOrdering(final Message message, @@ -506,17 +425,7 @@ public final class BindingsImpl implements Bindings { final GroupingHandler groupingGroupingHandler, final SimpleString groupId, final int tries) throws Exception { - for (Map.Entry> entry : routingNameBindingMap.entrySet()) { - SimpleString routingName = entry.getKey(); - - List bindings = entry.getValue(); - - if (bindings == null) { - // The value can become null if it's concurrently removed while we're iterating - this is expected - // ConcurrentHashMap behaviour! - continue; - } - + routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> { // concat a full group id, this is for when a binding has multiple bindings // NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if // the binding belongs to its Queue before removing it @@ -527,9 +436,9 @@ public final class BindingsImpl implements Bindings { if (resp == null) { // ok let's find the next binding to propose - Binding theBinding = getNextBinding(message, routingName, bindings); + Binding theBinding = getNextBinding(message, bindings, nextPosition); if (theBinding == null) { - continue; + return; } resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName())); @@ -554,10 +463,10 @@ public final class BindingsImpl implements Bindings { routeAndCheckNull(message, context, resp, chosen, groupId, tries); } - } + }); } - private Binding locateBinding(SimpleString clusterName, List bindings) { + private static Binding locateBinding(SimpleString clusterName, Binding[] bindings) { for (Binding binding : bindings) { if (binding.getClusterName().equals(clusterName)) { return binding; @@ -603,21 +512,13 @@ public final class BindingsImpl implements Bindings { if (routingNameBindingMap.isEmpty()) { out.println("\tEMPTY!"); } - for (Map.Entry> entry : routingNameBindingMap.entrySet()) { - out.println("\tkey=" + entry.getKey() + ", value(s):"); - for (Binding bind : entry.getValue()) { + routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> { + out.println("\tkey=" + routingName + ",\tposition=" + nextPosition.getIndex() + "\tvalue(s):"); + for (Binding bind : bindings) { out.println("\t\t" + bind); } out.println(); - } - - out.println("routingNamePositions:"); - if (routingNamePositions.isEmpty()) { - out.println("\tEMPTY!"); - } - for (Map.Entry entry : routingNamePositions.entrySet()) { - out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue()); - } + }); out.println(); @@ -679,17 +580,21 @@ public final class BindingsImpl implements Bindings { } } - private int incrementPos(int pos, final int length) { - pos++; + private static int moveNextPosition(int position, final int length) { + position++; - if (pos == length) { - pos = 0; + if (position == length) { + position = 0; } - return pos; + return position; } + /** + * debug method: used just for tests!! + * @return + */ public Map> getRoutingNameBindingMap() { - return routingNameBindingMap; + return routingNameBindingMap.copyAsMap(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java new file mode 100644 index 0000000000..fc79da45c1 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; + +/** + * This is a copy-on-write map of {@link Binding} along with the last index set.
+ */ +final class CopyOnWriteBindings { + + public interface BindingIndex { + + /** + * Cannot return a negative value and returns {@code 0} if uninitialized. + */ + int getIndex(); + + /** + * Cannot set a negative value. + */ + void setIndex(int v); + } + + private static final class BindingsAndPosition extends AtomicReference implements BindingIndex { + + private static final AtomicIntegerFieldUpdater NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition"); + + public volatile int nextPosition; + + BindingsAndPosition(Binding[] bindings) { + super(bindings); + NEXT_POSITION_UPDATER.lazySet(this, 0); + } + + @Override + public int getIndex() { + return nextPosition; + } + + @Override + public void setIndex(int v) { + if (v < 0) { + throw new IllegalArgumentException("cannot set a negative position"); + } + NEXT_POSITION_UPDATER.lazySet(this, v); + } + } + + private final ConcurrentHashMap map; + + CopyOnWriteBindings() { + map = new ConcurrentHashMap<>(); + } + + /** + * Add the specified {@code binding}, if not present. + */ + public void addBindingIfAbsent(Binding binding) { + Objects.requireNonNull(binding); + final SimpleString routingName = binding.getRoutingName(); + Objects.requireNonNull(routingName); + BindingsAndPosition bindings; + do { + bindings = map.get(routingName); + if (bindings == null || bindings.get() == TOMBSTONE_BINDINGS) { + final BindingsAndPosition newBindings = new BindingsAndPosition(new Binding[]{binding}); + bindings = map.compute(routingName, (ignored, bindingsAndPosition) -> { + if (bindingsAndPosition == null || bindingsAndPosition.get() == TOMBSTONE_BINDINGS) { + return newBindings; + } + return bindingsAndPosition; + }); + assert bindings != null; + if (bindings == newBindings) { + return; + } + } + } + while (!addBindingIfAbsent(bindings, binding)); + } + + /** + * Remove the specified {@code binding}, if present. + */ + public void removeBinding(Binding binding) { + Objects.requireNonNull(binding); + final SimpleString routingName = binding.getRoutingName(); + Objects.requireNonNull(routingName); + final BindingsAndPosition bindings = map.get(routingName); + if (bindings == null) { + return; + } + final Binding[] newBindings = removeBindingIfPresent(bindings, binding); + if (newBindings == TOMBSTONE_BINDINGS) { + // GC attempt + map.computeIfPresent(routingName, (bindingsRoutingName, existingBindings) -> { + if (existingBindings.get() == TOMBSTONE_BINDINGS) { + return null; + } + return existingBindings; + }); + } + } + + /** + * Returns a snapshot of the bindings, if present and a "lazy" binding index, otherwise {@code null}.
+ * There is no strong commitment on preserving the index value if the related bindings are concurrently modified + * or the index itself is concurrently modified. + */ + public Pair getBindings(SimpleString routingName) { + Objects.requireNonNull(routingName); + BindingsAndPosition bindings = map.get(routingName); + if (bindings == null) { + return null; + } + final Binding[] bindingsSnapshot = bindings.get(); + if (bindingsSnapshot == TOMBSTONE_BINDINGS) { + return null; + } + assert bindingsSnapshot != null && bindingsSnapshot.length > 0; + return new Pair<>(bindingsSnapshot, bindings); + } + + @FunctionalInterface + public interface BindingsConsumer { + + /** + * {@code routingName} cannot be {@code null}. + * {@code bindings} cannot be {@code null} or empty. + * {@code nextPosition} cannot be null. + */ + void accept(SimpleString routingName, Binding[] bindings, BindingIndex nextPosition) throws T; + } + + /** + * Iterates through the bindings and its related indexes.
+ */ + public void forEach(BindingsConsumer bindingsConsumer) throws T { + Objects.requireNonNull(bindingsConsumer); + if (map.isEmpty()) { + return; + } + for (Map.Entry entry : map.entrySet()) { + final BindingsAndPosition value = entry.getValue(); + final Binding[] bindings = value.get(); + if (bindings == TOMBSTONE_BINDINGS) { + continue; + } + assert bindings != null && bindings.length > 0; + bindingsConsumer.accept(entry.getKey(), bindings, value); + } + } + + public boolean isEmpty() { + return map.isEmpty(); + } + + public Map> copyAsMap() { + if (map.isEmpty()) { + return Collections.emptyMap(); + } + final HashMap> copy = new HashMap<>(map.size()); + map.forEach((routingName, bindings) -> { + final Binding[] bindingArray = bindings.get(); + if (bindingArray == TOMBSTONE_BINDINGS) { + return; + } + copy.put(routingName, Arrays.asList(bindingArray)); + }); + return copy; + } + + private static final Binding[] TOMBSTONE_BINDINGS = new Binding[0]; + + private static int indexOfBinding(final Binding[] bindings, final Binding toFind) { + for (int i = 0, size = bindings.length; i < size; i++) { + final Binding binding = bindings[i]; + if (binding.equals(toFind)) { + return i; + } + } + return -1; + } + + /** + * Remove the binding if present and returns the new bindings, {@code null} otherwise. + */ + private static Binding[] removeBindingIfPresent(final AtomicReference bindings, + final Binding bindingToRemove) { + Objects.requireNonNull(bindings); + Objects.requireNonNull(bindingToRemove); + Binding[] oldBindings; + Binding[] newBindings; + do { + oldBindings = bindings.get(); + // no need to check vs TOMBSTONE_BINDINGS, because found === -1; + final int found = indexOfBinding(oldBindings, bindingToRemove); + if (found == -1) { + return null; + } + final int oldBindingsCount = oldBindings.length; + if (oldBindingsCount == 1) { + newBindings = TOMBSTONE_BINDINGS; + } else { + final int newBindingsCount = oldBindingsCount - 1; + newBindings = new Binding[newBindingsCount]; + System.arraycopy(oldBindings, 0, newBindings, 0, found); + final int remaining = newBindingsCount - found; + if (remaining > 0) { + System.arraycopy(oldBindings, found + 1, newBindings, found, remaining); + } + } + } + while (!bindings.compareAndSet(oldBindings, newBindings)); + return newBindings; + } + + /** + * Returns {@code true} if the given binding has been added or already present, + * {@code false} if bindings are going to be garbage-collected. + */ + private static boolean addBindingIfAbsent(final AtomicReference bindings, final Binding newBinding) { + Objects.requireNonNull(bindings); + Objects.requireNonNull(newBinding); + Binding[] oldBindings; + Binding[] newBindings; + do { + oldBindings = bindings.get(); + if (oldBindings == TOMBSTONE_BINDINGS) { + return false; + } + if (indexOfBinding(oldBindings, newBinding) >= 0) { + return true; + } + final int oldLength = oldBindings.length; + newBindings = Arrays.copyOf(oldBindings, oldLength + 1); + assert newBindings[oldLength] == null; + newBindings[oldLength] = newBinding; + } + while (!bindings.compareAndSet(oldBindings, newBindings)); + return true; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index b027c27c29..365ce0120a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.postoffice.impl; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -32,7 +33,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; @@ -1079,34 +1079,46 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding * if a DLA still not found, it should then use previous semantics. * */ private RoutingStatus route(final Message message, - final RoutingContext context, - final boolean direct, - boolean rejectDuplicates, - final Binding bindingMove, boolean sendToDLA) throws Exception { + final RoutingContext context, + final boolean direct, + final boolean rejectDuplicates, + final Binding bindingMove, + final boolean sendToDLA) throws Exception { - - RoutingStatus result; // Sanity check if (message.getRefCount() > 0) { throw new IllegalStateException("Message cannot be routed more than once"); } final SimpleString address = context.getAddress(message); - - AtomicBoolean startedTX = new AtomicBoolean(false); - - applyExpiryDelay(message, address); - - if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) { - return RoutingStatus.DUPLICATED_ID; + final AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); + if (settings != null) { + applyExpiryDelay(message, settings); } + final boolean startedTX; + if (context.isDuplicateDetection()) { + final DuplicateCheckResult duplicateCheckResult = checkDuplicateID(message, context, rejectDuplicates); + switch (duplicateCheckResult) { + + case DuplicateNotStartedTX: + return RoutingStatus.DUPLICATED_ID; + case NoDuplicateStartedTX: + startedTX = true; + break; + case NoDuplicateNotStartedTX: + startedTX = false; + //nop + break; + default: + throw new IllegalStateException("Unexpected value: " + duplicateCheckResult); + } + } else { + startedTX = false; + } message.clearInternalProperties(); - - Bindings bindings = addressManager.getBindingsForRoutingAddress(address); - - AddressInfo addressInfo = addressManager.getAddressInfo(address); - + Bindings bindings; + final AddressInfo addressInfo = addressManager.getAddressInfo(address); if (bindingMove != null) { context.clear(); context.setReusable(false); @@ -1114,7 +1126,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (addressInfo != null) { addressInfo.incrementRoutedMessageCount(); } - } else if (bindings != null) { + } else if ((bindings = addressManager.getBindingsForRoutingAddress(address)) != null) { bindings.route(message, context); if (addressInfo != null) { addressInfo.incrementRoutedMessageCount(); @@ -1126,7 +1138,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS) if (logger.isDebugEnabled()) { - logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message); + logger.debugf("Couldn't find any bindings for address=%s on message=%s", message, address, message); } } @@ -1135,62 +1147,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } if (logger.isTraceEnabled()) { - logger.trace("Message after routed=" + message + "\n" + context.toString()); + logger.tracef("Message after routed=%s\n%s", message, context); } try { + final RoutingStatus status; if (context.getQueueCount() == 0) { - // Send to DLA if appropriate - - AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); - - - if (sendToDLA) { - // it's already been through here once, giving up now - sendToDLA = false; - } else { - sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE; - } - - if (sendToDLA) { - // Send to the DLA for the address - - SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null; - - if (logger.isDebugEnabled()) { - logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message); - } - - if (dlaAddress == null) { - result = RoutingStatus.NO_BINDINGS; - ActiveMQServerLogger.LOGGER.noDLA(address); - } else { - message.referenceOriginalMessage(message, null); - - message.setAddress(dlaAddress); - - message.reencode(); - - route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, sendToDLA); - result = RoutingStatus.NO_BINDINGS_DLA; - } - } else { - result = RoutingStatus.NO_BINDINGS; - - if (logger.isDebugEnabled()) { - logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address); - } - - if (message.isLargeMessage()) { - ((LargeServerMessage) message).deleteFile(); - } - } + status = maybeSendToDLA(message, context, address, sendToDLA); } else { - result = RoutingStatus.OK; + status = RoutingStatus.OK; try { processRoute(message, context, direct); } catch (ActiveMQAddressFullException e) { - if (startedTX.get()) { + if (startedTX) { context.getTransaction().rollback(); } else if (context.getTransaction() != null) { context.getTransaction().markAsRollbackOnly(e); @@ -1198,45 +1167,83 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding throw e; } } - - if (startedTX.get()) { + if (startedTX) { context.getTransaction().commit(); } + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status)); + } + return status; } catch (Exception e) { if (server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e)); } throw e; } + } - if (server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result)); + private RoutingStatus maybeSendToDLA(final Message message, + final RoutingContext context, + final SimpleString address, + final boolean sendToDLAHint) throws Exception { + final RoutingStatus status; + final AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); + final boolean sendToDLA; + if (sendToDLAHint) { + // it's already been through here once, giving up now + sendToDLA = false; + } else { + sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE; } + if (sendToDLA) { + // Send to the DLA for the address + final SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null; + if (logger.isDebugEnabled()) { + logger.debugf("sending message to dla address = %s, message=%s", dlaAddress, message); + } + if (dlaAddress == null) { + status = RoutingStatus.NO_BINDINGS; + ActiveMQServerLogger.LOGGER.noDLA(address); + } else { + message.referenceOriginalMessage(message, null); - return result; + message.setAddress(dlaAddress); + + message.reencode(); + + route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true); + status = RoutingStatus.NO_BINDINGS_DLA; + } + } else { + status = RoutingStatus.NO_BINDINGS; + if (logger.isDebugEnabled()) { + logger.debugf("Message %s is not going anywhere as it didn't have a binding on address:%s", message, address); + } + if (message.isLargeMessage()) { + ((LargeServerMessage) message).deleteFile(); + } + } + return status; } // HORNETQ-1029 - private void applyExpiryDelay(Message message, SimpleString address) { - AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); - if (settings != null) { - long expirationOverride = settings.getExpiryDelay(); + private static void applyExpiryDelay(Message message, AddressSettings settings) { + long expirationOverride = settings.getExpiryDelay(); - // A -1 means don't do anything - if (expirationOverride >= 0) { - // only override the expiration on messages where the expiration hasn't been set by the user - if (message.getExpiration() == 0) { - message.setExpiration(System.currentTimeMillis() + expirationOverride); - } - } else { - long minExpiration = settings.getMinExpiryDelay(); - long maxExpiration = settings.getMaxExpiryDelay(); + // A -1 means don't do anything + if (expirationOverride >= 0) { + // only override the expiration on messages where the expiration hasn't been set by the user + if (message.getExpiration() == 0) { + message.setExpiration(System.currentTimeMillis() + expirationOverride); + } + } else { + long minExpiration = settings.getMinExpiryDelay(); + long maxExpiration = settings.getMaxExpiryDelay(); - if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) { - message.setExpiration(System.currentTimeMillis() + maxExpiration); - } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) { - message.setExpiration(System.currentTimeMillis() + minExpiration); - } + if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) { + message.setExpiration(System.currentTimeMillis() + maxExpiration); + } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) { + message.setExpiration(System.currentTimeMillis() + minExpiration); } } } @@ -1489,20 +1496,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public void processRoute(final Message message, final RoutingContext context, final boolean direct) throws Exception { - final List refs = new ArrayList<>(); + final ArrayList refs = new ArrayList<>(); - Transaction tx = context.getTransaction(); + final Transaction tx = context.getTransaction(); - Long deliveryTime = null; + final Long deliveryTime; if (message.hasScheduledDeliveryTime()) { deliveryTime = message.getScheduledDeliveryTime(); + } else { + deliveryTime = null; } - - PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString()); + final SimpleString messageAddress = message.getAddressSimpleString(); + final PagingStore owningStore = pagingManager.getPageStore(messageAddress); message.setOwner(owningStore); for (Map.Entry entry : context.getContexListing().entrySet()) { - PagingStore store; - if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) { + final PagingStore store; + if (entry.getKey().equals(messageAddress)) { store = owningStore; } else { store = pagingManager.getPageStore(entry.getKey()); @@ -1518,68 +1527,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding continue; } - for (Queue queue : entry.getValue().getNonDurableQueues()) { - MessageReference reference = MessageReference.Factory.createReference(message, queue); - - if (deliveryTime != null) { - reference.setScheduledDeliveryTime(deliveryTime); - } - refs.add(reference); - - queue.refUp(reference); + final List nonDurableQueues = entry.getValue().getNonDurableQueues(); + if (!nonDurableQueues.isEmpty()) { + refs.ensureCapacity(nonDurableQueues.size()); + nonDurableQueues.forEach(queue -> { + final MessageReference reference = MessageReference.Factory.createReference(message, queue); + if (deliveryTime != null) { + reference.setScheduledDeliveryTime(deliveryTime); + } + refs.add(reference); + queue.refUp(reference); + }); } - Iterator iter = entry.getValue().getDurableQueues().iterator(); - - while (iter.hasNext()) { - Queue queue = iter.next(); - - MessageReference reference = MessageReference.Factory.createReference(message, queue); - - if (context.isAlreadyAcked(context.getAddress(message), queue)) { - reference.setAlreadyAcked(); - if (tx != null) { - queue.acknowledge(tx, reference); - } - } - - if (deliveryTime != null) { - reference.setScheduledDeliveryTime(deliveryTime); - } - refs.add(reference); - queue.refUp(reference); - - if (message.isDurable()) { - int durableRefCount = queue.durableUp(message); - - if (durableRefCount == 1) { - if (tx != null) { - storageManager.storeMessageTransactional(tx.getID(), message); - } else { - storageManager.storeMessage(message); - } - - if (message.isLargeMessage()) { - confirmLargeMessageSend(tx, message); - } - } - - if (tx != null) { - storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID()); - - tx.setContainsPersistent(); - } else { - storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext()); - } - - if (deliveryTime != null && deliveryTime > 0) { - if (tx != null) { - storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); - } else { - storageManager.updateScheduledDeliveryTime(reference); - } - } - } + final List durableQueues = entry.getValue().getDurableQueues(); + if (!durableQueues.isEmpty()) { + processRouteToDurableQueues(message, context, deliveryTime, tx, durableQueues, refs); } } @@ -1608,6 +1571,59 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + private void processRouteToDurableQueues(final Message message, + final RoutingContext context, + final Long deliveryTime, + final Transaction tx, + final List durableQueues, + final ArrayList refs) throws Exception { + final int durableQueuesCount = durableQueues.size(); + refs.ensureCapacity(durableQueuesCount); + final Iterator iter = durableQueues.iterator(); + for (int i = 0; i < durableQueuesCount; i++) { + final Queue queue = iter.next(); + final MessageReference reference = MessageReference.Factory.createReference(message, queue); + if (context.isAlreadyAcked(message, queue)) { + reference.setAlreadyAcked(); + if (tx != null) { + queue.acknowledge(tx, reference); + } + } + if (deliveryTime != null) { + reference.setScheduledDeliveryTime(deliveryTime); + } + refs.add(reference); + queue.refUp(reference); + if (message.isDurable()) { + final int durableRefCount = queue.durableUp(message); + if (durableRefCount == 1) { + if (tx != null) { + storageManager.storeMessageTransactional(tx.getID(), message); + } else { + storageManager.storeMessage(message); + } + if (message.isLargeMessage()) { + confirmLargeMessageSend(tx, message); + } + } + if (tx != null) { + storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID()); + tx.setContainsPersistent(); + } else { + final boolean last = i == (durableQueuesCount - 1); + storageManager.storeReference(queue.getID(), message.getMessageID(), last); + } + if (deliveryTime != null && deliveryTime > 0) { + if (tx != null) { + storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); + } else { + storageManager.updateScheduledDeliveryTime(reference); + } + } + } + } + } + /** * @param tx * @param message @@ -1671,72 +1687,76 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private boolean checkDuplicateID(final Message message, - final RoutingContext context, - boolean rejectDuplicates, - AtomicBoolean startedTX) throws Exception { + private enum DuplicateCheckResult { + DuplicateNotStartedTX, NoDuplicateStartedTX, NoDuplicateNotStartedTX + } + + private DuplicateCheckResult checkDuplicateID(final Message message, + final RoutingContext context, + final boolean rejectDuplicates) throws Exception { // Check the DuplicateCache for the Bridge first - - Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID); + final Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID); if (bridgeDup != null) { - // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one - byte[] bridgeDupBytes = (byte[]) bridgeDup; - - DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString())); - - if (context.getTransaction() == null) { - context.setTransaction(new TransactionImpl(storageManager)); - startedTX.set(true); - } - - if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) { - context.getTransaction().rollback(); - startedTX.set(false); - message.usageDown(); // this will cause large message delete - return false; - } - } else { - // if used BridgeDuplicate, it's not going to use the regular duplicate - // since this will would break redistribution (re-setting the duplicateId) - byte[] duplicateIDBytes = message.getDuplicateIDBytes(); - - DuplicateIDCache cache = null; - - boolean isDuplicate = false; - - if (duplicateIDBytes != null) { - cache = getDuplicateIDCache(context.getAddress(message)); - - isDuplicate = cache.contains(duplicateIDBytes); - - if (rejectDuplicates && isDuplicate) { - ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message); - - String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString(); - - if (context.getTransaction() != null) { - context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage)); - } - - message.usageDown(); // this will cause large message delete - - return false; - } - } - - if (cache != null && !isDuplicate) { - if (context.getTransaction() == null) { - // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this - context.setTransaction(new TransactionImpl(storageManager)); - - startedTX.set(true); - } - - cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get()); - } + return checkBridgeDuplicateID(message, context, (byte[]) bridgeDup); } + // if used BridgeDuplicate, it's not going to use the regular duplicate + // since this will would break redistribution (re-setting the duplicateId) + final byte[] duplicateIDBytes = message.getDuplicateIDBytes(); + if (duplicateIDBytes == null) { + return DuplicateCheckResult.NoDuplicateNotStartedTX; + } + return checkNotBridgeDuplicateID(message, context, rejectDuplicates, duplicateIDBytes); + } - return true; + private DuplicateCheckResult checkNotBridgeDuplicateID(final Message message, + final RoutingContext context, + final boolean rejectDuplicates, + final byte[] duplicateIDBytes) throws Exception { + assert duplicateIDBytes != null && Arrays.equals(message.getDuplicateIDBytes(), duplicateIDBytes); + final DuplicateIDCache cache = getDuplicateIDCache(context.getAddress(message)); + final boolean isDuplicate = cache.contains(duplicateIDBytes); + if (rejectDuplicates && isDuplicate) { + ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message); + if (context.getTransaction() != null) { + final String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message; + context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage)); + } + message.usageDown(); // this will cause large message delete + return DuplicateCheckResult.DuplicateNotStartedTX; + } + if (isDuplicate) { + assert !rejectDuplicates; + return DuplicateCheckResult.NoDuplicateNotStartedTX; + } + final boolean startedTX; + if (context.getTransaction() == null) { + // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this + context.setTransaction(new TransactionImpl(storageManager)); + startedTX = true; + } else { + startedTX = false; + } + cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX); + return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX; + } + + private DuplicateCheckResult checkBridgeDuplicateID(final Message message, + final RoutingContext context, + final byte[] bridgeDupBytes) throws Exception { + assert bridgeDupBytes != null; + boolean startedTX = false; + if (context.getTransaction() == null) { + context.setTransaction(new TransactionImpl(storageManager)); + startedTX = true; + } + // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one + final DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString())); + if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) { + context.getTransaction().rollback(); + message.usageDown(); // this will cause large message delete + return DuplicateCheckResult.DuplicateNotStartedTX; + } + return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX; } /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index 1b2b665f13..a6e11a9622 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -68,7 +68,7 @@ public interface RoutingContext { void addQueueWithAck(SimpleString address, Queue queue); - boolean isAlreadyAcked(SimpleString address, Queue queue); + boolean isAlreadyAcked(Message message, Queue queue); void setAddress(SimpleString address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index 3cd88695b0..7a9929498b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -192,8 +192,8 @@ public class RoutingContextImpl implements RoutingContext { } @Override - public boolean isAlreadyAcked(SimpleString address, Queue queue) { - RouteContextList listing = map.get(address); + public boolean isAlreadyAcked(Message message, Queue queue) { + final RouteContextList listing = map.get(getAddress(message)); return listing == null ? false : listing.isAlreadyAcked(queue); }