ARTEMIS-3219 Improve FQQN message routing

This commit is contained in:
franz1981 2021-04-02 09:50:43 +02:00
parent 17dd86ff4b
commit 08ec7c67c8
2 changed files with 416 additions and 241 deletions

View File

@ -25,12 +25,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
@ -54,9 +53,7 @@ public final class BindingsImpl implements Bindings {
// This is public as we use on test assertions // This is public as we use on test assertions
public static final int MAX_GROUP_RETRY = 10; public static final int MAX_GROUP_RETRY = 10;
private final ConcurrentMap<SimpleString, List<Binding>> routingNameBindingMap = new ConcurrentHashMap<>(); private final CopyOnWriteBindings routingNameBindingMap = new CopyOnWriteBindings();
private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<>();
private final Map<Long, Binding> bindingsIdMap = new ConcurrentHashMap<>(); private final Map<Long, Binding> bindingsIdMap = new ConcurrentHashMap<>();
@ -113,6 +110,8 @@ public final class BindingsImpl implements Bindings {
} }
} }
@Override @Override
public void addBinding(final Binding binding) { public void addBinding(final Binding binding) {
try { try {
@ -122,30 +121,14 @@ public final class BindingsImpl implements Bindings {
if (binding.isExclusive()) { if (binding.isExclusive()) {
exclusiveBindings.add(binding); exclusiveBindings.add(binding);
} else { } else {
SimpleString routingName = binding.getRoutingName(); routingNameBindingMap.addBindingIfAbsent(binding);
List<Binding> bindings = routingNameBindingMap.get(routingName);
if (bindings == null) {
bindings = new CopyOnWriteArrayList<>();
List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
if (oldBindings != null) {
bindings = oldBindings;
}
}
if (!bindings.contains(binding)) {
bindings.add(binding);
}
} }
bindingsIdMap.put(binding.getID(), binding); bindingsIdMap.put(binding.getID(), binding);
bindingsNameMap.put(binding.getUniqueName(), binding); bindingsNameMap.put(binding.getUniqueName(), binding);
if (binding instanceof RemoteQueueBinding) { if (binding instanceof RemoteQueueBinding) {
setMessageLoadBalancingType(((RemoteQueueBinding)binding).getMessageLoadBalancingType()); setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType());
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings()); logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
@ -174,17 +157,7 @@ public final class BindingsImpl implements Bindings {
if (binding.isExclusive()) { if (binding.isExclusive()) {
exclusiveBindings.remove(binding); exclusiveBindings.remove(binding);
} else { } else {
SimpleString routingName = binding.getRoutingName(); routingNameBindingMap.removeBinding(binding);
List<Binding> bindings = routingNameBindingMap.get(routingName);
if (bindings != null) {
bindings.remove(binding);
if (bindings.isEmpty()) {
routingNameBindingMap.remove(routingName);
}
}
} }
bindingsIdMap.remove(binding.getID()); bindingsIdMap.remove(binding.getID());
@ -208,78 +181,56 @@ public final class BindingsImpl implements Bindings {
public boolean redistribute(final Message message, public boolean redistribute(final Message message,
final Queue originatingQueue, final Queue originatingQueue,
final RoutingContext context) throws Exception { final RoutingContext context) throws Exception {
if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) {
return false; return false;
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Redistributing message " + message); logger.tracef("Redistributing message %s", message);
} }
SimpleString routingName = originatingQueue.getName(); final SimpleString routingName = originatingQueue.getName();
List<Binding> bindings = routingNameBindingMap.get(routingName); final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName);
if (bindings == null) { if (bindingsAndPosition == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected // The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour! // ConcurrentHashMap behaviour!
return false; return false;
} }
Integer ipos = routingNamePositions.get(routingName); final Binding[] bindings = bindingsAndPosition.getA();
int pos = ipos != null ? ipos.intValue() : 0; final CopyOnWriteBindings.BindingIndex bindingIndex = bindingsAndPosition.getB();
int length = bindings.size(); assert bindings.length > 0;
int startPos = pos; final int bindingsCount = bindings.length;
Binding theBinding = null; int nextPosition = bindingIndex.getIndex();
// TODO - combine this with similar logic in route() if (nextPosition >= bindingsCount) {
while (true) { nextPosition = 0;
Binding binding; }
try {
binding = bindings.get(pos);
} catch (IndexOutOfBoundsException e) {
// This can occur if binding is removed while in route
if (!bindings.isEmpty()) {
pos = 0;
startPos = 0;
length = bindings.size();
continue;
} else {
break;
}
}
pos = incrementPos(pos, length);
Filter filter = binding.getFilter();
boolean highPrior = binding.isHighAcceptPriority(message);
Binding nextBinding = null;
for (int i = 0; i < bindingsCount; i++) {
final Binding binding = bindings[nextPosition];
nextPosition = moveNextPosition(nextPosition, bindingsCount);
final Filter filter = binding.getFilter();
final boolean highPrior = binding.isHighAcceptPriority(message);
if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) { if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
theBinding = binding; nextBinding = binding;
break;
}
if (pos == startPos) {
break; break;
} }
} }
if (nextBinding == null) {
routingNamePositions.put(routingName, pos);
if (theBinding != null) {
theBinding.route(message, context);
return true;
} else {
return false; return false;
} }
bindingIndex.setIndex(nextPosition);
nextBinding.route(message, context);
return true;
} }
@Override @Override
@ -290,8 +241,8 @@ public final class BindingsImpl implements Bindings {
private void route(final Message message, private void route(final Message message,
final RoutingContext context, final RoutingContext context,
final boolean groupRouting) throws Exception { final boolean groupRouting) throws Exception {
int currentVersion = version.get(); final int currentVersion = version.get();
boolean reusableContext = context.isReusable(message, currentVersion); final boolean reusableContext = context.isReusable(message, currentVersion);
if (!reusableContext) { if (!reusableContext) {
context.clear(); context.clear();
@ -300,54 +251,33 @@ public final class BindingsImpl implements Bindings {
/* This is a special treatment for scaled-down messages involving SnF queues. /* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/ */
byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS); final byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
if (ids != null) { if (ids != null) {
ByteBuffer buffer = ByteBuffer.wrap(ids); handleScaledDownMessage(message, ids);
while (buffer.hasRemaining()) {
long id = buffer.getLong();
for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
if (entry.getValue() instanceof RemoteQueueBinding) {
RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
if (remoteQueueBinding.getRemoteQueueID() == id) {
message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
}
}
}
}
} }
boolean routed = false; final boolean routed;
// despite the double check can lead to some race, this can save allocating an iterator for an empty set
boolean hasExclusives = false; if (!exclusiveBindings.isEmpty()) {
routed = routeToExclusiveBindings(message, context);
for (Binding binding : exclusiveBindings) { } else {
if (!hasExclusives) { routed = false;
context.clear().setReusable(false);
hasExclusives = true;
}
if (binding.getFilter() == null || binding.getFilter().match(message)) {
binding.getBindable().route(message, context);
routed = true;
}
} }
if (!routed) { if (!routed) {
// Remove the ids now, in order to avoid double check // Remove the ids now, in order to avoid double check
ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS); final byte[] routeToIds = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
// Fetch the groupId now, in order to avoid double checking SimpleString groupId;
SimpleString groupId = message.getGroupID(); if (routeToIds != null) {
if (ids != null) {
context.clear().setReusable(false); context.clear().setReusable(false);
routeFromCluster(message, context, ids); routeFromCluster(message, context, routeToIds);
} else if (groupingHandler != null && groupRouting && groupId != null) { } else if (groupRouting && groupingHandler != null && (groupId = message.getGroupID()) != null) {
context.clear().setReusable(false); context.clear().setReusable(false);
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0); routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else if (CompositeAddress.isFullyQualified(message.getAddress())) { } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
context.clear().setReusable(false); context.clear().setReusable(false);
Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString())); final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
if (theBinding != null) { if (theBinding != null) {
theBinding.route(message, context); theBinding.route(message, context);
} }
@ -361,35 +291,58 @@ public final class BindingsImpl implements Bindings {
} }
} }
private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception { private boolean routeToExclusiveBindings(final Message message, final RoutingContext context) throws Exception {
boolean hasExclusives = false;
boolean routed = false;
for (Binding binding : exclusiveBindings) {
if (!hasExclusives) {
context.clear().setReusable(false);
hasExclusives = true;
}
final Filter filter = binding.getFilter();
if (filter == null || filter.match(message)) {
binding.getBindable().route(message, context);
routed = true;
}
}
return routed;
}
private void handleScaledDownMessage(final Message message, final byte[] ids) {
ByteBuffer buffer = ByteBuffer.wrap(ids);
while (buffer.hasRemaining()) {
long id = buffer.getLong();
for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
if (entry.getValue() instanceof RemoteQueueBinding) {
RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
if (remoteQueueBinding.getRemoteQueueID() == id) {
message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
}
}
}
}
}
private void simpleRouting(final Message message,
final RoutingContext context,
final int currentVersion) throws Exception {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context); logger.tracef("Routing message %s on binding=%s current context::$s", message, this, context);
} }
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) { routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
SimpleString routingName = entry.getKey(); final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
List<Binding> bindings = entry.getValue();
if (bindings == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
continue;
}
Binding theBinding = getNextBinding(message, routingName, bindings);
if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) {
context.setReusable(true, currentVersion); context.setReusable(true, currentVersion);
} else { } else {
// notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it
context.setReusable(false, currentVersion); context.setReusable(false, currentVersion);
} }
if (theBinding != null) { if (nextBinding != null) {
theBinding.route(message, context); nextBinding.route(message, context);
} }
} });
} }
@Override @Override
@ -406,99 +359,65 @@ public final class BindingsImpl implements Bindings {
* (depending if you are using multi-thread), and not lose messages. * (depending if you are using multi-thread), and not lose messages.
*/ */
private Binding getNextBinding(final Message message, private Binding getNextBinding(final Message message,
final SimpleString routingName, final Binding[] bindings,
final List<Binding> bindings) { final CopyOnWriteBindings.BindingIndex bindingIndex) {
Integer ipos = routingNamePositions.get(routingName); int nextPosition = bindingIndex.getIndex();
int pos = ipos != null ? ipos : 0; final int bindingsCount = bindings.length;
int length = bindings.size(); if (nextPosition >= bindingsCount) {
nextPosition = 0;
int startPos = pos; }
Binding theBinding = null;
Binding nextBinding = null;
int lastLowPriorityBinding = -1; int lastLowPriorityBinding = -1;
// snapshot this, to save loading it on each iteration
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
while (true) { for (int i = 0; i < bindingsCount; i++) {
Binding binding; final Binding binding = bindings[nextPosition];
try { if (matchBinding(message, binding, loadBalancingType)) {
binding = bindings.get(pos);
} catch (IndexOutOfBoundsException e) {
// This can occur if binding is removed while in route
if (!bindings.isEmpty()) {
pos = 0;
startPos = 0;
length = bindings.size();
continue;
} else {
break;
}
}
if (matchBinding(message, binding)) {
// bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
// unnecessary overhead) // unnecessary overhead)
if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) { if (bindingsCount == 1 || (binding.isConnected() && (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
theBinding = binding; nextBinding = binding;
nextPosition = moveNextPosition(nextPosition, bindingsCount);
pos = incrementPos(pos, length);
break; break;
} else { }
//https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers, //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers,
// the localQueue should always have the priority over the secondary bindings // the localQueue should always have the priority over the secondary bindings
if (lastLowPriorityBinding == -1 || messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) { if (lastLowPriorityBinding == -1 || loadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) {
lastLowPriorityBinding = pos; lastLowPriorityBinding = nextPosition;
}
} }
} }
nextPosition = moveNextPosition(nextPosition, bindingsCount);
pos = incrementPos(pos, length); }
if (nextBinding == null) {
if (pos == startPos) { // if no bindings were found, we will apply a secondary level on the routing logic
if (lastLowPriorityBinding != -1) {
// if no bindings were found, we will apply a secondary level on the routing logic nextBinding = bindings[lastLowPriorityBinding];
if (lastLowPriorityBinding != -1) { nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount);
try {
theBinding = bindings.get(lastLowPriorityBinding);
} catch (IndexOutOfBoundsException e) {
// This can occur if binding is removed while in route
if (!bindings.isEmpty()) {
pos = 0;
lastLowPriorityBinding = -1;
continue;
} else {
break;
}
}
pos = incrementPos(lastLowPriorityBinding, length);
}
break;
} }
} }
if (pos != startPos) { if (nextBinding != null) {
routingNamePositions.put(routingName, pos); bindingIndex.setIndex(nextPosition);
} }
return theBinding; return nextBinding;
} }
private boolean matchBinding(Message message, Binding binding) { private static boolean matchBinding(final Message message,
if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) { final Binding binding,
final MessageLoadBalancingType loadBalancingType) {
if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
return false; return false;
} }
Filter filter = binding.getFilter(); final Filter filter = binding.getFilter();
if (filter == null || filter.match(message)) { if (filter == null || filter.match(message)) {
return true; return true;
} else {
return false;
} }
return false;
} }
private void routeUsingStrictOrdering(final Message message, private void routeUsingStrictOrdering(final Message message,
@ -506,17 +425,7 @@ public final class BindingsImpl implements Bindings {
final GroupingHandler groupingGroupingHandler, final GroupingHandler groupingGroupingHandler,
final SimpleString groupId, final SimpleString groupId,
final int tries) throws Exception { final int tries) throws Exception {
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) { routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
SimpleString routingName = entry.getKey();
List<Binding> bindings = entry.getValue();
if (bindings == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
continue;
}
// concat a full group id, this is for when a binding has multiple bindings // concat a full group id, this is for when a binding has multiple bindings
// NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if // NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if
// the binding belongs to its Queue before removing it // the binding belongs to its Queue before removing it
@ -527,9 +436,9 @@ public final class BindingsImpl implements Bindings {
if (resp == null) { if (resp == null) {
// ok let's find the next binding to propose // ok let's find the next binding to propose
Binding theBinding = getNextBinding(message, routingName, bindings); Binding theBinding = getNextBinding(message, bindings, nextPosition);
if (theBinding == null) { if (theBinding == null) {
continue; return;
} }
resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName())); resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName()));
@ -554,10 +463,10 @@ public final class BindingsImpl implements Bindings {
routeAndCheckNull(message, context, resp, chosen, groupId, tries); routeAndCheckNull(message, context, resp, chosen, groupId, tries);
} }
} });
} }
private Binding locateBinding(SimpleString clusterName, List<Binding> bindings) { private static Binding locateBinding(SimpleString clusterName, Binding[] bindings) {
for (Binding binding : bindings) { for (Binding binding : bindings) {
if (binding.getClusterName().equals(clusterName)) { if (binding.getClusterName().equals(clusterName)) {
return binding; return binding;
@ -603,21 +512,13 @@ public final class BindingsImpl implements Bindings {
if (routingNameBindingMap.isEmpty()) { if (routingNameBindingMap.isEmpty()) {
out.println("\tEMPTY!"); out.println("\tEMPTY!");
} }
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) { routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
out.println("\tkey=" + entry.getKey() + ", value(s):"); out.println("\tkey=" + routingName + ",\tposition=" + nextPosition.getIndex() + "\tvalue(s):");
for (Binding bind : entry.getValue()) { for (Binding bind : bindings) {
out.println("\t\t" + bind); out.println("\t\t" + bind);
} }
out.println(); out.println();
} });
out.println("routingNamePositions:");
if (routingNamePositions.isEmpty()) {
out.println("\tEMPTY!");
}
for (Map.Entry<SimpleString, Integer> entry : routingNamePositions.entrySet()) {
out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue());
}
out.println(); out.println();
@ -679,17 +580,21 @@ public final class BindingsImpl implements Bindings {
} }
} }
private int incrementPos(int pos, final int length) { private static int moveNextPosition(int position, final int length) {
pos++; position++;
if (pos == length) { if (position == length) {
pos = 0; position = 0;
} }
return pos; return position;
} }
/**
* debug method: used just for tests!!
* @return
*/
public Map<SimpleString, List<Binding>> getRoutingNameBindingMap() { public Map<SimpleString, List<Binding>> getRoutingNameBindingMap() {
return routingNameBindingMap; return routingNameBindingMap.copyAsMap();
} }
} }

View File

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
/**
* This is a copy-on-write map of {@link Binding} along with the last index set.<br>
*/
final class CopyOnWriteBindings {
public interface BindingIndex {
/**
* Cannot return a negative value and returns {@code 0} if uninitialized.
*/
int getIndex();
/**
* Cannot set a negative value.
*/
void setIndex(int v);
}
private static final class BindingsAndPosition extends AtomicReference<Binding[]> implements BindingIndex {
private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition");
public volatile int nextPosition;
BindingsAndPosition(Binding[] bindings) {
super(bindings);
NEXT_POSITION_UPDATER.lazySet(this, 0);
}
@Override
public int getIndex() {
return nextPosition;
}
@Override
public void setIndex(int v) {
if (v < 0) {
throw new IllegalArgumentException("cannot set a negative position");
}
NEXT_POSITION_UPDATER.lazySet(this, v);
}
}
private final ConcurrentHashMap<SimpleString, BindingsAndPosition> map;
CopyOnWriteBindings() {
map = new ConcurrentHashMap<>();
}
/**
* Add the specified {@code binding}, if not present.
*/
public void addBindingIfAbsent(Binding binding) {
Objects.requireNonNull(binding);
final SimpleString routingName = binding.getRoutingName();
Objects.requireNonNull(routingName);
BindingsAndPosition bindings;
do {
bindings = map.get(routingName);
if (bindings == null || bindings.get() == TOMBSTONE_BINDINGS) {
final BindingsAndPosition newBindings = new BindingsAndPosition(new Binding[]{binding});
bindings = map.compute(routingName, (ignored, bindingsAndPosition) -> {
if (bindingsAndPosition == null || bindingsAndPosition.get() == TOMBSTONE_BINDINGS) {
return newBindings;
}
return bindingsAndPosition;
});
assert bindings != null;
if (bindings == newBindings) {
return;
}
}
}
while (!addBindingIfAbsent(bindings, binding));
}
/**
* Remove the specified {@code binding}, if present.
*/
public void removeBinding(Binding binding) {
Objects.requireNonNull(binding);
final SimpleString routingName = binding.getRoutingName();
Objects.requireNonNull(routingName);
final BindingsAndPosition bindings = map.get(routingName);
if (bindings == null) {
return;
}
final Binding[] newBindings = removeBindingIfPresent(bindings, binding);
if (newBindings == TOMBSTONE_BINDINGS) {
// GC attempt
map.computeIfPresent(routingName, (bindingsRoutingName, existingBindings) -> {
if (existingBindings.get() == TOMBSTONE_BINDINGS) {
return null;
}
return existingBindings;
});
}
}
/**
* Returns a snapshot of the bindings, if present and a "lazy" binding index, otherwise {@code null}.<br>
* There is no strong commitment on preserving the index value if the related bindings are concurrently modified
* or the index itself is concurrently modified.
*/
public Pair<Binding[], BindingIndex> getBindings(SimpleString routingName) {
Objects.requireNonNull(routingName);
BindingsAndPosition bindings = map.get(routingName);
if (bindings == null) {
return null;
}
final Binding[] bindingsSnapshot = bindings.get();
if (bindingsSnapshot == TOMBSTONE_BINDINGS) {
return null;
}
assert bindingsSnapshot != null && bindingsSnapshot.length > 0;
return new Pair<>(bindingsSnapshot, bindings);
}
@FunctionalInterface
public interface BindingsConsumer<T extends Throwable> {
/**
* {@code routingName} cannot be {@code null}.
* {@code bindings} cannot be {@code null} or empty.
* {@code nextPosition} cannot be null.
*/
void accept(SimpleString routingName, Binding[] bindings, BindingIndex nextPosition) throws T;
}
/**
* Iterates through the bindings and its related indexes.<br>
*/
public <T extends Throwable> void forEach(BindingsConsumer<T> bindingsConsumer) throws T {
Objects.requireNonNull(bindingsConsumer);
if (map.isEmpty()) {
return;
}
for (Map.Entry<SimpleString, BindingsAndPosition> entry : map.entrySet()) {
final BindingsAndPosition value = entry.getValue();
final Binding[] bindings = value.get();
if (bindings == TOMBSTONE_BINDINGS) {
continue;
}
assert bindings != null && bindings.length > 0;
bindingsConsumer.accept(entry.getKey(), bindings, value);
}
}
public boolean isEmpty() {
return map.isEmpty();
}
public Map<SimpleString, List<Binding>> copyAsMap() {
if (map.isEmpty()) {
return Collections.emptyMap();
}
final HashMap<SimpleString, List<Binding>> copy = new HashMap<>(map.size());
map.forEach((routingName, bindings) -> {
final Binding[] bindingArray = bindings.get();
if (bindingArray == TOMBSTONE_BINDINGS) {
return;
}
copy.put(routingName, Arrays.asList(bindingArray));
});
return copy;
}
private static final Binding[] TOMBSTONE_BINDINGS = new Binding[0];
private static int indexOfBinding(final Binding[] bindings, final Binding toFind) {
for (int i = 0, size = bindings.length; i < size; i++) {
final Binding binding = bindings[i];
if (binding.equals(toFind)) {
return i;
}
}
return -1;
}
/**
* Remove the binding if present and returns the new bindings, {@code null} otherwise.
*/
private static Binding[] removeBindingIfPresent(final AtomicReference<Binding[]> bindings,
final Binding bindingToRemove) {
Objects.requireNonNull(bindings);
Objects.requireNonNull(bindingToRemove);
Binding[] oldBindings;
Binding[] newBindings;
do {
oldBindings = bindings.get();
// no need to check vs TOMBSTONE_BINDINGS, because found === -1;
final int found = indexOfBinding(oldBindings, bindingToRemove);
if (found == -1) {
return null;
}
final int oldBindingsCount = oldBindings.length;
if (oldBindingsCount == 1) {
newBindings = TOMBSTONE_BINDINGS;
} else {
final int newBindingsCount = oldBindingsCount - 1;
newBindings = new Binding[newBindingsCount];
System.arraycopy(oldBindings, 0, newBindings, 0, found);
final int remaining = newBindingsCount - found;
if (remaining > 0) {
System.arraycopy(oldBindings, found + 1, newBindings, found, remaining);
}
}
}
while (!bindings.compareAndSet(oldBindings, newBindings));
return newBindings;
}
/**
* Returns {@code true} if the given binding has been added or already present,
* {@code false} if bindings are going to be garbage-collected.
*/
private static boolean addBindingIfAbsent(final AtomicReference<Binding[]> bindings, final Binding newBinding) {
Objects.requireNonNull(bindings);
Objects.requireNonNull(newBinding);
Binding[] oldBindings;
Binding[] newBindings;
do {
oldBindings = bindings.get();
if (oldBindings == TOMBSTONE_BINDINGS) {
return false;
}
if (indexOfBinding(oldBindings, newBinding) >= 0) {
return true;
}
final int oldLength = oldBindings.length;
newBindings = Arrays.copyOf(oldBindings, oldLength + 1);
assert newBindings[oldLength] == null;
newBindings[oldLength] = newBinding;
}
while (!bindings.compareAndSet(oldBindings, newBindings));
return true;
}
}