ARTEMIS-2844 Improve binding query performance by reusing AddressImpl instances

This commit is contained in:
Francesco Nigro 2020-07-09 19:56:25 +02:00 committed by Clebert Suconic
parent 70068a0659
commit 60e25b763c
20 changed files with 189 additions and 174 deletions

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.stream.Stream;
public final class IterableStream {
private IterableStream() {
}
public static <T> Iterable<T> iterableOf(Stream<T> stream) {
return stream::iterator;
}
}

View File

@ -2184,7 +2184,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address)).getBindings()) {
for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
if (binding instanceof LocalQueueBinding) {
Queue queue = ((LocalQueueBinding) binding).getQueue();
for (Consumer consumer : queue.getConsumers()) {

View File

@ -16,12 +16,13 @@
*/
package org.apache.activemq.artemis.core.postoffice;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -45,9 +46,9 @@ public interface AddressManager {
Bindings getBindingsForRoutingAddress(SimpleString address) throws Exception;
Bindings getMatchingBindings(SimpleString address) throws Exception;
Collection<Binding> getMatchingBindings(SimpleString address) throws Exception;
Bindings getDirectBindings(SimpleString address) throws Exception;
Collection<Binding> getDirectBindings(SimpleString address) throws Exception;
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
@ -57,7 +58,7 @@ public interface AddressManager {
Binding getBinding(SimpleString queueName);
Map<SimpleString, Binding> getBindings();
Stream<Binding> getBindings();
Set<SimpleString> getAddresses();

View File

@ -16,21 +16,22 @@
*/
package org.apache.activemq.artemis.core.postoffice;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -135,11 +136,11 @@ public interface PostOffice extends ActiveMQComponent {
Binding getBinding(SimpleString uniqueName);
Bindings getMatchingBindings(SimpleString address) throws Exception;
Collection<Binding> getMatchingBindings(SimpleString address) throws Exception;
Bindings getDirectBindings(SimpleString address) throws Exception;
Collection<Binding> getDirectBindings(SimpleString address) throws Exception;
Map<SimpleString, Binding> getAllBindings();
Stream<Binding> getAllBindings();
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -36,7 +37,7 @@ public class AddressImpl implements Address {
private final boolean containsWildCard;
private final List<Address> linkedAddresses = new ArrayList<>();
private List<Address> linkedAddresses = null;
private final WildcardConfiguration wildcardConfiguration;
@ -68,11 +69,14 @@ public class AddressImpl implements Address {
@Override
public List<Address> getLinkedAddresses() {
return linkedAddresses;
return linkedAddresses == null ? Collections.emptyList() : linkedAddresses;
}
@Override
public void addLinkedAddress(final Address address) {
if (linkedAddresses == null) {
linkedAddresses = new ArrayList<>(1);
}
if (!linkedAddresses.contains(address)) {
linkedAddresses.add(address);
}
@ -80,6 +84,9 @@ public class AddressImpl implements Address {
@Override
public void removeLinkedAddress(final Address actualAddress) {
if (linkedAddresses == null) {
return;
}
linkedAddresses.remove(actualAddress);
}

View File

@ -82,6 +82,7 @@ import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@ -97,6 +98,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
/**
* This is the class that will make the routing to Queues and decide which consumer will get the messages
@ -769,18 +773,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
server.callBrokerAddressPlugins(plugin -> plugin.beforeRemoveAddress(address));
}
final Bindings bindingsForAddress = getDirectBindings(address);
final Collection<Binding> bindingsForAddress = getDirectBindings(address);
if (force) {
for (Binding binding : bindingsForAddress.getBindings()) {
for (Binding binding : bindingsForAddress) {
if (binding instanceof QueueBinding) {
((QueueBinding)binding).getQueue().deleteQueue(true);
}
}
} else {
if (bindingsForAddress.getBindings().size() > 0) {
throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
}
} else if (!bindingsForAddress.isEmpty()) {
throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
}
managementService.unregisterAddress(address);
final AddressInfo addressInfo = addressManager.removeAddressInfo(address);
@ -969,17 +971,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
public Bindings getMatchingBindings(final SimpleString address) throws Exception {
public Collection<Binding> getMatchingBindings(final SimpleString address) throws Exception {
return addressManager.getMatchingBindings(address);
}
@Override
public Bindings getDirectBindings(final SimpleString address) throws Exception {
public Collection<Binding> getDirectBindings(final SimpleString address) throws Exception {
return addressManager.getDirectBindings(address);
}
@Override
public Map<SimpleString, Binding> getAllBindings() {
public Stream<Binding> getAllBindings() {
return addressManager.getBindings();
}
@ -1731,7 +1733,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public void run() {
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
for (Queue queue : getLocalQueues()) {
for (Queue queue : iterableOf(getLocalQueues())) {
try {
queue.expireReferences();
} catch (Exception e) {
@ -1753,11 +1755,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public void run() {
for (Queue queue : getLocalQueues()) {
getLocalQueues().forEach(queue -> {
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) {
QueueManagerImpl.performAutoDeleteQueue(server, queue);
}
}
});
Set<SimpleString> addresses = addressManager.getAddresses();
@ -1796,19 +1798,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
private List<Queue> getLocalQueues() {
Map<SimpleString, Binding> nameMap = addressManager.getBindings();
List<Queue> queues = new ArrayList<>();
for (Binding binding : nameMap.values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
queues.add(queue);
}
}
return queues;
private Stream<Queue> getLocalQueues() {
return addressManager.getBindings()
.filter(binding -> binding.getType() == BindingType.LOCAL_QUEUE)
.map(binding -> (Queue) binding.getBindable());
}
public static final class AddOperation implements TransactionOperation {

View File

@ -16,15 +16,18 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -35,7 +38,6 @@ import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
@ -59,10 +61,7 @@ public class SimpleAddressManager implements AddressManager {
*/
protected final ConcurrentMap<SimpleString, Bindings> mappings = new ConcurrentHashMap<>();
/**
* {@code HashMap<QueueName, Binding>}
*/
private final ConcurrentMap<SimpleString, Binding> nameMap = new ConcurrentHashMap<>();
private final ConcurrentMap<SimpleString, Pair<Binding, Address>> nameMap = new ConcurrentHashMap<>();
private final BindingsFactory bindingsFactory;
@ -87,7 +86,8 @@ public class SimpleAddressManager implements AddressManager {
@Override
public boolean addBinding(final Binding binding) throws Exception {
if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null) {
final Pair<Binding, Address> bindingAddressPair = new Pair<>(binding, new AddressImpl(binding.getAddress(), wildcardConfiguration));
if (nameMap.putIfAbsent(binding.getUniqueName(), bindingAddressPair) != null) {
throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(binding);
}
@ -100,15 +100,15 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception {
final Binding binding = nameMap.remove(uniqueName);
final Pair<Binding, Address> binding = nameMap.remove(uniqueName);
if (binding == null) {
return null;
}
removeBindingInternal(binding.getAddress(), uniqueName);
removeBindingInternal(binding.getA().getAddress(), uniqueName);
return binding;
return binding.getA();
}
@Override
@ -118,42 +118,40 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Binding getBinding(final SimpleString bindableName) {
return nameMap.get(CompositeAddress.extractQueueName(bindableName));
final Pair<Binding, Address> bindingAddressPair = nameMap.get(CompositeAddress.extractQueueName(bindableName));
return bindingAddressPair == null ? null : bindingAddressPair.getA();
}
@Override
public Map<SimpleString, Binding> getBindings() {
return nameMap;
public Stream<Binding> getBindings() {
return nameMap.values().stream().map(pair -> pair.getA());
}
@Override
public Bindings getMatchingBindings(final SimpleString address) throws Exception {
public Collection<Binding> getMatchingBindings(final SimpleString address) throws Exception {
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Address add = new AddressImpl(realAddress, wildcardConfiguration);
Bindings bindings = bindingsFactory.createBindings(realAddress);
for (Binding binding : nameMap.values()) {
Address addCheck = new AddressImpl(binding.getAddress(), wildcardConfiguration);
Collection<Binding> bindings = new ArrayList<>();
nameMap.forEach((bindingUniqueName, bindingAddressPair) -> {
final Address addCheck = bindingAddressPair.getB();
if (addCheck.matches(add)) {
bindings.addBinding(binding);
bindings.add(bindingAddressPair.getA());
}
}
});
return bindings;
}
@Override
public Bindings getDirectBindings(final SimpleString address) throws Exception {
public Collection<Binding> getDirectBindings(final SimpleString address) throws Exception {
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Bindings bindings = bindingsFactory.createBindings(realAddress);
Collection<Binding> bindings = new ArrayList<>();
for (Binding binding : nameMap.values()) {
if (binding.getAddress().equals(realAddress)) {
bindings.addBinding(binding);
nameMap.forEach((bindingUniqueName, bindingAddressPair) -> {
if (bindingAddressPair.getA().getAddress().equals(realAddress)) {
bindings.add(bindingAddressPair.getA());
}
}
});
return bindings;
}

View File

@ -111,8 +111,6 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
super.start();
server.getPostOffice()
.getAllBindings()
.values()
.stream()
.filter(b -> b instanceof QueueBinding || b instanceof DivertBinding)
.forEach(this::afterAddBinding);
}
@ -144,7 +142,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
//if a new address is added we need to see if there are matching divert bindings
server.getPostOffice()
.getDirectBindings(addressInfo.getName())
.getBindings().stream().filter(binding -> binding instanceof DivertBinding)
.stream().filter(binding -> binding instanceof DivertBinding)
.forEach(this::afterAddBinding);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, addressInfo.getName());

View File

@ -94,8 +94,6 @@ public class FederatedQueue extends FederatedAbstract implements ActiveMQServerC
super.start();
server.getPostOffice()
.getAllBindings()
.values()
.stream()
.filter(b -> b instanceof QueueBinding)
.map(b -> (QueueBinding) b)
.forEach(b -> conditionalCreateRemoteConsumer(b.getQueue()));

View File

@ -27,6 +27,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
@ -100,7 +101,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@ -204,6 +204,8 @@ import org.apache.activemq.artemis.utils.critical.CriticalComponent;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
/**
* The ActiveMQ Artemis server implementation
*/
@ -910,19 +912,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
int defaultConsumersBeforeDispatch = addressSettings.getDefaultConsumersBeforeDispatch();
long defaultDelayBeforeDispatch = addressSettings.getDefaultDelayBeforeDispatch();
List<SimpleString> names = new ArrayList<>();
// make an exception for the management address (see HORNETQ-29)
ManagementService managementService = getManagementService();
if (managementService != null) {
if (realAddress.equals(managementService.getManagementAddress())) {
return new BindingQueryResult(true, null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers, defaultExclusive, defaultLastValue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch);
return new BindingQueryResult(true, null, Collections.emptyList(), autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers, defaultExclusive, defaultLastValue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch);
}
}
Bindings bindings = getPostOffice().getMatchingBindings(realAddress);
List<SimpleString> names = new ArrayList<>();
for (Binding binding : bindings.getBindings()) {
for (Binding binding : getPostOffice().getMatchingBindings(realAddress)) {
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
SimpleString name;
if (!newFQQN && CompositeAddress.isFullyQualified(address.toString())) {
@ -1594,11 +1594,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
public int getQueueCountForUser(String username) throws Exception {
Map<SimpleString, Binding> bindings = postOffice.getAllBindings();
int queuesForUser = 0;
for (Binding binding : bindings.values()) {
for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding instanceof LocalQueueBinding && ((LocalQueueBinding) binding).getQueue().getUser().equals(SimpleString.toSimpleString(username))) {
queuesForUser++;
}
@ -1699,7 +1697,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public long getTotalMessageCount() {
long total = 0;
for (Binding binding : postOffice.getAllBindings().values()) {
for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding) binding).getQueue().getMessageCount();
}
@ -1712,7 +1710,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public long getTotalMessagesAdded() {
long total = 0;
for (Binding binding : postOffice.getAllBindings().values()) {
for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding) binding).getQueue().getMessagesAdded();
}
@ -1725,7 +1723,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public long getTotalMessagesAcknowledged() {
long total = 0;
for (Binding binding : postOffice.getAllBindings().values()) {
for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding) binding).getQueue().getMessagesAcknowledged();
}
@ -1738,7 +1736,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public long getTotalConsumerCount() {
long total = 0;
for (Binding binding : postOffice.getAllBindings().values()) {
for (Binding binding : iterableOf(postOffice.getAllBindings())) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding) binding).getQueue().getConsumerCount();
}
@ -4023,7 +4021,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
addressSettingsRepository.swap(configuration.getAddressesSettings().entrySet());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts");
final Set<SimpleString> divertsToRemove = postOffice.getAllBindings().values().stream()
final Set<SimpleString> divertsToRemove = postOffice.getAllBindings()
.filter(binding -> binding instanceof DivertBinding)
.map(Binding::getUniqueName)
.collect(Collectors.toSet());

View File

@ -114,6 +114,8 @@ import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger;
import org.jctools.queues.MpscUnboundedArrayQueue;
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
/**
* Implementation of a Queue
* <p>
@ -3320,42 +3322,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
String targetNodeID = null;
Binding targetBinding = null;
for (Map.Entry<SimpleString, Binding> entry : postOffice.getAllBindings().entrySet()) {
Binding binding = entry.getValue();
// we only care about the remote queue bindings
for (RemoteQueueBinding remoteQueueBinding : iterableOf(postOffice.getAllBindings()
.filter(RemoteQueueBinding.class::isInstance)
.map(RemoteQueueBinding.class::cast))) {
// does this remote queue binding point to the same queue as the message?
if (oldQueueID == remoteQueueBinding.getRemoteQueueID()) {
// get the name of this queue so we can find the corresponding remote queue binding pointing to the scale down target node
SimpleString oldQueueName = remoteQueueBinding.getRoutingName();
// we only care about the remote queue bindings
if (binding instanceof RemoteQueueBinding) {
RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) binding;
// parse the queue name of the remote queue binding to determine the node ID
String temp = remoteQueueBinding.getQueue().getName().toString();
targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID);
// does this remote queue binding point to the same queue as the message?
if (oldQueueID == remoteQueueBinding.getRemoteQueueID()) {
// get the name of this queue so we can find the corresponding remote queue binding pointing to the scale down target node
SimpleString oldQueueName = remoteQueueBinding.getRoutingName();
// parse the queue name of the remote queue binding to determine the node ID
String temp = remoteQueueBinding.getQueue().getName().toString();
// now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding
// again, we only care about the remote queue bindings
for (RemoteQueueBinding innerRemoteQueueBinding : iterableOf(postOffice.getAllBindings()
.filter(RemoteQueueBinding.class::isInstance)
.map(RemoteQueueBinding.class::cast))) {
temp = innerRemoteQueueBinding.getQueue().getName().toString();
targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID);
// now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding
for (Map.Entry<SimpleString, Binding> entry2 : postOffice.getAllBindings().entrySet()) {
binding = entry2.getValue();
// again, we only care about the remote queue bindings
if (binding instanceof RemoteQueueBinding) {
remoteQueueBinding = (RemoteQueueBinding) binding;
temp = remoteQueueBinding.getQueue().getName().toString();
targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
if (oldQueueName.equals(remoteQueueBinding.getRoutingName()) && targetNodeID.equals(queueSuffix.toString())) {
targetBinding = remoteQueueBinding;
if (logger.isDebugEnabled()) {
logger.debug("Message now destined for " + remoteQueueBinding.getRoutingName() + " with ID: " + remoteQueueBinding.getRemoteQueueID() + " on address " + copyMessage.getAddress() + " on node " + targetNodeID);
}
break;
} else {
logger.debug("Failed to match: " + remoteQueueBinding);
}
if (oldQueueName.equals(innerRemoteQueueBinding.getRoutingName()) && targetNodeID.equals(queueSuffix.toString())) {
targetBinding = innerRemoteQueueBinding;
if (logger.isDebugEnabled()) {
logger.debug("Message now destined for " + innerRemoteQueueBinding.getRoutingName() + " with ID: " + innerRemoteQueueBinding.getRemoteQueueID() + " on address " + copyMessage.getAddress() + " on node " + targetNodeID);
}
break;
} else {
logger.debug("Failed to match: " + innerRemoteQueueBinding);
}
}
}

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.broker.artemiswrapper;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@ -44,10 +48,6 @@ import org.apache.activemq.command.Response;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class RegionProxy implements Region {
private final ActiveMQServer server;
private final RoutingType routingType;
@ -77,21 +77,21 @@ public class RegionProxy implements Region {
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return server.getPostOffice().getAllBindings().entrySet().stream()
.filter(e -> e.getValue() instanceof QueueBinding)
return server.getPostOffice().getAllBindings()
.filter(QueueBinding.class::isInstance)
.filter(e -> {
final SimpleString address = ((QueueBinding) e.getValue()).getQueue().getAddress();
final SimpleString address = ((QueueBinding) e).getQueue().getAddress();
return server.getAddressInfo(address).getRoutingType() == routingType;
}
)
.collect(Collectors.toMap(
e -> {
final String uniqueName = e.getValue().getUniqueName().toString();
final String uniqueName = e.getUniqueName().toString();
return new ActiveMQQueue(uniqueName);
},
e -> {
final Queue queue = ((QueueBinding) e.getValue()).getQueue();
final String address = e.getValue().getAddress().toString();
final Queue queue = ((QueueBinding) e).getQueue();
final String address = e.getAddress().toString();
return new DestinationProxy(queue, address, server);
}));
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
import java.util.NoSuchElementException;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -33,11 +32,9 @@ public class AMQPLargeMessagesTestUtil {
public static void validateAllTemporaryBuffers(ActiveMQServer server) {
for (Binding binding : server.getPostOffice().getAllBindings().values()) {
if (binding instanceof QueueBinding) {
validateTemporaryBuffers(((QueueBinding)binding).getQueue());
}
}
server.getPostOffice().getAllBindings()
.filter(QueueBinding.class::isInstance)
.forEach(binding -> validateTemporaryBuffers(((QueueBinding) binding).getQueue()));
}
public static void validateTemporaryBuffers(Queue serverQueue) {

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -39,9 +38,8 @@ public class MQTTFQQNTest extends MQTTTestSupport {
try {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
Map<SimpleString, Binding> allBindings = server.getPostOffice().getAllBindings();
assertEquals(1, allBindings.size());
Binding b = allBindings.values().iterator().next();
assertEquals(1, server.getPostOffice().getAllBindings().count());
Binding b = server.getPostOffice().getAllBindings().iterator().next();
//check that query using bare queue name works as before
QueueQueryResult result = server.queueQuery(b.getUniqueName());
assertTrue(result.isExists());
@ -132,9 +130,8 @@ public class MQTTFQQNTest extends MQTTTestSupport {
try {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
Map<SimpleString, Binding> allBindings = server.getPostOffice().getAllBindings();
assertEquals(1, allBindings.size());
Binding b = allBindings.values().iterator().next();
assertEquals(1, server.getPostOffice().getAllBindings().count());
Binding b = server.getPostOffice().getAllBindings().iterator().next();
//check ::queue
QueueQueryResult result = server.queueQuery(new SimpleString("::" + b.getUniqueName()));

View File

@ -69,6 +69,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
/**
* MQTT Test imported from ActiveMQ MQTT component.
*/
@ -149,10 +151,8 @@ public class MQTTTest extends MQTTTestSupport {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
for (Binding b : server.getPostOffice().getAllBindings().values()) {
if (b instanceof QueueBinding) {
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
for (Binding b : iterableOf(server.getPostOffice().getAllBindings().filter(QueueBinding.class::isInstance))) {
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
subscriptionProvider.disconnect();

View File

@ -16,11 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence.metrics;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
@ -29,6 +24,10 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -493,8 +492,7 @@ public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport
protected List<Queue> getQueues(final String address) throws Exception {
final List<Queue> queues = new ArrayList<>();
for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address))
.getBindings()) {
for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address))) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
LocalQueueBinding queueBinding = (LocalQueueBinding) binding;
queues.add(queueBinding.getQueue());

View File

@ -46,15 +46,17 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
@RunWith(value = Parameterized.class)
public class ScaleDownTest extends ClusterTestBase {
@ -243,13 +245,13 @@ public class ScaleDownTest extends ClusterTestBase {
// find and pause the sf queue so no messages actually move from node 0 to node 1
String sfQueueName = null;
for (Map.Entry<SimpleString, Binding> entry : servers[0].getPostOffice().getAllBindings().entrySet()) {
String temp = entry.getValue().getAddress().toString();
for (Binding binding : iterableOf(servers[0].getPostOffice().getAllBindings())) {
String temp = binding.getAddress().toString();
if (temp.startsWith(servers[1].getInternalNamingPrefix() + "sf.") && temp.endsWith(servers[1].getNodeID().toString())) {
// we found the sf queue for the other node
// need to pause the sfQueue here
((LocalQueueBinding) entry.getValue()).getQueue().pause();
((LocalQueueBinding) binding).getQueue().pause();
sfQueueName = temp;
}
}

View File

@ -72,6 +72,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
@RunWith(Parameterized.class)
public class StompTest extends StompTestBase {
@ -2026,10 +2028,8 @@ public class StompTest extends StompTestBase {
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
for (Binding b : server.getPostOffice().getAllBindings().values()) {
if (b instanceof QueueBinding) {
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
for (Binding b : iterableOf(server.getPostOffice().getAllBindings().filter(QueueBinding.class::isInstance))) {
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
// Send MQTT Message

View File

@ -151,21 +151,21 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).size());
//Remove the address
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.test"));
//should still have 1 address and binding
assertEquals(1, ad.getAddresses().size());
assertEquals(1, ad.getBindings().size());
assertEquals(1, ad.getBindings().count());
ad.removeBinding(SimpleString.toSimpleString("one"), null);
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.>"));
assertEquals(0, ad.getAddresses().size());
assertEquals(0, ad.getBindings().size());
assertEquals(0, ad.getBindings().count());
}
@Test
@ -189,12 +189,12 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test1")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test2")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test1")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test2")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.>")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.test")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test1")).size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test2")).size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.>")).size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.test")).size());
}

View File

@ -16,14 +16,16 @@
*/
package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
@ -37,7 +39,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
@ -177,7 +178,7 @@ public class FakePostOffice implements PostOffice {
}
@Override
public Map<SimpleString, Binding> getAllBindings() {
public Stream<Binding> getAllBindings() {
return null;
}
@ -193,13 +194,13 @@ public class FakePostOffice implements PostOffice {
}
@Override
public Bindings getMatchingBindings(final SimpleString address) {
public Collection<Binding> getMatchingBindings(final SimpleString address) {
return null;
}
@Override
public Bindings getDirectBindings(final SimpleString address) {
public Collection<Binding> getDirectBindings(final SimpleString address) {
return null;
}