diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 5c13c6cf68..4cc45b1593 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -25,12 +25,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -54,9 +53,7 @@ public final class BindingsImpl implements Bindings { // This is public as we use on test assertions public static final int MAX_GROUP_RETRY = 10; - private final ConcurrentMap> routingNameBindingMap = new ConcurrentHashMap<>(); - - private final Map routingNamePositions = new ConcurrentHashMap<>(); + private final CopyOnWriteBindings routingNameBindingMap = new CopyOnWriteBindings(); private final Map bindingsIdMap = new ConcurrentHashMap<>(); @@ -113,6 +110,8 @@ public final class BindingsImpl implements Bindings { } } + + @Override public void addBinding(final Binding binding) { try { @@ -122,30 +121,14 @@ public final class BindingsImpl implements Bindings { if (binding.isExclusive()) { exclusiveBindings.add(binding); } else { - SimpleString routingName = binding.getRoutingName(); - - List bindings = routingNameBindingMap.get(routingName); - - if (bindings == null) { - bindings = new CopyOnWriteArrayList<>(); - - List oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings); - - if (oldBindings != null) { - bindings = oldBindings; - } - } - - if (!bindings.contains(binding)) { - bindings.add(binding); - } + routingNameBindingMap.addBindingIfAbsent(binding); } bindingsIdMap.put(binding.getID(), binding); bindingsNameMap.put(binding.getUniqueName(), binding); if (binding instanceof RemoteQueueBinding) { - setMessageLoadBalancingType(((RemoteQueueBinding)binding).getMessageLoadBalancingType()); + setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType()); } if (logger.isTraceEnabled()) { logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings()); @@ -174,17 +157,7 @@ public final class BindingsImpl implements Bindings { if (binding.isExclusive()) { exclusiveBindings.remove(binding); } else { - SimpleString routingName = binding.getRoutingName(); - - List bindings = routingNameBindingMap.get(routingName); - - if (bindings != null) { - bindings.remove(binding); - - if (bindings.isEmpty()) { - routingNameBindingMap.remove(routingName); - } - } + routingNameBindingMap.removeBinding(binding); } bindingsIdMap.remove(binding.getID()); @@ -208,78 +181,56 @@ public final class BindingsImpl implements Bindings { public boolean redistribute(final Message message, final Queue originatingQueue, final RoutingContext context) throws Exception { - if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { + final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType; + if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) { return false; } if (logger.isTraceEnabled()) { - logger.trace("Redistributing message " + message); + logger.tracef("Redistributing message %s", message); } - SimpleString routingName = originatingQueue.getName(); + final SimpleString routingName = originatingQueue.getName(); - List bindings = routingNameBindingMap.get(routingName); + final Pair bindingsAndPosition = routingNameBindingMap.getBindings(routingName); - if (bindings == null) { + if (bindingsAndPosition == null) { // The value can become null if it's concurrently removed while we're iterating - this is expected // ConcurrentHashMap behaviour! return false; } - Integer ipos = routingNamePositions.get(routingName); + final Binding[] bindings = bindingsAndPosition.getA(); - int pos = ipos != null ? ipos.intValue() : 0; + final CopyOnWriteBindings.BindingIndex bindingIndex = bindingsAndPosition.getB(); - int length = bindings.size(); + assert bindings.length > 0; - int startPos = pos; + final int bindingsCount = bindings.length; - Binding theBinding = null; + int nextPosition = bindingIndex.getIndex(); - // TODO - combine this with similar logic in route() - while (true) { - Binding binding; - try { - binding = bindings.get(pos); - } catch (IndexOutOfBoundsException e) { - // This can occur if binding is removed while in route - if (!bindings.isEmpty()) { - pos = 0; - startPos = 0; - length = bindings.size(); - - continue; - } else { - break; - } - } - - pos = incrementPos(pos, length); - - Filter filter = binding.getFilter(); - - boolean highPrior = binding.isHighAcceptPriority(message); + if (nextPosition >= bindingsCount) { + nextPosition = 0; + } + Binding nextBinding = null; + for (int i = 0; i < bindingsCount; i++) { + final Binding binding = bindings[nextPosition]; + nextPosition = moveNextPosition(nextPosition, bindingsCount); + final Filter filter = binding.getFilter(); + final boolean highPrior = binding.isHighAcceptPriority(message); if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) { - theBinding = binding; - - break; - } - - if (pos == startPos) { + nextBinding = binding; break; } } - - routingNamePositions.put(routingName, pos); - - if (theBinding != null) { - theBinding.route(message, context); - - return true; - } else { + if (nextBinding == null) { return false; } + bindingIndex.setIndex(nextPosition); + nextBinding.route(message, context); + return true; } @Override @@ -290,8 +241,8 @@ public final class BindingsImpl implements Bindings { private void route(final Message message, final RoutingContext context, final boolean groupRouting) throws Exception { - int currentVersion = version.get(); - boolean reusableContext = context.isReusable(message, currentVersion); + final int currentVersion = version.get(); + final boolean reusableContext = context.isReusable(message, currentVersion); if (!reusableContext) { context.clear(); @@ -300,54 +251,33 @@ public final class BindingsImpl implements Bindings { /* This is a special treatment for scaled-down messages involving SnF queues. * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property */ - byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS); + final byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS); if (ids != null) { - ByteBuffer buffer = ByteBuffer.wrap(ids); - while (buffer.hasRemaining()) { - long id = buffer.getLong(); - for (Map.Entry entry : bindingsIdMap.entrySet()) { - if (entry.getValue() instanceof RemoteQueueBinding) { - RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue(); - if (remoteQueueBinding.getRemoteQueueID() == id) { - message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); - } - } - } - } + handleScaledDownMessage(message, ids); } - boolean routed = false; - - boolean hasExclusives = false; - - for (Binding binding : exclusiveBindings) { - if (!hasExclusives) { - context.clear().setReusable(false); - hasExclusives = true; - } - - if (binding.getFilter() == null || binding.getFilter().match(message)) { - binding.getBindable().route(message, context); - routed = true; - } + final boolean routed; + // despite the double check can lead to some race, this can save allocating an iterator for an empty set + if (!exclusiveBindings.isEmpty()) { + routed = routeToExclusiveBindings(message, context); + } else { + routed = false; } if (!routed) { // Remove the ids now, in order to avoid double check - ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS); + final byte[] routeToIds = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS); - // Fetch the groupId now, in order to avoid double checking - SimpleString groupId = message.getGroupID(); - - if (ids != null) { + SimpleString groupId; + if (routeToIds != null) { context.clear().setReusable(false); - routeFromCluster(message, context, ids); - } else if (groupingHandler != null && groupRouting && groupId != null) { + routeFromCluster(message, context, routeToIds); + } else if (groupRouting && groupingHandler != null && (groupId = message.getGroupID()) != null) { context.clear().setReusable(false); routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0); } else if (CompositeAddress.isFullyQualified(message.getAddress())) { context.clear().setReusable(false); - Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString())); + final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString())); if (theBinding != null) { theBinding.route(message, context); } @@ -361,35 +291,58 @@ public final class BindingsImpl implements Bindings { } } - private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception { + private boolean routeToExclusiveBindings(final Message message, final RoutingContext context) throws Exception { + boolean hasExclusives = false; + boolean routed = false; + for (Binding binding : exclusiveBindings) { + if (!hasExclusives) { + context.clear().setReusable(false); + hasExclusives = true; + } + final Filter filter = binding.getFilter(); + if (filter == null || filter.match(message)) { + binding.getBindable().route(message, context); + routed = true; + } + } + return routed; + } + + private void handleScaledDownMessage(final Message message, final byte[] ids) { + ByteBuffer buffer = ByteBuffer.wrap(ids); + while (buffer.hasRemaining()) { + long id = buffer.getLong(); + for (Map.Entry entry : bindingsIdMap.entrySet()) { + if (entry.getValue() instanceof RemoteQueueBinding) { + RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue(); + if (remoteQueueBinding.getRemoteQueueID() == id) { + message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); + } + } + } + } + } + + private void simpleRouting(final Message message, + final RoutingContext context, + final int currentVersion) throws Exception { if (logger.isTraceEnabled()) { - logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context); + logger.tracef("Routing message %s on binding=%s current context::$s", message, this, context); } - for (Map.Entry> entry : routingNameBindingMap.entrySet()) { - SimpleString routingName = entry.getKey(); - - List bindings = entry.getValue(); - - if (bindings == null) { - // The value can become null if it's concurrently removed while we're iterating - this is expected - // ConcurrentHashMap behaviour! - continue; - } - - Binding theBinding = getNextBinding(message, routingName, bindings); - - if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) { + routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> { + final Binding nextBinding = getNextBinding(message, bindings, nextPosition); + if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) { context.setReusable(true, currentVersion); } else { // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it context.setReusable(false, currentVersion); } - if (theBinding != null) { - theBinding.route(message, context); + if (nextBinding != null) { + nextBinding.route(message, context); } - } + }); } @Override @@ -406,99 +359,65 @@ public final class BindingsImpl implements Bindings { * (depending if you are using multi-thread), and not lose messages. */ private Binding getNextBinding(final Message message, - final SimpleString routingName, - final List bindings) { - Integer ipos = routingNamePositions.get(routingName); + final Binding[] bindings, + final CopyOnWriteBindings.BindingIndex bindingIndex) { + int nextPosition = bindingIndex.getIndex(); - int pos = ipos != null ? ipos : 0; + final int bindingsCount = bindings.length; - int length = bindings.size(); - - int startPos = pos; - - Binding theBinding = null; + if (nextPosition >= bindingsCount) { + nextPosition = 0; + } + Binding nextBinding = null; int lastLowPriorityBinding = -1; + // snapshot this, to save loading it on each iteration + final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType; - while (true) { - Binding binding; - try { - binding = bindings.get(pos); - } catch (IndexOutOfBoundsException e) { - // This can occur if binding is removed while in route - if (!bindings.isEmpty()) { - pos = 0; - startPos = 0; - length = bindings.size(); - - continue; - } else { - break; - } - } - - if (matchBinding(message, binding)) { + for (int i = 0; i < bindingsCount; i++) { + final Binding binding = bindings[nextPosition]; + if (matchBinding(message, binding, loadBalancingType)) { // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an // unnecessary overhead) - if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) { - theBinding = binding; - - pos = incrementPos(pos, length); - + if (bindingsCount == 1 || (binding.isConnected() && (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) { + nextBinding = binding; + nextPosition = moveNextPosition(nextPosition, bindingsCount); break; - } else { - //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers, - // the localQueue should always have the priority over the secondary bindings - if (lastLowPriorityBinding == -1 || messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) { - lastLowPriorityBinding = pos; - } + } + //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers, + // the localQueue should always have the priority over the secondary bindings + if (lastLowPriorityBinding == -1 || loadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) { + lastLowPriorityBinding = nextPosition; } } - - pos = incrementPos(pos, length); - - if (pos == startPos) { - - // if no bindings were found, we will apply a secondary level on the routing logic - if (lastLowPriorityBinding != -1) { - try { - theBinding = bindings.get(lastLowPriorityBinding); - } catch (IndexOutOfBoundsException e) { - // This can occur if binding is removed while in route - if (!bindings.isEmpty()) { - pos = 0; - - lastLowPriorityBinding = -1; - - continue; - } else { - break; - } - } - - pos = incrementPos(lastLowPriorityBinding, length); - } - break; + nextPosition = moveNextPosition(nextPosition, bindingsCount); + } + if (nextBinding == null) { + // if no bindings were found, we will apply a secondary level on the routing logic + if (lastLowPriorityBinding != -1) { + nextBinding = bindings[lastLowPriorityBinding]; + nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount); } } - if (pos != startPos) { - routingNamePositions.put(routingName, pos); + if (nextBinding != null) { + bindingIndex.setIndex(nextPosition); } - return theBinding; + return nextBinding; } - private boolean matchBinding(Message message, Binding binding) { - if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) { + private static boolean matchBinding(final Message message, + final Binding binding, + final MessageLoadBalancingType loadBalancingType) { + if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) { return false; } - Filter filter = binding.getFilter(); + final Filter filter = binding.getFilter(); if (filter == null || filter.match(message)) { return true; - } else { - return false; } + return false; } private void routeUsingStrictOrdering(final Message message, @@ -506,17 +425,7 @@ public final class BindingsImpl implements Bindings { final GroupingHandler groupingGroupingHandler, final SimpleString groupId, final int tries) throws Exception { - for (Map.Entry> entry : routingNameBindingMap.entrySet()) { - SimpleString routingName = entry.getKey(); - - List bindings = entry.getValue(); - - if (bindings == null) { - // The value can become null if it's concurrently removed while we're iterating - this is expected - // ConcurrentHashMap behaviour! - continue; - } - + routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> { // concat a full group id, this is for when a binding has multiple bindings // NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if // the binding belongs to its Queue before removing it @@ -527,9 +436,9 @@ public final class BindingsImpl implements Bindings { if (resp == null) { // ok let's find the next binding to propose - Binding theBinding = getNextBinding(message, routingName, bindings); + Binding theBinding = getNextBinding(message, bindings, nextPosition); if (theBinding == null) { - continue; + return; } resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName())); @@ -554,10 +463,10 @@ public final class BindingsImpl implements Bindings { routeAndCheckNull(message, context, resp, chosen, groupId, tries); } - } + }); } - private Binding locateBinding(SimpleString clusterName, List bindings) { + private static Binding locateBinding(SimpleString clusterName, Binding[] bindings) { for (Binding binding : bindings) { if (binding.getClusterName().equals(clusterName)) { return binding; @@ -603,21 +512,13 @@ public final class BindingsImpl implements Bindings { if (routingNameBindingMap.isEmpty()) { out.println("\tEMPTY!"); } - for (Map.Entry> entry : routingNameBindingMap.entrySet()) { - out.println("\tkey=" + entry.getKey() + ", value(s):"); - for (Binding bind : entry.getValue()) { + routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> { + out.println("\tkey=" + routingName + ",\tposition=" + nextPosition.getIndex() + "\tvalue(s):"); + for (Binding bind : bindings) { out.println("\t\t" + bind); } out.println(); - } - - out.println("routingNamePositions:"); - if (routingNamePositions.isEmpty()) { - out.println("\tEMPTY!"); - } - for (Map.Entry entry : routingNamePositions.entrySet()) { - out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue()); - } + }); out.println(); @@ -679,17 +580,21 @@ public final class BindingsImpl implements Bindings { } } - private int incrementPos(int pos, final int length) { - pos++; + private static int moveNextPosition(int position, final int length) { + position++; - if (pos == length) { - pos = 0; + if (position == length) { + position = 0; } - return pos; + return position; } + /** + * debug method: used just for tests!! + * @return + */ public Map> getRoutingNameBindingMap() { - return routingNameBindingMap; + return routingNameBindingMap.copyAsMap(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java new file mode 100644 index 0000000000..fc79da45c1 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; + +/** + * This is a copy-on-write map of {@link Binding} along with the last index set.
+ */ +final class CopyOnWriteBindings { + + public interface BindingIndex { + + /** + * Cannot return a negative value and returns {@code 0} if uninitialized. + */ + int getIndex(); + + /** + * Cannot set a negative value. + */ + void setIndex(int v); + } + + private static final class BindingsAndPosition extends AtomicReference implements BindingIndex { + + private static final AtomicIntegerFieldUpdater NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition"); + + public volatile int nextPosition; + + BindingsAndPosition(Binding[] bindings) { + super(bindings); + NEXT_POSITION_UPDATER.lazySet(this, 0); + } + + @Override + public int getIndex() { + return nextPosition; + } + + @Override + public void setIndex(int v) { + if (v < 0) { + throw new IllegalArgumentException("cannot set a negative position"); + } + NEXT_POSITION_UPDATER.lazySet(this, v); + } + } + + private final ConcurrentHashMap map; + + CopyOnWriteBindings() { + map = new ConcurrentHashMap<>(); + } + + /** + * Add the specified {@code binding}, if not present. + */ + public void addBindingIfAbsent(Binding binding) { + Objects.requireNonNull(binding); + final SimpleString routingName = binding.getRoutingName(); + Objects.requireNonNull(routingName); + BindingsAndPosition bindings; + do { + bindings = map.get(routingName); + if (bindings == null || bindings.get() == TOMBSTONE_BINDINGS) { + final BindingsAndPosition newBindings = new BindingsAndPosition(new Binding[]{binding}); + bindings = map.compute(routingName, (ignored, bindingsAndPosition) -> { + if (bindingsAndPosition == null || bindingsAndPosition.get() == TOMBSTONE_BINDINGS) { + return newBindings; + } + return bindingsAndPosition; + }); + assert bindings != null; + if (bindings == newBindings) { + return; + } + } + } + while (!addBindingIfAbsent(bindings, binding)); + } + + /** + * Remove the specified {@code binding}, if present. + */ + public void removeBinding(Binding binding) { + Objects.requireNonNull(binding); + final SimpleString routingName = binding.getRoutingName(); + Objects.requireNonNull(routingName); + final BindingsAndPosition bindings = map.get(routingName); + if (bindings == null) { + return; + } + final Binding[] newBindings = removeBindingIfPresent(bindings, binding); + if (newBindings == TOMBSTONE_BINDINGS) { + // GC attempt + map.computeIfPresent(routingName, (bindingsRoutingName, existingBindings) -> { + if (existingBindings.get() == TOMBSTONE_BINDINGS) { + return null; + } + return existingBindings; + }); + } + } + + /** + * Returns a snapshot of the bindings, if present and a "lazy" binding index, otherwise {@code null}.
+ * There is no strong commitment on preserving the index value if the related bindings are concurrently modified + * or the index itself is concurrently modified. + */ + public Pair getBindings(SimpleString routingName) { + Objects.requireNonNull(routingName); + BindingsAndPosition bindings = map.get(routingName); + if (bindings == null) { + return null; + } + final Binding[] bindingsSnapshot = bindings.get(); + if (bindingsSnapshot == TOMBSTONE_BINDINGS) { + return null; + } + assert bindingsSnapshot != null && bindingsSnapshot.length > 0; + return new Pair<>(bindingsSnapshot, bindings); + } + + @FunctionalInterface + public interface BindingsConsumer { + + /** + * {@code routingName} cannot be {@code null}. + * {@code bindings} cannot be {@code null} or empty. + * {@code nextPosition} cannot be null. + */ + void accept(SimpleString routingName, Binding[] bindings, BindingIndex nextPosition) throws T; + } + + /** + * Iterates through the bindings and its related indexes.
+ */ + public void forEach(BindingsConsumer bindingsConsumer) throws T { + Objects.requireNonNull(bindingsConsumer); + if (map.isEmpty()) { + return; + } + for (Map.Entry entry : map.entrySet()) { + final BindingsAndPosition value = entry.getValue(); + final Binding[] bindings = value.get(); + if (bindings == TOMBSTONE_BINDINGS) { + continue; + } + assert bindings != null && bindings.length > 0; + bindingsConsumer.accept(entry.getKey(), bindings, value); + } + } + + public boolean isEmpty() { + return map.isEmpty(); + } + + public Map> copyAsMap() { + if (map.isEmpty()) { + return Collections.emptyMap(); + } + final HashMap> copy = new HashMap<>(map.size()); + map.forEach((routingName, bindings) -> { + final Binding[] bindingArray = bindings.get(); + if (bindingArray == TOMBSTONE_BINDINGS) { + return; + } + copy.put(routingName, Arrays.asList(bindingArray)); + }); + return copy; + } + + private static final Binding[] TOMBSTONE_BINDINGS = new Binding[0]; + + private static int indexOfBinding(final Binding[] bindings, final Binding toFind) { + for (int i = 0, size = bindings.length; i < size; i++) { + final Binding binding = bindings[i]; + if (binding.equals(toFind)) { + return i; + } + } + return -1; + } + + /** + * Remove the binding if present and returns the new bindings, {@code null} otherwise. + */ + private static Binding[] removeBindingIfPresent(final AtomicReference bindings, + final Binding bindingToRemove) { + Objects.requireNonNull(bindings); + Objects.requireNonNull(bindingToRemove); + Binding[] oldBindings; + Binding[] newBindings; + do { + oldBindings = bindings.get(); + // no need to check vs TOMBSTONE_BINDINGS, because found === -1; + final int found = indexOfBinding(oldBindings, bindingToRemove); + if (found == -1) { + return null; + } + final int oldBindingsCount = oldBindings.length; + if (oldBindingsCount == 1) { + newBindings = TOMBSTONE_BINDINGS; + } else { + final int newBindingsCount = oldBindingsCount - 1; + newBindings = new Binding[newBindingsCount]; + System.arraycopy(oldBindings, 0, newBindings, 0, found); + final int remaining = newBindingsCount - found; + if (remaining > 0) { + System.arraycopy(oldBindings, found + 1, newBindings, found, remaining); + } + } + } + while (!bindings.compareAndSet(oldBindings, newBindings)); + return newBindings; + } + + /** + * Returns {@code true} if the given binding has been added or already present, + * {@code false} if bindings are going to be garbage-collected. + */ + private static boolean addBindingIfAbsent(final AtomicReference bindings, final Binding newBinding) { + Objects.requireNonNull(bindings); + Objects.requireNonNull(newBinding); + Binding[] oldBindings; + Binding[] newBindings; + do { + oldBindings = bindings.get(); + if (oldBindings == TOMBSTONE_BINDINGS) { + return false; + } + if (indexOfBinding(oldBindings, newBinding) >= 0) { + return true; + } + final int oldLength = oldBindings.length; + newBindings = Arrays.copyOf(oldBindings, oldLength + 1); + assert newBindings[oldLength] == null; + newBindings[oldLength] = newBinding; + } + while (!bindings.compareAndSet(oldBindings, newBindings)); + return true; + } +}