diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IterableStream.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IterableStream.java new file mode 100644 index 0000000000..412a770b0a --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IterableStream.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.stream.Stream; + +public final class IterableStream { + + private IterableStream() { + + } + + public static Iterable iterableOf(Stream stream) { + return stream::iterator; + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 8b608ac176..9084bebac4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -2184,7 +2184,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address)).getBindings()) { + for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) { if (binding instanceof LocalQueueBinding) { Queue queue = ((LocalQueueBinding) binding).getQueue(); for (Consumer consumer : queue.getConsumers()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index c8c0428d2e..fb7f55e0e0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -16,12 +16,13 @@ */ package org.apache.activemq.artemis.core.postoffice; +import java.util.Collection; import java.util.EnumSet; -import java.util.Map; import java.util.Set; +import java.util.stream.Stream; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -45,9 +46,9 @@ public interface AddressManager { Bindings getBindingsForRoutingAddress(SimpleString address) throws Exception; - Bindings getMatchingBindings(SimpleString address) throws Exception; + Collection getMatchingBindings(SimpleString address) throws Exception; - Bindings getDirectBindings(SimpleString address) throws Exception; + Collection getDirectBindings(SimpleString address) throws Exception; SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; @@ -57,7 +58,7 @@ public interface AddressManager { Binding getBinding(SimpleString queueName); - Map getBindings(); + Stream getBindings(); Set getAddresses(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index b54448f8d9..a03f7e3476 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -16,21 +16,22 @@ */ package org.apache.activemq.artemis.core.postoffice; +import java.util.Collection; import java.util.EnumSet; import java.util.List; -import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -135,11 +136,11 @@ public interface PostOffice extends ActiveMQComponent { Binding getBinding(SimpleString uniqueName); - Bindings getMatchingBindings(SimpleString address) throws Exception; + Collection getMatchingBindings(SimpleString address) throws Exception; - Bindings getDirectBindings(SimpleString address) throws Exception; + Collection getDirectBindings(SimpleString address) throws Exception; - Map getAllBindings(); + Stream getAllBindings(); SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java index aef2c10cac..12ef067669 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.postoffice.impl; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.activemq.artemis.api.core.SimpleString; @@ -36,7 +37,7 @@ public class AddressImpl implements Address { private final boolean containsWildCard; - private final List
linkedAddresses = new ArrayList<>(); + private List
linkedAddresses = null; private final WildcardConfiguration wildcardConfiguration; @@ -68,11 +69,14 @@ public class AddressImpl implements Address { @Override public List
getLinkedAddresses() { - return linkedAddresses; + return linkedAddresses == null ? Collections.emptyList() : linkedAddresses; } @Override public void addLinkedAddress(final Address address) { + if (linkedAddresses == null) { + linkedAddresses = new ArrayList<>(1); + } if (!linkedAddresses.contains(address)) { linkedAddresses.add(address); } @@ -80,6 +84,9 @@ public class AddressImpl implements Address { @Override public void removeLinkedAddress(final Address actualAddress) { + if (linkedAddresses == null) { + return; + } linkedAddresses.remove(actualAddress); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 04b5c9ce7b..9e063fad27 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -82,6 +82,7 @@ import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -97,6 +98,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; + +import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; /** * This is the class that will make the routing to Queues and decide which consumer will get the messages @@ -769,18 +773,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding server.callBrokerAddressPlugins(plugin -> plugin.beforeRemoveAddress(address)); } - final Bindings bindingsForAddress = getDirectBindings(address); + final Collection bindingsForAddress = getDirectBindings(address); if (force) { - for (Binding binding : bindingsForAddress.getBindings()) { + for (Binding binding : bindingsForAddress) { if (binding instanceof QueueBinding) { ((QueueBinding)binding).getQueue().deleteQueue(true); } } - } else { - if (bindingsForAddress.getBindings().size() > 0) { - throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address); - } + } else if (!bindingsForAddress.isEmpty()) { + throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address); } managementService.unregisterAddress(address); final AddressInfo addressInfo = addressManager.removeAddressInfo(address); @@ -969,17 +971,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public Bindings getMatchingBindings(final SimpleString address) throws Exception { + public Collection getMatchingBindings(final SimpleString address) throws Exception { return addressManager.getMatchingBindings(address); } @Override - public Bindings getDirectBindings(final SimpleString address) throws Exception { + public Collection getDirectBindings(final SimpleString address) throws Exception { return addressManager.getDirectBindings(address); } @Override - public Map getAllBindings() { + public Stream getAllBindings() { return addressManager.getBindings(); } @@ -1731,7 +1733,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public void run() { // The reaper thread should be finished case the PostOffice is gone // This is to avoid leaks on PostOffice between stops and starts - for (Queue queue : getLocalQueues()) { + for (Queue queue : iterableOf(getLocalQueues())) { try { queue.expireReferences(); } catch (Exception e) { @@ -1753,11 +1755,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public void run() { - for (Queue queue : getLocalQueues()) { + getLocalQueues().forEach(queue -> { if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) { QueueManagerImpl.performAutoDeleteQueue(server, queue); } - } + }); Set addresses = addressManager.getAddresses(); @@ -1796,19 +1798,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private List getLocalQueues() { - Map nameMap = addressManager.getBindings(); - - List queues = new ArrayList<>(); - - for (Binding binding : nameMap.values()) { - if (binding.getType() == BindingType.LOCAL_QUEUE) { - Queue queue = (Queue) binding.getBindable(); - - queues.add(queue); - } - } - return queues; + private Stream getLocalQueues() { + return addressManager.getBindings() + .filter(binding -> binding.getType() == BindingType.LOCAL_QUEUE) + .map(binding -> (Queue) binding.getBindable()); } public static final class AddOperation implements TransactionOperation { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 35d3e6902b..965cd6f47d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -16,15 +16,18 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Stream; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -35,7 +38,6 @@ import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; @@ -59,10 +61,7 @@ public class SimpleAddressManager implements AddressManager { */ protected final ConcurrentMap mappings = new ConcurrentHashMap<>(); - /** - * {@code HashMap} - */ - private final ConcurrentMap nameMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> nameMap = new ConcurrentHashMap<>(); private final BindingsFactory bindingsFactory; @@ -87,7 +86,8 @@ public class SimpleAddressManager implements AddressManager { @Override public boolean addBinding(final Binding binding) throws Exception { - if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null) { + final Pair bindingAddressPair = new Pair<>(binding, new AddressImpl(binding.getAddress(), wildcardConfiguration)); + if (nameMap.putIfAbsent(binding.getUniqueName(), bindingAddressPair) != null) { throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(binding); } @@ -100,15 +100,15 @@ public class SimpleAddressManager implements AddressManager { @Override public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception { - final Binding binding = nameMap.remove(uniqueName); + final Pair binding = nameMap.remove(uniqueName); if (binding == null) { return null; } - removeBindingInternal(binding.getAddress(), uniqueName); + removeBindingInternal(binding.getA().getAddress(), uniqueName); - return binding; + return binding.getA(); } @Override @@ -118,42 +118,40 @@ public class SimpleAddressManager implements AddressManager { @Override public Binding getBinding(final SimpleString bindableName) { - return nameMap.get(CompositeAddress.extractQueueName(bindableName)); + final Pair bindingAddressPair = nameMap.get(CompositeAddress.extractQueueName(bindableName)); + return bindingAddressPair == null ? null : bindingAddressPair.getA(); } @Override - public Map getBindings() { - return nameMap; + public Stream getBindings() { + return nameMap.values().stream().map(pair -> pair.getA()); } @Override - public Bindings getMatchingBindings(final SimpleString address) throws Exception { + public Collection getMatchingBindings(final SimpleString address) throws Exception { SimpleString realAddress = CompositeAddress.extractAddressName(address); Address add = new AddressImpl(realAddress, wildcardConfiguration); - Bindings bindings = bindingsFactory.createBindings(realAddress); - - for (Binding binding : nameMap.values()) { - Address addCheck = new AddressImpl(binding.getAddress(), wildcardConfiguration); - + Collection bindings = new ArrayList<>(); + nameMap.forEach((bindingUniqueName, bindingAddressPair) -> { + final Address addCheck = bindingAddressPair.getB(); if (addCheck.matches(add)) { - bindings.addBinding(binding); + bindings.add(bindingAddressPair.getA()); } - } - + }); return bindings; } @Override - public Bindings getDirectBindings(final SimpleString address) throws Exception { + public Collection getDirectBindings(final SimpleString address) throws Exception { SimpleString realAddress = CompositeAddress.extractAddressName(address); - Bindings bindings = bindingsFactory.createBindings(realAddress); + Collection bindings = new ArrayList<>(); - for (Binding binding : nameMap.values()) { - if (binding.getAddress().equals(realAddress)) { - bindings.addBinding(binding); + nameMap.forEach((bindingUniqueName, bindingAddressPair) -> { + if (bindingAddressPair.getA().getAddress().equals(realAddress)) { + bindings.add(bindingAddressPair.getA()); } - } + }); return bindings; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java index 819fa3d1d6..a7218edb9e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java @@ -111,8 +111,6 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe super.start(); server.getPostOffice() .getAllBindings() - .values() - .stream() .filter(b -> b instanceof QueueBinding || b instanceof DivertBinding) .forEach(this::afterAddBinding); } @@ -144,7 +142,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe //if a new address is added we need to see if there are matching divert bindings server.getPostOffice() .getDirectBindings(addressInfo.getName()) - .getBindings().stream().filter(binding -> binding instanceof DivertBinding) + .stream().filter(binding -> binding instanceof DivertBinding) .forEach(this::afterAddBinding); } catch (Exception e) { ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, addressInfo.getName()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java index 79536251ee..33df3b500a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java @@ -94,8 +94,6 @@ public class FederatedQueue extends FederatedAbstract implements ActiveMQServerC super.start(); server.getPostOffice() .getAllBindings() - .values() - .stream() .filter(b -> b instanceof QueueBinding) .map(b -> (QueueBinding) b) .forEach(b -> conditionalCreateRemoteConsumer(b.getQueue())); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 308eb0ce6e..25685e3a2d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -27,6 +27,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.HashMap; @@ -100,7 +101,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; -import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; @@ -204,6 +204,8 @@ import org.apache.activemq.artemis.utils.critical.CriticalComponent; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; + /** * The ActiveMQ Artemis server implementation */ @@ -910,19 +912,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { int defaultConsumersBeforeDispatch = addressSettings.getDefaultConsumersBeforeDispatch(); long defaultDelayBeforeDispatch = addressSettings.getDefaultDelayBeforeDispatch(); - List names = new ArrayList<>(); - // make an exception for the management address (see HORNETQ-29) ManagementService managementService = getManagementService(); if (managementService != null) { if (realAddress.equals(managementService.getManagementAddress())) { - return new BindingQueryResult(true, null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers, defaultExclusive, defaultLastValue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch); + return new BindingQueryResult(true, null, Collections.emptyList(), autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers, defaultExclusive, defaultLastValue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch); } } - Bindings bindings = getPostOffice().getMatchingBindings(realAddress); + List names = new ArrayList<>(); - for (Binding binding : bindings.getBindings()) { + for (Binding binding : getPostOffice().getMatchingBindings(realAddress)) { if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) { SimpleString name; if (!newFQQN && CompositeAddress.isFullyQualified(address.toString())) { @@ -1594,11 +1594,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { } public int getQueueCountForUser(String username) throws Exception { - Map bindings = postOffice.getAllBindings(); - int queuesForUser = 0; - for (Binding binding : bindings.values()) { + for (Binding binding : iterableOf(postOffice.getAllBindings())) { if (binding instanceof LocalQueueBinding && ((LocalQueueBinding) binding).getQueue().getUser().equals(SimpleString.toSimpleString(username))) { queuesForUser++; } @@ -1699,7 +1697,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { public long getTotalMessageCount() { long total = 0; - for (Binding binding : postOffice.getAllBindings().values()) { + for (Binding binding : iterableOf(postOffice.getAllBindings())) { if (binding.getType() == BindingType.LOCAL_QUEUE) { total += ((LocalQueueBinding) binding).getQueue().getMessageCount(); } @@ -1712,7 +1710,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { public long getTotalMessagesAdded() { long total = 0; - for (Binding binding : postOffice.getAllBindings().values()) { + for (Binding binding : iterableOf(postOffice.getAllBindings())) { if (binding.getType() == BindingType.LOCAL_QUEUE) { total += ((LocalQueueBinding) binding).getQueue().getMessagesAdded(); } @@ -1725,7 +1723,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { public long getTotalMessagesAcknowledged() { long total = 0; - for (Binding binding : postOffice.getAllBindings().values()) { + for (Binding binding : iterableOf(postOffice.getAllBindings())) { if (binding.getType() == BindingType.LOCAL_QUEUE) { total += ((LocalQueueBinding) binding).getQueue().getMessagesAcknowledged(); } @@ -1738,7 +1736,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { public long getTotalConsumerCount() { long total = 0; - for (Binding binding : postOffice.getAllBindings().values()) { + for (Binding binding : iterableOf(postOffice.getAllBindings())) { if (binding.getType() == BindingType.LOCAL_QUEUE) { total += ((LocalQueueBinding) binding).getQueue().getConsumerCount(); } @@ -4023,7 +4021,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { addressSettingsRepository.swap(configuration.getAddressesSettings().entrySet()); ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts"); - final Set divertsToRemove = postOffice.getAllBindings().values().stream() + final Set divertsToRemove = postOffice.getAllBindings() .filter(binding -> binding instanceof DivertBinding) .map(Binding::getUniqueName) .collect(Collectors.toSet()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 7e8881719b..9105fc2049 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -114,6 +114,8 @@ import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.jboss.logging.Logger; import org.jctools.queues.MpscUnboundedArrayQueue; +import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; + /** * Implementation of a Queue *

@@ -3320,42 +3322,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { String targetNodeID = null; Binding targetBinding = null; - for (Map.Entry entry : postOffice.getAllBindings().entrySet()) { - Binding binding = entry.getValue(); + // we only care about the remote queue bindings + for (RemoteQueueBinding remoteQueueBinding : iterableOf(postOffice.getAllBindings() + .filter(RemoteQueueBinding.class::isInstance) + .map(RemoteQueueBinding.class::cast))) { + // does this remote queue binding point to the same queue as the message? + if (oldQueueID == remoteQueueBinding.getRemoteQueueID()) { + // get the name of this queue so we can find the corresponding remote queue binding pointing to the scale down target node + SimpleString oldQueueName = remoteQueueBinding.getRoutingName(); - // we only care about the remote queue bindings - if (binding instanceof RemoteQueueBinding) { - RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) binding; + // parse the queue name of the remote queue binding to determine the node ID + String temp = remoteQueueBinding.getQueue().getName().toString(); + targetNodeID = temp.substring(temp.lastIndexOf(".") + 1); + logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID); - // does this remote queue binding point to the same queue as the message? - if (oldQueueID == remoteQueueBinding.getRemoteQueueID()) { - // get the name of this queue so we can find the corresponding remote queue binding pointing to the scale down target node - SimpleString oldQueueName = remoteQueueBinding.getRoutingName(); - - // parse the queue name of the remote queue binding to determine the node ID - String temp = remoteQueueBinding.getQueue().getName().toString(); + // now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding + // again, we only care about the remote queue bindings + for (RemoteQueueBinding innerRemoteQueueBinding : iterableOf(postOffice.getAllBindings() + .filter(RemoteQueueBinding.class::isInstance) + .map(RemoteQueueBinding.class::cast))) { + temp = innerRemoteQueueBinding.getQueue().getName().toString(); targetNodeID = temp.substring(temp.lastIndexOf(".") + 1); - logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID); - - // now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding - for (Map.Entry entry2 : postOffice.getAllBindings().entrySet()) { - binding = entry2.getValue(); - - // again, we only care about the remote queue bindings - if (binding instanceof RemoteQueueBinding) { - remoteQueueBinding = (RemoteQueueBinding) binding; - temp = remoteQueueBinding.getQueue().getName().toString(); - targetNodeID = temp.substring(temp.lastIndexOf(".") + 1); - if (oldQueueName.equals(remoteQueueBinding.getRoutingName()) && targetNodeID.equals(queueSuffix.toString())) { - targetBinding = remoteQueueBinding; - if (logger.isDebugEnabled()) { - logger.debug("Message now destined for " + remoteQueueBinding.getRoutingName() + " with ID: " + remoteQueueBinding.getRemoteQueueID() + " on address " + copyMessage.getAddress() + " on node " + targetNodeID); - } - break; - } else { - logger.debug("Failed to match: " + remoteQueueBinding); - } + if (oldQueueName.equals(innerRemoteQueueBinding.getRoutingName()) && targetNodeID.equals(queueSuffix.toString())) { + targetBinding = innerRemoteQueueBinding; + if (logger.isDebugEnabled()) { + logger.debug("Message now destined for " + innerRemoteQueueBinding.getRoutingName() + " with ID: " + innerRemoteQueueBinding.getRemoteQueueID() + " on address " + copyMessage.getAddress() + " on node " + targetNodeID); } + break; + } else { + logger.debug("Failed to match: " + innerRemoteQueueBinding); } } } diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java index 0b97ffc69b..259da323ad 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.broker.artemiswrapper; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.QueueBinding; @@ -44,10 +48,6 @@ import org.apache.activemq.command.Response; import org.mockito.AdditionalAnswers; import org.mockito.Mockito; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - public class RegionProxy implements Region { private final ActiveMQServer server; private final RoutingType routingType; @@ -77,21 +77,21 @@ public class RegionProxy implements Region { @Override public Map getDestinationMap() { - return server.getPostOffice().getAllBindings().entrySet().stream() - .filter(e -> e.getValue() instanceof QueueBinding) + return server.getPostOffice().getAllBindings() + .filter(QueueBinding.class::isInstance) .filter(e -> { - final SimpleString address = ((QueueBinding) e.getValue()).getQueue().getAddress(); + final SimpleString address = ((QueueBinding) e).getQueue().getAddress(); return server.getAddressInfo(address).getRoutingType() == routingType; } ) .collect(Collectors.toMap( e -> { - final String uniqueName = e.getValue().getUniqueName().toString(); + final String uniqueName = e.getUniqueName().toString(); return new ActiveMQQueue(uniqueName); }, e -> { - final Queue queue = ((QueueBinding) e.getValue()).getQueue(); - final String address = e.getValue().getAddress().toString(); + final Queue queue = ((QueueBinding) e).getQueue(); + final String address = e.getAddress().toString(); return new DestinationProxy(queue, address, server); })); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java index 379504610a..704cd44e2e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.tests.integration.amqp.largemessages; import java.util.NoSuchElementException; -import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; @@ -33,11 +32,9 @@ public class AMQPLargeMessagesTestUtil { public static void validateAllTemporaryBuffers(ActiveMQServer server) { - for (Binding binding : server.getPostOffice().getAllBindings().values()) { - if (binding instanceof QueueBinding) { - validateTemporaryBuffers(((QueueBinding)binding).getQueue()); - } - } + server.getPostOffice().getAllBindings() + .filter(QueueBinding.class::isInstance) + .forEach(binding -> validateTemporaryBuffers(((QueueBinding) binding).getQueue())); } public static void validateTemporaryBuffers(Queue serverQueue) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java index 5324590340..9133d5d157 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.mqtt.imported; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,9 +38,8 @@ public class MQTTFQQNTest extends MQTTTestSupport { try { subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); - Map allBindings = server.getPostOffice().getAllBindings(); - assertEquals(1, allBindings.size()); - Binding b = allBindings.values().iterator().next(); + assertEquals(1, server.getPostOffice().getAllBindings().count()); + Binding b = server.getPostOffice().getAllBindings().iterator().next(); //check that query using bare queue name works as before QueueQueryResult result = server.queueQuery(b.getUniqueName()); assertTrue(result.isExists()); @@ -132,9 +130,8 @@ public class MQTTFQQNTest extends MQTTTestSupport { try { subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); - Map allBindings = server.getPostOffice().getAllBindings(); - assertEquals(1, allBindings.size()); - Binding b = allBindings.values().iterator().next(); + assertEquals(1, server.getPostOffice().getAllBindings().count()); + Binding b = server.getPostOffice().getAllBindings().iterator().next(); //check ::queue QueueQueryResult result = server.queueQuery(new SimpleString("::" + b.getUniqueName())); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index e04c953d20..7187dbcccc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -69,6 +69,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; + /** * MQTT Test imported from ActiveMQ MQTT component. */ @@ -149,10 +151,8 @@ public class MQTTTest extends MQTTTestSupport { subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); - for (Binding b : server.getPostOffice().getAllBindings().values()) { - if (b instanceof QueueBinding) { - Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver()); - } + for (Binding b : iterableOf(server.getPostOffice().getAllBindings().filter(QueueBinding.class::isInstance))) { + Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver()); } subscriptionProvider.disconnect(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java index 885b428fb1..db9be9e3ac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java @@ -16,11 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.persistence.metrics; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.ToLongFunction; - import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Message; @@ -29,6 +24,10 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSession; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.ToLongFunction; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -493,8 +492,7 @@ public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport protected List getQueues(final String address) throws Exception { final List queues = new ArrayList<>(); - for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address)) - .getBindings()) { + for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address))) { if (binding.getType() == BindingType.LOCAL_QUEUE) { LocalQueueBinding queueBinding = (LocalQueueBinding) binding; queues.add(queueBinding.getQueue()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java index 9f8c47f98f..c8a0661f1a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java @@ -46,15 +46,17 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterController; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; + @RunWith(value = Parameterized.class) public class ScaleDownTest extends ClusterTestBase { @@ -243,13 +245,13 @@ public class ScaleDownTest extends ClusterTestBase { // find and pause the sf queue so no messages actually move from node 0 to node 1 String sfQueueName = null; - for (Map.Entry entry : servers[0].getPostOffice().getAllBindings().entrySet()) { - String temp = entry.getValue().getAddress().toString(); + for (Binding binding : iterableOf(servers[0].getPostOffice().getAllBindings())) { + String temp = binding.getAddress().toString(); if (temp.startsWith(servers[1].getInternalNamingPrefix() + "sf.") && temp.endsWith(servers[1].getNodeID().toString())) { // we found the sf queue for the other node // need to pause the sfQueue here - ((LocalQueueBinding) entry.getValue()).getQueue().pause(); + ((LocalQueueBinding) binding).getQueue().pause(); sfQueueName = temp; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 9a2786cbc2..648f6d1c23 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -72,6 +72,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; + @RunWith(Parameterized.class) public class StompTest extends StompTestBase { @@ -2026,10 +2028,8 @@ public class StompTest extends StompTestBase { conn.connect(defUser, defPass); subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); - for (Binding b : server.getPostOffice().getAllBindings().values()) { - if (b instanceof QueueBinding) { - Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver()); - } + for (Binding b : iterableOf(server.getPostOffice().getAllBindings().filter(QueueBinding.class::isInstance))) { + Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver()); } // Send MQTT Message diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index ecd02f5116..8375ae825c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -151,21 +151,21 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size()); assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size()); - assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size()); - assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size()); + assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).size()); + assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).size()); //Remove the address ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.test")); //should still have 1 address and binding assertEquals(1, ad.getAddresses().size()); - assertEquals(1, ad.getBindings().size()); + assertEquals(1, ad.getBindings().count()); ad.removeBinding(SimpleString.toSimpleString("one"), null); ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.>")); assertEquals(0, ad.getAddresses().size()); - assertEquals(0, ad.getBindings().size()); + assertEquals(0, ad.getBindings().count()); } @Test @@ -189,12 +189,12 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test1")).getBindings().size()); assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test2")).getBindings().size()); - assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size()); - assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size()); - assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test1")).getBindings().size()); - assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test2")).getBindings().size()); - assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.>")).getBindings().size()); - assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.test")).getBindings().size()); + assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).size()); + assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).size()); + assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test1")).size()); + assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test2")).size()); + assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.>")).size()); + assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.test")).size()); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 274c50dc4e..027061410f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -16,14 +16,16 @@ */ package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes; +import java.util.Collection; import java.util.EnumSet; import java.util.List; -import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; @@ -37,7 +39,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; @@ -177,7 +178,7 @@ public class FakePostOffice implements PostOffice { } @Override - public Map getAllBindings() { + public Stream getAllBindings() { return null; } @@ -193,13 +194,13 @@ public class FakePostOffice implements PostOffice { } @Override - public Bindings getMatchingBindings(final SimpleString address) { + public Collection getMatchingBindings(final SimpleString address) { return null; } @Override - public Bindings getDirectBindings(final SimpleString address) { + public Collection getDirectBindings(final SimpleString address) { return null; }