This commit is contained in:
Clebert Suconic 2021-04-05 10:01:00 -04:00
commit 7868959b52
5 changed files with 660 additions and 465 deletions

View File

@ -25,12 +25,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
@ -54,9 +53,7 @@ public final class BindingsImpl implements Bindings {
// This is public as we use on test assertions // This is public as we use on test assertions
public static final int MAX_GROUP_RETRY = 10; public static final int MAX_GROUP_RETRY = 10;
private final ConcurrentMap<SimpleString, List<Binding>> routingNameBindingMap = new ConcurrentHashMap<>(); private final CopyOnWriteBindings routingNameBindingMap = new CopyOnWriteBindings();
private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<>();
private final Map<Long, Binding> bindingsIdMap = new ConcurrentHashMap<>(); private final Map<Long, Binding> bindingsIdMap = new ConcurrentHashMap<>();
@ -113,6 +110,8 @@ public final class BindingsImpl implements Bindings {
} }
} }
@Override @Override
public void addBinding(final Binding binding) { public void addBinding(final Binding binding) {
try { try {
@ -122,30 +121,14 @@ public final class BindingsImpl implements Bindings {
if (binding.isExclusive()) { if (binding.isExclusive()) {
exclusiveBindings.add(binding); exclusiveBindings.add(binding);
} else { } else {
SimpleString routingName = binding.getRoutingName(); routingNameBindingMap.addBindingIfAbsent(binding);
List<Binding> bindings = routingNameBindingMap.get(routingName);
if (bindings == null) {
bindings = new CopyOnWriteArrayList<>();
List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
if (oldBindings != null) {
bindings = oldBindings;
}
}
if (!bindings.contains(binding)) {
bindings.add(binding);
}
} }
bindingsIdMap.put(binding.getID(), binding); bindingsIdMap.put(binding.getID(), binding);
bindingsNameMap.put(binding.getUniqueName(), binding); bindingsNameMap.put(binding.getUniqueName(), binding);
if (binding instanceof RemoteQueueBinding) { if (binding instanceof RemoteQueueBinding) {
setMessageLoadBalancingType(((RemoteQueueBinding)binding).getMessageLoadBalancingType()); setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType());
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings()); logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
@ -174,17 +157,7 @@ public final class BindingsImpl implements Bindings {
if (binding.isExclusive()) { if (binding.isExclusive()) {
exclusiveBindings.remove(binding); exclusiveBindings.remove(binding);
} else { } else {
SimpleString routingName = binding.getRoutingName(); routingNameBindingMap.removeBinding(binding);
List<Binding> bindings = routingNameBindingMap.get(routingName);
if (bindings != null) {
bindings.remove(binding);
if (bindings.isEmpty()) {
routingNameBindingMap.remove(routingName);
}
}
} }
bindingsIdMap.remove(binding.getID()); bindingsIdMap.remove(binding.getID());
@ -208,78 +181,56 @@ public final class BindingsImpl implements Bindings {
public boolean redistribute(final Message message, public boolean redistribute(final Message message,
final Queue originatingQueue, final Queue originatingQueue,
final RoutingContext context) throws Exception { final RoutingContext context) throws Exception {
if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) {
return false; return false;
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Redistributing message " + message); logger.tracef("Redistributing message %s", message);
} }
SimpleString routingName = originatingQueue.getName(); final SimpleString routingName = originatingQueue.getName();
List<Binding> bindings = routingNameBindingMap.get(routingName); final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName);
if (bindings == null) { if (bindingsAndPosition == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected // The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour! // ConcurrentHashMap behaviour!
return false; return false;
} }
Integer ipos = routingNamePositions.get(routingName); final Binding[] bindings = bindingsAndPosition.getA();
int pos = ipos != null ? ipos.intValue() : 0; final CopyOnWriteBindings.BindingIndex bindingIndex = bindingsAndPosition.getB();
int length = bindings.size(); assert bindings.length > 0;
int startPos = pos; final int bindingsCount = bindings.length;
Binding theBinding = null; int nextPosition = bindingIndex.getIndex();
// TODO - combine this with similar logic in route() if (nextPosition >= bindingsCount) {
while (true) { nextPosition = 0;
Binding binding; }
try {
binding = bindings.get(pos);
} catch (IndexOutOfBoundsException e) {
// This can occur if binding is removed while in route
if (!bindings.isEmpty()) {
pos = 0;
startPos = 0;
length = bindings.size();
continue;
} else {
break;
}
}
pos = incrementPos(pos, length);
Filter filter = binding.getFilter();
boolean highPrior = binding.isHighAcceptPriority(message);
Binding nextBinding = null;
for (int i = 0; i < bindingsCount; i++) {
final Binding binding = bindings[nextPosition];
nextPosition = moveNextPosition(nextPosition, bindingsCount);
final Filter filter = binding.getFilter();
final boolean highPrior = binding.isHighAcceptPriority(message);
if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) { if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
theBinding = binding; nextBinding = binding;
break;
}
if (pos == startPos) {
break; break;
} }
} }
if (nextBinding == null) {
routingNamePositions.put(routingName, pos);
if (theBinding != null) {
theBinding.route(message, context);
return true;
} else {
return false; return false;
} }
bindingIndex.setIndex(nextPosition);
nextBinding.route(message, context);
return true;
} }
@Override @Override
@ -290,8 +241,8 @@ public final class BindingsImpl implements Bindings {
private void route(final Message message, private void route(final Message message,
final RoutingContext context, final RoutingContext context,
final boolean groupRouting) throws Exception { final boolean groupRouting) throws Exception {
int currentVersion = version.get(); final int currentVersion = version.get();
boolean reusableContext = context.isReusable(message, currentVersion); final boolean reusableContext = context.isReusable(message, currentVersion);
if (!reusableContext) { if (!reusableContext) {
context.clear(); context.clear();
@ -300,54 +251,33 @@ public final class BindingsImpl implements Bindings {
/* This is a special treatment for scaled-down messages involving SnF queues. /* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/ */
byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS); final byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
if (ids != null) { if (ids != null) {
ByteBuffer buffer = ByteBuffer.wrap(ids); handleScaledDownMessage(message, ids);
while (buffer.hasRemaining()) {
long id = buffer.getLong();
for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
if (entry.getValue() instanceof RemoteQueueBinding) {
RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
if (remoteQueueBinding.getRemoteQueueID() == id) {
message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
}
}
}
}
} }
boolean routed = false; final boolean routed;
// despite the double check can lead to some race, this can save allocating an iterator for an empty set
boolean hasExclusives = false; if (!exclusiveBindings.isEmpty()) {
routed = routeToExclusiveBindings(message, context);
for (Binding binding : exclusiveBindings) { } else {
if (!hasExclusives) { routed = false;
context.clear().setReusable(false);
hasExclusives = true;
}
if (binding.getFilter() == null || binding.getFilter().match(message)) {
binding.getBindable().route(message, context);
routed = true;
}
} }
if (!routed) { if (!routed) {
// Remove the ids now, in order to avoid double check // Remove the ids now, in order to avoid double check
ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS); final byte[] routeToIds = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
// Fetch the groupId now, in order to avoid double checking SimpleString groupId;
SimpleString groupId = message.getGroupID(); if (routeToIds != null) {
if (ids != null) {
context.clear().setReusable(false); context.clear().setReusable(false);
routeFromCluster(message, context, ids); routeFromCluster(message, context, routeToIds);
} else if (groupingHandler != null && groupRouting && groupId != null) { } else if (groupRouting && groupingHandler != null && (groupId = message.getGroupID()) != null) {
context.clear().setReusable(false); context.clear().setReusable(false);
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0); routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else if (CompositeAddress.isFullyQualified(message.getAddress())) { } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
context.clear().setReusable(false); context.clear().setReusable(false);
Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString())); final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
if (theBinding != null) { if (theBinding != null) {
theBinding.route(message, context); theBinding.route(message, context);
} }
@ -361,35 +291,58 @@ public final class BindingsImpl implements Bindings {
} }
} }
private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception { private boolean routeToExclusiveBindings(final Message message, final RoutingContext context) throws Exception {
boolean hasExclusives = false;
boolean routed = false;
for (Binding binding : exclusiveBindings) {
if (!hasExclusives) {
context.clear().setReusable(false);
hasExclusives = true;
}
final Filter filter = binding.getFilter();
if (filter == null || filter.match(message)) {
binding.getBindable().route(message, context);
routed = true;
}
}
return routed;
}
private void handleScaledDownMessage(final Message message, final byte[] ids) {
ByteBuffer buffer = ByteBuffer.wrap(ids);
while (buffer.hasRemaining()) {
long id = buffer.getLong();
for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
if (entry.getValue() instanceof RemoteQueueBinding) {
RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
if (remoteQueueBinding.getRemoteQueueID() == id) {
message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
}
}
}
}
}
private void simpleRouting(final Message message,
final RoutingContext context,
final int currentVersion) throws Exception {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context); logger.tracef("Routing message %s on binding=%s current context::$s", message, this, context);
} }
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) { routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
SimpleString routingName = entry.getKey(); final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
List<Binding> bindings = entry.getValue();
if (bindings == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
continue;
}
Binding theBinding = getNextBinding(message, routingName, bindings);
if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) {
context.setReusable(true, currentVersion); context.setReusable(true, currentVersion);
} else { } else {
// notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it
context.setReusable(false, currentVersion); context.setReusable(false, currentVersion);
} }
if (theBinding != null) { if (nextBinding != null) {
theBinding.route(message, context); nextBinding.route(message, context);
} }
} });
} }
@Override @Override
@ -406,99 +359,65 @@ public final class BindingsImpl implements Bindings {
* (depending if you are using multi-thread), and not lose messages. * (depending if you are using multi-thread), and not lose messages.
*/ */
private Binding getNextBinding(final Message message, private Binding getNextBinding(final Message message,
final SimpleString routingName, final Binding[] bindings,
final List<Binding> bindings) { final CopyOnWriteBindings.BindingIndex bindingIndex) {
Integer ipos = routingNamePositions.get(routingName); int nextPosition = bindingIndex.getIndex();
int pos = ipos != null ? ipos : 0; final int bindingsCount = bindings.length;
int length = bindings.size(); if (nextPosition >= bindingsCount) {
nextPosition = 0;
int startPos = pos; }
Binding theBinding = null;
Binding nextBinding = null;
int lastLowPriorityBinding = -1; int lastLowPriorityBinding = -1;
// snapshot this, to save loading it on each iteration
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
while (true) { for (int i = 0; i < bindingsCount; i++) {
Binding binding; final Binding binding = bindings[nextPosition];
try { if (matchBinding(message, binding, loadBalancingType)) {
binding = bindings.get(pos);
} catch (IndexOutOfBoundsException e) {
// This can occur if binding is removed while in route
if (!bindings.isEmpty()) {
pos = 0;
startPos = 0;
length = bindings.size();
continue;
} else {
break;
}
}
if (matchBinding(message, binding)) {
// bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
// unnecessary overhead) // unnecessary overhead)
if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) { if (bindingsCount == 1 || (binding.isConnected() && (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
theBinding = binding; nextBinding = binding;
nextPosition = moveNextPosition(nextPosition, bindingsCount);
pos = incrementPos(pos, length);
break; break;
} else { }
//https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers, //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers,
// the localQueue should always have the priority over the secondary bindings // the localQueue should always have the priority over the secondary bindings
if (lastLowPriorityBinding == -1 || messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) { if (lastLowPriorityBinding == -1 || loadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) {
lastLowPriorityBinding = pos; lastLowPriorityBinding = nextPosition;
}
} }
} }
nextPosition = moveNextPosition(nextPosition, bindingsCount);
pos = incrementPos(pos, length); }
if (nextBinding == null) {
if (pos == startPos) { // if no bindings were found, we will apply a secondary level on the routing logic
if (lastLowPriorityBinding != -1) {
// if no bindings were found, we will apply a secondary level on the routing logic nextBinding = bindings[lastLowPriorityBinding];
if (lastLowPriorityBinding != -1) { nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount);
try {
theBinding = bindings.get(lastLowPriorityBinding);
} catch (IndexOutOfBoundsException e) {
// This can occur if binding is removed while in route
if (!bindings.isEmpty()) {
pos = 0;
lastLowPriorityBinding = -1;
continue;
} else {
break;
}
}
pos = incrementPos(lastLowPriorityBinding, length);
}
break;
} }
} }
if (pos != startPos) { if (nextBinding != null) {
routingNamePositions.put(routingName, pos); bindingIndex.setIndex(nextPosition);
} }
return theBinding; return nextBinding;
} }
private boolean matchBinding(Message message, Binding binding) { private static boolean matchBinding(final Message message,
if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) { final Binding binding,
final MessageLoadBalancingType loadBalancingType) {
if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
return false; return false;
} }
Filter filter = binding.getFilter(); final Filter filter = binding.getFilter();
if (filter == null || filter.match(message)) { if (filter == null || filter.match(message)) {
return true; return true;
} else {
return false;
} }
return false;
} }
private void routeUsingStrictOrdering(final Message message, private void routeUsingStrictOrdering(final Message message,
@ -506,17 +425,7 @@ public final class BindingsImpl implements Bindings {
final GroupingHandler groupingGroupingHandler, final GroupingHandler groupingGroupingHandler,
final SimpleString groupId, final SimpleString groupId,
final int tries) throws Exception { final int tries) throws Exception {
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) { routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
SimpleString routingName = entry.getKey();
List<Binding> bindings = entry.getValue();
if (bindings == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
continue;
}
// concat a full group id, this is for when a binding has multiple bindings // concat a full group id, this is for when a binding has multiple bindings
// NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if // NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if
// the binding belongs to its Queue before removing it // the binding belongs to its Queue before removing it
@ -527,9 +436,9 @@ public final class BindingsImpl implements Bindings {
if (resp == null) { if (resp == null) {
// ok let's find the next binding to propose // ok let's find the next binding to propose
Binding theBinding = getNextBinding(message, routingName, bindings); Binding theBinding = getNextBinding(message, bindings, nextPosition);
if (theBinding == null) { if (theBinding == null) {
continue; return;
} }
resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName())); resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName()));
@ -554,10 +463,10 @@ public final class BindingsImpl implements Bindings {
routeAndCheckNull(message, context, resp, chosen, groupId, tries); routeAndCheckNull(message, context, resp, chosen, groupId, tries);
} }
} });
} }
private Binding locateBinding(SimpleString clusterName, List<Binding> bindings) { private static Binding locateBinding(SimpleString clusterName, Binding[] bindings) {
for (Binding binding : bindings) { for (Binding binding : bindings) {
if (binding.getClusterName().equals(clusterName)) { if (binding.getClusterName().equals(clusterName)) {
return binding; return binding;
@ -603,21 +512,13 @@ public final class BindingsImpl implements Bindings {
if (routingNameBindingMap.isEmpty()) { if (routingNameBindingMap.isEmpty()) {
out.println("\tEMPTY!"); out.println("\tEMPTY!");
} }
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) { routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
out.println("\tkey=" + entry.getKey() + ", value(s):"); out.println("\tkey=" + routingName + ",\tposition=" + nextPosition.getIndex() + "\tvalue(s):");
for (Binding bind : entry.getValue()) { for (Binding bind : bindings) {
out.println("\t\t" + bind); out.println("\t\t" + bind);
} }
out.println(); out.println();
} });
out.println("routingNamePositions:");
if (routingNamePositions.isEmpty()) {
out.println("\tEMPTY!");
}
for (Map.Entry<SimpleString, Integer> entry : routingNamePositions.entrySet()) {
out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue());
}
out.println(); out.println();
@ -679,17 +580,21 @@ public final class BindingsImpl implements Bindings {
} }
} }
private int incrementPos(int pos, final int length) { private static int moveNextPosition(int position, final int length) {
pos++; position++;
if (pos == length) { if (position == length) {
pos = 0; position = 0;
} }
return pos; return position;
} }
/**
* debug method: used just for tests!!
* @return
*/
public Map<SimpleString, List<Binding>> getRoutingNameBindingMap() { public Map<SimpleString, List<Binding>> getRoutingNameBindingMap() {
return routingNameBindingMap; return routingNameBindingMap.copyAsMap();
} }
} }

View File

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
/**
* This is a copy-on-write map of {@link Binding} along with the last index set.<br>
*/
final class CopyOnWriteBindings {
public interface BindingIndex {
/**
* Cannot return a negative value and returns {@code 0} if uninitialized.
*/
int getIndex();
/**
* Cannot set a negative value.
*/
void setIndex(int v);
}
private static final class BindingsAndPosition extends AtomicReference<Binding[]> implements BindingIndex {
private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition");
public volatile int nextPosition;
BindingsAndPosition(Binding[] bindings) {
super(bindings);
NEXT_POSITION_UPDATER.lazySet(this, 0);
}
@Override
public int getIndex() {
return nextPosition;
}
@Override
public void setIndex(int v) {
if (v < 0) {
throw new IllegalArgumentException("cannot set a negative position");
}
NEXT_POSITION_UPDATER.lazySet(this, v);
}
}
private final ConcurrentHashMap<SimpleString, BindingsAndPosition> map;
CopyOnWriteBindings() {
map = new ConcurrentHashMap<>();
}
/**
* Add the specified {@code binding}, if not present.
*/
public void addBindingIfAbsent(Binding binding) {
Objects.requireNonNull(binding);
final SimpleString routingName = binding.getRoutingName();
Objects.requireNonNull(routingName);
BindingsAndPosition bindings;
do {
bindings = map.get(routingName);
if (bindings == null || bindings.get() == TOMBSTONE_BINDINGS) {
final BindingsAndPosition newBindings = new BindingsAndPosition(new Binding[]{binding});
bindings = map.compute(routingName, (ignored, bindingsAndPosition) -> {
if (bindingsAndPosition == null || bindingsAndPosition.get() == TOMBSTONE_BINDINGS) {
return newBindings;
}
return bindingsAndPosition;
});
assert bindings != null;
if (bindings == newBindings) {
return;
}
}
}
while (!addBindingIfAbsent(bindings, binding));
}
/**
* Remove the specified {@code binding}, if present.
*/
public void removeBinding(Binding binding) {
Objects.requireNonNull(binding);
final SimpleString routingName = binding.getRoutingName();
Objects.requireNonNull(routingName);
final BindingsAndPosition bindings = map.get(routingName);
if (bindings == null) {
return;
}
final Binding[] newBindings = removeBindingIfPresent(bindings, binding);
if (newBindings == TOMBSTONE_BINDINGS) {
// GC attempt
map.computeIfPresent(routingName, (bindingsRoutingName, existingBindings) -> {
if (existingBindings.get() == TOMBSTONE_BINDINGS) {
return null;
}
return existingBindings;
});
}
}
/**
* Returns a snapshot of the bindings, if present and a "lazy" binding index, otherwise {@code null}.<br>
* There is no strong commitment on preserving the index value if the related bindings are concurrently modified
* or the index itself is concurrently modified.
*/
public Pair<Binding[], BindingIndex> getBindings(SimpleString routingName) {
Objects.requireNonNull(routingName);
BindingsAndPosition bindings = map.get(routingName);
if (bindings == null) {
return null;
}
final Binding[] bindingsSnapshot = bindings.get();
if (bindingsSnapshot == TOMBSTONE_BINDINGS) {
return null;
}
assert bindingsSnapshot != null && bindingsSnapshot.length > 0;
return new Pair<>(bindingsSnapshot, bindings);
}
@FunctionalInterface
public interface BindingsConsumer<T extends Throwable> {
/**
* {@code routingName} cannot be {@code null}.
* {@code bindings} cannot be {@code null} or empty.
* {@code nextPosition} cannot be null.
*/
void accept(SimpleString routingName, Binding[] bindings, BindingIndex nextPosition) throws T;
}
/**
* Iterates through the bindings and its related indexes.<br>
*/
public <T extends Throwable> void forEach(BindingsConsumer<T> bindingsConsumer) throws T {
Objects.requireNonNull(bindingsConsumer);
if (map.isEmpty()) {
return;
}
for (Map.Entry<SimpleString, BindingsAndPosition> entry : map.entrySet()) {
final BindingsAndPosition value = entry.getValue();
final Binding[] bindings = value.get();
if (bindings == TOMBSTONE_BINDINGS) {
continue;
}
assert bindings != null && bindings.length > 0;
bindingsConsumer.accept(entry.getKey(), bindings, value);
}
}
public boolean isEmpty() {
return map.isEmpty();
}
public Map<SimpleString, List<Binding>> copyAsMap() {
if (map.isEmpty()) {
return Collections.emptyMap();
}
final HashMap<SimpleString, List<Binding>> copy = new HashMap<>(map.size());
map.forEach((routingName, bindings) -> {
final Binding[] bindingArray = bindings.get();
if (bindingArray == TOMBSTONE_BINDINGS) {
return;
}
copy.put(routingName, Arrays.asList(bindingArray));
});
return copy;
}
private static final Binding[] TOMBSTONE_BINDINGS = new Binding[0];
private static int indexOfBinding(final Binding[] bindings, final Binding toFind) {
for (int i = 0, size = bindings.length; i < size; i++) {
final Binding binding = bindings[i];
if (binding.equals(toFind)) {
return i;
}
}
return -1;
}
/**
* Remove the binding if present and returns the new bindings, {@code null} otherwise.
*/
private static Binding[] removeBindingIfPresent(final AtomicReference<Binding[]> bindings,
final Binding bindingToRemove) {
Objects.requireNonNull(bindings);
Objects.requireNonNull(bindingToRemove);
Binding[] oldBindings;
Binding[] newBindings;
do {
oldBindings = bindings.get();
// no need to check vs TOMBSTONE_BINDINGS, because found === -1;
final int found = indexOfBinding(oldBindings, bindingToRemove);
if (found == -1) {
return null;
}
final int oldBindingsCount = oldBindings.length;
if (oldBindingsCount == 1) {
newBindings = TOMBSTONE_BINDINGS;
} else {
final int newBindingsCount = oldBindingsCount - 1;
newBindings = new Binding[newBindingsCount];
System.arraycopy(oldBindings, 0, newBindings, 0, found);
final int remaining = newBindingsCount - found;
if (remaining > 0) {
System.arraycopy(oldBindings, found + 1, newBindings, found, remaining);
}
}
}
while (!bindings.compareAndSet(oldBindings, newBindings));
return newBindings;
}
/**
* Returns {@code true} if the given binding has been added or already present,
* {@code false} if bindings are going to be garbage-collected.
*/
private static boolean addBindingIfAbsent(final AtomicReference<Binding[]> bindings, final Binding newBinding) {
Objects.requireNonNull(bindings);
Objects.requireNonNull(newBinding);
Binding[] oldBindings;
Binding[] newBindings;
do {
oldBindings = bindings.get();
if (oldBindings == TOMBSTONE_BINDINGS) {
return false;
}
if (indexOfBinding(oldBindings, newBinding) >= 0) {
return true;
}
final int oldLength = oldBindings.length;
newBindings = Arrays.copyOf(oldBindings, oldLength + 1);
assert newBindings[oldLength] == null;
newBindings[oldLength] = newBinding;
}
while (!bindings.compareAndSet(oldBindings, newBindings));
return true;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.postoffice.impl; package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
@ -32,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@ -1079,34 +1079,46 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
* if a DLA still not found, it should then use previous semantics. * if a DLA still not found, it should then use previous semantics.
* */ * */
private RoutingStatus route(final Message message, private RoutingStatus route(final Message message,
final RoutingContext context, final RoutingContext context,
final boolean direct, final boolean direct,
boolean rejectDuplicates, final boolean rejectDuplicates,
final Binding bindingMove, boolean sendToDLA) throws Exception { final Binding bindingMove,
final boolean sendToDLA) throws Exception {
RoutingStatus result;
// Sanity check // Sanity check
if (message.getRefCount() > 0) { if (message.getRefCount() > 0) {
throw new IllegalStateException("Message cannot be routed more than once"); throw new IllegalStateException("Message cannot be routed more than once");
} }
final SimpleString address = context.getAddress(message); final SimpleString address = context.getAddress(message);
final AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
AtomicBoolean startedTX = new AtomicBoolean(false); if (settings != null) {
applyExpiryDelay(message, settings);
applyExpiryDelay(message, address);
if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
return RoutingStatus.DUPLICATED_ID;
} }
final boolean startedTX;
if (context.isDuplicateDetection()) {
final DuplicateCheckResult duplicateCheckResult = checkDuplicateID(message, context, rejectDuplicates);
switch (duplicateCheckResult) {
case DuplicateNotStartedTX:
return RoutingStatus.DUPLICATED_ID;
case NoDuplicateStartedTX:
startedTX = true;
break;
case NoDuplicateNotStartedTX:
startedTX = false;
//nop
break;
default:
throw new IllegalStateException("Unexpected value: " + duplicateCheckResult);
}
} else {
startedTX = false;
}
message.clearInternalProperties(); message.clearInternalProperties();
Bindings bindings;
Bindings bindings = addressManager.getBindingsForRoutingAddress(address); final AddressInfo addressInfo = addressManager.getAddressInfo(address);
AddressInfo addressInfo = addressManager.getAddressInfo(address);
if (bindingMove != null) { if (bindingMove != null) {
context.clear(); context.clear();
context.setReusable(false); context.setReusable(false);
@ -1114,7 +1126,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (addressInfo != null) { if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount(); addressInfo.incrementRoutedMessageCount();
} }
} else if (bindings != null) { } else if ((bindings = addressManager.getBindingsForRoutingAddress(address)) != null) {
bindings.route(message, context); bindings.route(message, context);
if (addressInfo != null) { if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount(); addressInfo.incrementRoutedMessageCount();
@ -1126,7 +1138,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
// this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS) // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message); logger.debugf("Couldn't find any bindings for address=%s on message=%s", message, address, message);
} }
} }
@ -1135,62 +1147,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Message after routed=" + message + "\n" + context.toString()); logger.tracef("Message after routed=%s\n%s", message, context);
} }
try { try {
final RoutingStatus status;
if (context.getQueueCount() == 0) { if (context.getQueueCount() == 0) {
// Send to DLA if appropriate status = maybeSendToDLA(message, context, address, sendToDLA);
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
if (sendToDLA) {
// it's already been through here once, giving up now
sendToDLA = false;
} else {
sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
}
if (sendToDLA) {
// Send to the DLA for the address
SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null;
if (logger.isDebugEnabled()) {
logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
}
if (dlaAddress == null) {
result = RoutingStatus.NO_BINDINGS;
ActiveMQServerLogger.LOGGER.noDLA(address);
} else {
message.referenceOriginalMessage(message, null);
message.setAddress(dlaAddress);
message.reencode();
route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, sendToDLA);
result = RoutingStatus.NO_BINDINGS_DLA;
}
} else {
result = RoutingStatus.NO_BINDINGS;
if (logger.isDebugEnabled()) {
logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
}
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
}
}
} else { } else {
result = RoutingStatus.OK; status = RoutingStatus.OK;
try { try {
processRoute(message, context, direct); processRoute(message, context, direct);
} catch (ActiveMQAddressFullException e) { } catch (ActiveMQAddressFullException e) {
if (startedTX.get()) { if (startedTX) {
context.getTransaction().rollback(); context.getTransaction().rollback();
} else if (context.getTransaction() != null) { } else if (context.getTransaction() != null) {
context.getTransaction().markAsRollbackOnly(e); context.getTransaction().markAsRollbackOnly(e);
@ -1198,45 +1167,83 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
throw e; throw e;
} }
} }
if (startedTX) {
if (startedTX.get()) {
context.getTransaction().commit(); context.getTransaction().commit();
} }
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status));
}
return status;
} catch (Exception e) { } catch (Exception e) {
if (server.hasBrokerMessagePlugins()) { if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e)); server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
} }
throw e; throw e;
} }
}
if (server.hasBrokerMessagePlugins()) { private RoutingStatus maybeSendToDLA(final Message message,
server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result)); final RoutingContext context,
final SimpleString address,
final boolean sendToDLAHint) throws Exception {
final RoutingStatus status;
final AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
final boolean sendToDLA;
if (sendToDLAHint) {
// it's already been through here once, giving up now
sendToDLA = false;
} else {
sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
} }
if (sendToDLA) {
// Send to the DLA for the address
final SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null;
if (logger.isDebugEnabled()) {
logger.debugf("sending message to dla address = %s, message=%s", dlaAddress, message);
}
if (dlaAddress == null) {
status = RoutingStatus.NO_BINDINGS;
ActiveMQServerLogger.LOGGER.noDLA(address);
} else {
message.referenceOriginalMessage(message, null);
return result; message.setAddress(dlaAddress);
message.reencode();
route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true);
status = RoutingStatus.NO_BINDINGS_DLA;
}
} else {
status = RoutingStatus.NO_BINDINGS;
if (logger.isDebugEnabled()) {
logger.debugf("Message %s is not going anywhere as it didn't have a binding on address:%s", message, address);
}
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
}
}
return status;
} }
// HORNETQ-1029 // HORNETQ-1029
private void applyExpiryDelay(Message message, SimpleString address) { private static void applyExpiryDelay(Message message, AddressSettings settings) {
AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); long expirationOverride = settings.getExpiryDelay();
if (settings != null) {
long expirationOverride = settings.getExpiryDelay();
// A -1 <expiry-delay> means don't do anything // A -1 <expiry-delay> means don't do anything
if (expirationOverride >= 0) { if (expirationOverride >= 0) {
// only override the expiration on messages where the expiration hasn't been set by the user // only override the expiration on messages where the expiration hasn't been set by the user
if (message.getExpiration() == 0) { if (message.getExpiration() == 0) {
message.setExpiration(System.currentTimeMillis() + expirationOverride); message.setExpiration(System.currentTimeMillis() + expirationOverride);
} }
} else { } else {
long minExpiration = settings.getMinExpiryDelay(); long minExpiration = settings.getMinExpiryDelay();
long maxExpiration = settings.getMaxExpiryDelay(); long maxExpiration = settings.getMaxExpiryDelay();
if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) { if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) {
message.setExpiration(System.currentTimeMillis() + maxExpiration); message.setExpiration(System.currentTimeMillis() + maxExpiration);
} else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) { } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) {
message.setExpiration(System.currentTimeMillis() + minExpiration); message.setExpiration(System.currentTimeMillis() + minExpiration);
}
} }
} }
} }
@ -1489,20 +1496,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public void processRoute(final Message message, public void processRoute(final Message message,
final RoutingContext context, final RoutingContext context,
final boolean direct) throws Exception { final boolean direct) throws Exception {
final List<MessageReference> refs = new ArrayList<>(); final ArrayList<MessageReference> refs = new ArrayList<>();
Transaction tx = context.getTransaction(); final Transaction tx = context.getTransaction();
Long deliveryTime = null; final Long deliveryTime;
if (message.hasScheduledDeliveryTime()) { if (message.hasScheduledDeliveryTime()) {
deliveryTime = message.getScheduledDeliveryTime(); deliveryTime = message.getScheduledDeliveryTime();
} else {
deliveryTime = null;
} }
final SimpleString messageAddress = message.getAddressSimpleString();
PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString()); final PagingStore owningStore = pagingManager.getPageStore(messageAddress);
message.setOwner(owningStore); message.setOwner(owningStore);
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) { for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store; final PagingStore store;
if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) { if (entry.getKey().equals(messageAddress)) {
store = owningStore; store = owningStore;
} else { } else {
store = pagingManager.getPageStore(entry.getKey()); store = pagingManager.getPageStore(entry.getKey());
@ -1518,68 +1527,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
continue; continue;
} }
for (Queue queue : entry.getValue().getNonDurableQueues()) { final List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
MessageReference reference = MessageReference.Factory.createReference(message, queue); if (!nonDurableQueues.isEmpty()) {
refs.ensureCapacity(nonDurableQueues.size());
if (deliveryTime != null) { nonDurableQueues.forEach(queue -> {
reference.setScheduledDeliveryTime(deliveryTime); final MessageReference reference = MessageReference.Factory.createReference(message, queue);
} if (deliveryTime != null) {
refs.add(reference); reference.setScheduledDeliveryTime(deliveryTime);
}
queue.refUp(reference); refs.add(reference);
queue.refUp(reference);
});
} }
Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator(); final List<Queue> durableQueues = entry.getValue().getDurableQueues();
if (!durableQueues.isEmpty()) {
while (iter.hasNext()) { processRouteToDurableQueues(message, context, deliveryTime, tx, durableQueues, refs);
Queue queue = iter.next();
MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (context.isAlreadyAcked(context.getAddress(message), queue)) {
reference.setAlreadyAcked();
if (tx != null) {
queue.acknowledge(tx, reference);
}
}
if (deliveryTime != null) {
reference.setScheduledDeliveryTime(deliveryTime);
}
refs.add(reference);
queue.refUp(reference);
if (message.isDurable()) {
int durableRefCount = queue.durableUp(message);
if (durableRefCount == 1) {
if (tx != null) {
storageManager.storeMessageTransactional(tx.getID(), message);
} else {
storageManager.storeMessage(message);
}
if (message.isLargeMessage()) {
confirmLargeMessageSend(tx, message);
}
}
if (tx != null) {
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
tx.setContainsPersistent();
} else {
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
}
if (deliveryTime != null && deliveryTime > 0) {
if (tx != null) {
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
} else {
storageManager.updateScheduledDeliveryTime(reference);
}
}
}
} }
} }
@ -1608,6 +1571,59 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
} }
private void processRouteToDurableQueues(final Message message,
final RoutingContext context,
final Long deliveryTime,
final Transaction tx,
final List<Queue> durableQueues,
final ArrayList<MessageReference> refs) throws Exception {
final int durableQueuesCount = durableQueues.size();
refs.ensureCapacity(durableQueuesCount);
final Iterator<Queue> iter = durableQueues.iterator();
for (int i = 0; i < durableQueuesCount; i++) {
final Queue queue = iter.next();
final MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (context.isAlreadyAcked(message, queue)) {
reference.setAlreadyAcked();
if (tx != null) {
queue.acknowledge(tx, reference);
}
}
if (deliveryTime != null) {
reference.setScheduledDeliveryTime(deliveryTime);
}
refs.add(reference);
queue.refUp(reference);
if (message.isDurable()) {
final int durableRefCount = queue.durableUp(message);
if (durableRefCount == 1) {
if (tx != null) {
storageManager.storeMessageTransactional(tx.getID(), message);
} else {
storageManager.storeMessage(message);
}
if (message.isLargeMessage()) {
confirmLargeMessageSend(tx, message);
}
}
if (tx != null) {
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
tx.setContainsPersistent();
} else {
final boolean last = i == (durableQueuesCount - 1);
storageManager.storeReference(queue.getID(), message.getMessageID(), last);
}
if (deliveryTime != null && deliveryTime > 0) {
if (tx != null) {
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
} else {
storageManager.updateScheduledDeliveryTime(reference);
}
}
}
}
}
/** /**
* @param tx * @param tx
* @param message * @param message
@ -1671,72 +1687,76 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
} }
private boolean checkDuplicateID(final Message message, private enum DuplicateCheckResult {
final RoutingContext context, DuplicateNotStartedTX, NoDuplicateStartedTX, NoDuplicateNotStartedTX
boolean rejectDuplicates, }
AtomicBoolean startedTX) throws Exception {
private DuplicateCheckResult checkDuplicateID(final Message message,
final RoutingContext context,
final boolean rejectDuplicates) throws Exception {
// Check the DuplicateCache for the Bridge first // Check the DuplicateCache for the Bridge first
final Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
if (bridgeDup != null) { if (bridgeDup != null) {
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one return checkBridgeDuplicateID(message, context, (byte[]) bridgeDup);
byte[] bridgeDupBytes = (byte[]) bridgeDup;
DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager));
startedTX.set(true);
}
if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
context.getTransaction().rollback();
startedTX.set(false);
message.usageDown(); // this will cause large message delete
return false;
}
} else {
// if used BridgeDuplicate, it's not going to use the regular duplicate
// since this will would break redistribution (re-setting the duplicateId)
byte[] duplicateIDBytes = message.getDuplicateIDBytes();
DuplicateIDCache cache = null;
boolean isDuplicate = false;
if (duplicateIDBytes != null) {
cache = getDuplicateIDCache(context.getAddress(message));
isDuplicate = cache.contains(duplicateIDBytes);
if (rejectDuplicates && isDuplicate) {
ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message);
String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString();
if (context.getTransaction() != null) {
context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage));
}
message.usageDown(); // this will cause large message delete
return false;
}
}
if (cache != null && !isDuplicate) {
if (context.getTransaction() == null) {
// We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
context.setTransaction(new TransactionImpl(storageManager));
startedTX.set(true);
}
cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get());
}
} }
// if used BridgeDuplicate, it's not going to use the regular duplicate
// since this will would break redistribution (re-setting the duplicateId)
final byte[] duplicateIDBytes = message.getDuplicateIDBytes();
if (duplicateIDBytes == null) {
return DuplicateCheckResult.NoDuplicateNotStartedTX;
}
return checkNotBridgeDuplicateID(message, context, rejectDuplicates, duplicateIDBytes);
}
return true; private DuplicateCheckResult checkNotBridgeDuplicateID(final Message message,
final RoutingContext context,
final boolean rejectDuplicates,
final byte[] duplicateIDBytes) throws Exception {
assert duplicateIDBytes != null && Arrays.equals(message.getDuplicateIDBytes(), duplicateIDBytes);
final DuplicateIDCache cache = getDuplicateIDCache(context.getAddress(message));
final boolean isDuplicate = cache.contains(duplicateIDBytes);
if (rejectDuplicates && isDuplicate) {
ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message);
if (context.getTransaction() != null) {
final String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message;
context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage));
}
message.usageDown(); // this will cause large message delete
return DuplicateCheckResult.DuplicateNotStartedTX;
}
if (isDuplicate) {
assert !rejectDuplicates;
return DuplicateCheckResult.NoDuplicateNotStartedTX;
}
final boolean startedTX;
if (context.getTransaction() == null) {
// We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
context.setTransaction(new TransactionImpl(storageManager));
startedTX = true;
} else {
startedTX = false;
}
cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX);
return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX;
}
private DuplicateCheckResult checkBridgeDuplicateID(final Message message,
final RoutingContext context,
final byte[] bridgeDupBytes) throws Exception {
assert bridgeDupBytes != null;
boolean startedTX = false;
if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager));
startedTX = true;
}
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
final DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
context.getTransaction().rollback();
message.usageDown(); // this will cause large message delete
return DuplicateCheckResult.DuplicateNotStartedTX;
}
return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX;
} }
/** /**

View File

@ -68,7 +68,7 @@ public interface RoutingContext {
void addQueueWithAck(SimpleString address, Queue queue); void addQueueWithAck(SimpleString address, Queue queue);
boolean isAlreadyAcked(SimpleString address, Queue queue); boolean isAlreadyAcked(Message message, Queue queue);
void setAddress(SimpleString address); void setAddress(SimpleString address);

View File

@ -192,8 +192,8 @@ public class RoutingContextImpl implements RoutingContext {
} }
@Override @Override
public boolean isAlreadyAcked(SimpleString address, Queue queue) { public boolean isAlreadyAcked(Message message, Queue queue) {
RouteContextList listing = map.get(address); final RouteContextList listing = map.get(getAddress(message));
return listing == null ? false : listing.isAlreadyAcked(queue); return listing == null ? false : listing.isAlreadyAcked(queue);
} }