This commit is contained in:
Clebert Suconic 2020-11-17 14:39:17 -05:00
commit b67403e88a
14 changed files with 572 additions and 158 deletions

View File

@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
public class CompositeAddress {
public static String SEPARATOR = "::";
public static final String SEPARATOR = "::";
public static String toFullyQualified(String address, String qName) {
return toFullyQualified(SimpleString.toSimpleString(address), SimpleString.toSimpleString(qName)).toString();

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.artemis.core.postoffice;
import java.util.List;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -31,7 +31,7 @@ public interface Address {
boolean containsWildCard();
List<Address> getLinkedAddresses();
Collection<Address> getLinkedAddresses();
void addLinkedAddress(Address address);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.postoffice;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -33,7 +34,7 @@ public interface Bindings extends UnproposalListener {
void addBinding(Binding binding);
void removeBinding(Binding binding);
Binding removeBindingByUniqueName(SimpleString uniqueName);
void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);

View File

@ -16,12 +16,16 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jctools.maps.NonBlockingHashSet;
/**
* Splits an address string into its hierarchical parts using {@link WildcardConfiguration#getDelimiter()} as delimiter.
@ -36,7 +40,7 @@ public class AddressImpl implements Address {
private final boolean containsWildCard;
private final List<Address> linkedAddresses = new ArrayList<>();
private Set<Address> linkedAddresses = null;
private final WildcardConfiguration wildcardConfiguration;
@ -67,17 +71,27 @@ public class AddressImpl implements Address {
}
@Override
public List<Address> getLinkedAddresses() {
public Collection<Address> getLinkedAddresses() {
final Collection<Address> linkedAddresses = this.linkedAddresses;
if (linkedAddresses == null) {
return Collections.emptySet();
}
return linkedAddresses;
}
@Override
public void addLinkedAddress(final Address address) {
if (linkedAddresses == null) {
linkedAddresses = PlatformDependent.hasUnsafe() ? new NonBlockingHashSet<>() : new ConcurrentHashSet<>();
}
linkedAddresses.add(address);
}
@Override
public void removeLinkedAddress(final Address actualAddress) {
if (linkedAddresses == null) {
return;
}
linkedAddresses.remove(actualAddress);
}

View File

@ -149,7 +149,6 @@ public final class BindingsImpl implements Bindings {
} finally {
updated();
}
}
@Override
@ -162,7 +161,11 @@ public final class BindingsImpl implements Bindings {
}
@Override
public void removeBinding(final Binding binding) {
public Binding removeBindingByUniqueName(final SimpleString bindingUniqueName) {
final Binding binding = bindingsNameMap.remove(bindingUniqueName);
if (binding == null) {
return null;
}
try {
if (binding.isExclusive()) {
exclusiveBindings.remove(binding);
@ -181,11 +184,12 @@ public final class BindingsImpl implements Bindings {
}
bindingsIdMap.remove(binding.getID());
bindingsNameMap.remove(binding.getUniqueName());
assert !bindingsNameMap.containsKey(binding.getUniqueName());
if (logger.isTraceEnabled()) {
logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
}
return binding;
} finally {
updated();
}

View File

@ -208,31 +208,37 @@ public class SimpleAddressManager implements AddressManager {
Bindings bindings = mappings.get(realAddress);
if (bindings != null) {
removeMapping(bindableName, bindings);
final SimpleString bindableQueueName = CompositeAddress.extractQueueName(bindableName);
final Binding binding = bindings.removeBindingByUniqueName(bindableQueueName);
if (binding == null) {
throw new IllegalStateException("Cannot find binding " + bindableName);
}
if (bindings.getBindings().isEmpty()) {
mappings.remove(realAddress);
}
}
}
protected Binding removeMapping(final SimpleString bindableName, final Bindings bindings) {
Binding theBinding = null;
protected void addMappingsInternal(final SimpleString address,
final Collection<Binding> newBindings) throws Exception {
if (newBindings.isEmpty()) {
return;
}
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Bindings bindings = mappings.get(realAddress);
for (Binding binding : bindings.getBindings()) {
if (binding.getUniqueName().equals(CompositeAddress.extractQueueName(bindableName))) {
theBinding = binding;
break;
if (bindings == null) {
bindings = bindingsFactory.createBindings(realAddress);
final Bindings prevBindings = mappings.putIfAbsent(realAddress, bindings);
if (prevBindings != null) {
bindings = prevBindings;
}
}
if (theBinding == null) {
throw new IllegalStateException("Cannot find binding " + bindableName);
for (Binding binding : newBindings) {
bindings.addBinding(binding);
}
bindings.removeBinding(theBinding);
return theBinding;
}
protected boolean addMappingInternal(final SimpleString address, final Binding binding) throws Exception {

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -28,11 +31,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* extends the simple manager to allow wildcard addresses to be used.
*/
@ -65,20 +63,22 @@ public class WildcardAddressManager extends SimpleAddressManager {
// this should only happen if we're routing to an address that has no mappings when we're running checkAllowable
if (bindings == null && !wildCardAddresses.isEmpty()) {
Address add = addAndUpdateAddressMap(address);
Address add = addAndUpdateAddressMap(address, true);
if (!add.containsWildCard()) {
for (Address destAdd : add.getLinkedAddresses()) {
Bindings b = super.getBindingsForRoutingAddress(destAdd.getAddress());
if (b != null) {
Collection<Binding> theBindings = b.getBindings();
for (Binding theBinding : theBindings) {
super.addMappingInternal(address, theBinding);
super.addMappingsInternal(address, b.getBindings());
if (bindings == null) {
bindings = super.getBindingsForRoutingAddress(address);
}
super.getBindingsForRoutingAddress(address).setMessageLoadBalancingType(b.getMessageLoadBalancingType());
bindings.setMessageLoadBalancingType(b.getMessageLoadBalancingType());
}
}
}
bindings = super.getBindingsForRoutingAddress(address);
if (bindings == null) {
bindings = super.getBindingsForRoutingAddress(address);
}
}
return bindings;
}
@ -94,7 +94,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
public boolean addBinding(final Binding binding) throws Exception {
boolean exists = super.addBinding(binding);
if (!exists) {
Address add = addAndUpdateAddressMap(binding.getAddress());
Address add = addAndUpdateAddressMap(binding.getAddress(), false);
if (add.containsWildCard()) {
for (Address destAdd : add.getLinkedAddresses()) {
super.addMappingInternal(destAdd.getAddress(), binding);
@ -103,19 +103,17 @@ public class WildcardAddressManager extends SimpleAddressManager {
for (Address destAdd : add.getLinkedAddresses()) {
Bindings bindings = super.getBindingsForRoutingAddress(destAdd.getAddress());
if (bindings != null) {
for (Binding b : bindings.getBindings()) {
super.addMappingInternal(binding.getAddress(), b);
}
super.addMappingsInternal(binding.getAddress(), bindings.getBindings());
}
}
}
}
return exists;
return !exists;
}
@Override
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
Address add = addAndUpdateAddressMap(address);
Address add = addAndUpdateAddressMap(address, true);
Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address);
if (bindingsForRoutingAddress != null) {
bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType);
@ -142,13 +140,19 @@ public class WildcardAddressManager extends SimpleAddressManager {
public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception {
Binding binding = super.removeBinding(uniqueName, tx);
if (binding != null) {
Address add = getAddress(binding.getAddress());
if (add.containsWildCard()) {
for (Address theAddress : add.getLinkedAddresses()) {
super.removeBindingInternal(theAddress.getAddress(), uniqueName);
final SimpleString bindingAddress = binding.getAddress();
final boolean containsWildcard = bindingAddress.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
Address address = containsWildcard ? wildCardAddresses.get(bindingAddress) : addresses.get(bindingAddress);
if (address == null) {
address = new AddressImpl(bindingAddress, wildcardConfiguration);
} else {
if (containsWildcard) {
for (Address linkedAddress : address.getLinkedAddresses()) {
super.removeBindingInternal(linkedAddress.getAddress(), uniqueName);
}
}
}
removeAndUpdateAddressMap(add);
removeAndUpdateAddressMap(address);
}
return binding;
}
@ -171,31 +175,21 @@ public class WildcardAddressManager extends SimpleAddressManager {
wildCardAddresses.clear();
}
private Address getAddress(final SimpleString address) {
Address add = new AddressImpl(address, wildcardConfiguration);
Address actualAddress;
if (add.containsWildCard()) {
actualAddress = wildCardAddresses.get(address);
} else {
actualAddress = addresses.get(address);
}
return actualAddress != null ? actualAddress : add;
}
private synchronized Address addAndUpdateAddressMap(final SimpleString address) {
Address actualAddress;
final boolean containsWildcard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
if (containsWildcard) {
actualAddress = wildCardAddresses.get(address);
} else {
actualAddress = addresses.get(address);
private Address addAndUpdateAddressMap(final SimpleString address, boolean getBiased) {
final boolean containsWildCard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
final Map<SimpleString, Address> addressMap = containsWildCard ? wildCardAddresses : addresses;
Address actualAddress = null;
if (getBiased) {
// CHM::get doesn't need to synchronize anything, so it's to be preferred for getBiased cases
actualAddress = addressMap.get(address);
}
if (actualAddress == null) {
actualAddress = new AddressImpl(address, wildcardConfiguration);
addAddress(address, actualAddress);
if (containsWildcard) {
for (Address destAdd : addresses.values()) {
actualAddress = addressMap.computeIfAbsent(address, addressKey -> new AddressImpl(addressKey, wildcardConfiguration));
}
assert actualAddress.containsWildCard() == containsWildCard;
synchronized (this) {
if (containsWildCard) {
for (Address destAdd : this.addresses.values()) {
if (destAdd.matches(actualAddress)) {
destAdd.addLinkedAddress(actualAddress);
actualAddress.addLinkedAddress(destAdd);
@ -209,31 +203,24 @@ public class WildcardAddressManager extends SimpleAddressManager {
}
}
}
}
return actualAddress;
}
private void addAddress(final SimpleString address, final Address actualAddress) {
if (actualAddress.containsWildCard()) {
wildCardAddresses.put(address, actualAddress);
} else {
addresses.put(address, actualAddress);
return actualAddress;
}
}
private synchronized void removeAndUpdateAddressMap(final Address address) throws Exception {
private void removeAndUpdateAddressMap(final Address address) throws Exception {
// we only remove if there are no bindings left
Bindings bindings = super.getBindingsForRoutingAddress(address.getAddress());
if (bindings == null || bindings.getBindings().size() == 0) {
List<Address> addresses = address.getLinkedAddresses();
for (Address address1 : addresses) {
address1.removeLinkedAddress(address);
Bindings linkedBindings = super.getBindingsForRoutingAddress(address1.getAddress());
if (linkedBindings == null || linkedBindings.getBindings().size() == 0) {
removeAddress(address1);
if (bindings == null || bindings.getBindings().isEmpty()) {
synchronized (this) {
for (Address address1 : address.getLinkedAddresses()) {
address1.removeLinkedAddress(address);
Bindings linkedBindings = super.getBindingsForRoutingAddress(address1.getAddress());
if (linkedBindings == null || linkedBindings.getBindings().size() == 0) {
removeAddress(address1);
}
}
removeAddress(address);
}
removeAddress(address);
}
}

View File

@ -0,0 +1,41 @@
Apache Artemix JMH Benchmarks
-------
This module contains optional [JMH](http://openjdk.java.net/projects/code-tools/jmh/) performance tests.
Note that this module is an optional part of the overall project build and does not deploy anything, due to its use
of JMH which is not permissively licensed. The module must be built directly.
Building the benchmarks
-------
The benchmarks are maven built and involve some code generation for the JMH part. As such it is required that you
rebuild upon changing the code.
mvn clean install
Running the benchmarks: General
-------
It is recommended that you consider some basic benchmarking practices before running benchmarks:
1. Use a quiet machine with enough CPUs to run the number of threads you mean to run.
2. Set the CPU freq to avoid variance due to turbo boost/heating.
3. Use an OS tool such as taskset to pin the threads in the topology you mean to measure.
Running the JMH Benchmarks
-----
To run all JMH benchmarks:
java -jar target/benchmark.jar
To list available benchmarks:
java -jar target/benchmark.jar -l
Some JMH help:
java -jar target/benchmark.jar -h
Example
-----
To run a benchmark on a single thread (tg 1) with gc profiling use:
java -jar target/benchmark.jar -prof gc -tg 1

View File

@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>artemis-tests-pom</artifactId>
<version>2.17.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>performance-jmh</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JMH Performance Tests</name>
<properties>
<jmh.version>1.26</jmh.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>benchmark</finalName>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Don't deploy artifacts for this module. It has non-permissive
dependencies and is only optionally used for local testing. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.jmh;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
@Fork(2)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 8, time = 1)
public class WildcardAddressManagerPerfTest {
private static class BindingFactoryFake implements BindingsFactory {
@Override
public Bindings createBindings(SimpleString address) throws Exception {
return new BindingsImpl(address, null);
}
}
private static class BindingFake implements Binding {
final SimpleString address;
final SimpleString id;
final Long idl;
BindingFake(SimpleString addressParameter, SimpleString id, long idl) {
this.address = addressParameter;
this.id = id;
this.idl = idl;
}
@Override
public void unproposed(SimpleString groupID) {
}
@Override
public SimpleString getAddress() {
return address;
}
@Override
public Bindable getBindable() {
return null;
}
@Override
public BindingType getType() {
return BindingType.LOCAL_QUEUE;
}
@Override
public SimpleString getUniqueName() {
return id;
}
@Override
public SimpleString getRoutingName() {
return id;
}
@Override
public SimpleString getClusterName() {
return null;
}
@Override
public Filter getFilter() {
return null;
}
@Override
public boolean isHighAcceptPriority(Message message) {
return false;
}
@Override
public boolean isExclusive() {
return false;
}
@Override
public Long getID() {
return idl;
}
@Override
public int getDistance() {
return 0;
}
@Override
public void route(Message message, RoutingContext context) throws Exception {
}
@Override
public void close() throws Exception {
}
@Override
public String toManagementString() {
return "FakeBiding Address=" + this.address;
}
@Override
public boolean isConnected() {
return true;
}
@Override
public void routeWithAck(Message message, RoutingContext context) {
}
}
public WildcardAddressManager addressManager;
@Param({"2", "8", "10"})
int topicsLog2;
int topics;
AtomicLong topicCounter;
private static final WildcardConfiguration WILDCARD_CONFIGURATION;
SimpleString[] addresses;
static {
WILDCARD_CONFIGURATION = new WildcardConfiguration();
WILDCARD_CONFIGURATION.setAnyWords('>');
}
private static final SimpleString WILDCARD = SimpleString.toSimpleString("Topic1.>");
@Setup
public void init() throws Exception {
addressManager = new WildcardAddressManager(new BindingFactoryFake(), WILDCARD_CONFIGURATION, null, null);
addressManager.addAddressInfo(new AddressInfo(WILDCARD, RoutingType.MULTICAST));
topics = 1 << topicsLog2;
addresses = new SimpleString[topics];
for (int i = 0; i < topics; i++) {
Binding binding = new BindingFake(WILDCARD, SimpleString.toSimpleString("" + i), i);
addressManager.addBinding(binding);
addresses[i] = SimpleString.toSimpleString("Topic1." + i);
addressManager.getBindingsForRoutingAddress(addresses[i]);
}
topicCounter = new AtomicLong(0);
topicCounter.set(topics);
}
private long nextId() {
return topicCounter.getAndIncrement();
}
@State(value = Scope.Thread)
public static class ThreadState {
Binding binding;
long next;
SimpleString[] addresses;
@Setup
public void init(WildcardAddressManagerPerfTest benchmarkState) {
final long id = benchmarkState.nextId();
binding = new BindingFake(WILDCARD, SimpleString.toSimpleString("" + id), id);
addresses = benchmarkState.addresses;
}
public SimpleString nextAddress() {
final long current = next;
next = current + 1;
final int index = (int) (current & (addresses.length - 1));
return addresses[index];
}
}
@Benchmark
@Group("both")
@GroupThreads(2)
public Bindings testPublishWhileAddRemoveNewBinding(ThreadState state) throws Exception {
return addressManager.getBindingsForRoutingAddress(state.nextAddress());
}
@Benchmark
@Group("both")
@GroupThreads(2)
public Binding testAddRemoveNewBindingWhilePublish(ThreadState state) throws Exception {
final Binding binding = state.binding;
addressManager.addBinding(binding);
return addressManager.removeBinding(binding.getUniqueName(), null);
}
@Benchmark
@GroupThreads(4)
public Bindings testJustPublish(ThreadState state) throws Exception {
return addressManager.getBindingsForRoutingAddress(state.nextAddress());
}
@Benchmark
@GroupThreads(4)
public Binding testJustAddRemoveNewBinding(ThreadState state) throws Exception {
final Binding binding = state.binding;
addressManager.addBinding(binding);
return addressManager.removeBinding(binding.getUniqueName(), null);
}
}

View File

@ -90,6 +90,12 @@
<module>extra-tests</module>
</modules>
</profile>
<profile>
<id>jmh</id>
<modules>
<module>performance-jmh</module>
</modules>
</profile>
<profile>
<!-- deprecated: use openwire-tests -->
<id>activemq5-unit-tests</id>

View File

@ -16,12 +16,12 @@
*/
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import javax.transaction.xa.Xid;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@ -83,7 +83,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
@Override
public void run() {
try {
bind.removeBinding(fake);
bind.removeBindingByUniqueName(fake.getUniqueName());
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -30,15 +29,11 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
import org.junit.Ignore;
import org.junit.Test;
@ -220,59 +215,4 @@ public class WildcardAddressManagerPerfTest {
}
}
class BindingsFake implements Bindings {
ConcurrentHashSet<Binding> bindings = new ConcurrentHashSet<>();
@Override
public Collection<Binding> getBindings() {
return bindings;
}
@Override
public void addBinding(Binding binding) {
bindings.addIfAbsent(binding);
}
@Override
public void removeBinding(Binding binding) {
bindings.remove(binding);
}
@Override
public void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
}
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return null;
}
@Override
public void unproposed(SimpleString groupID) {
}
@Override
public void updated(QueueBinding binding) {
}
@Override
public boolean redistribute(Message message,
Queue originatingQueue,
RoutingContext context) throws Exception {
return false;
}
@Override
public void route(Message message, RoutingContext context) throws Exception {
log.debug("routing message: " + message);
}
@Override
public boolean allowRedistribute() {
return false;
}
}
}

View File

@ -17,10 +17,15 @@
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -40,6 +45,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
/**
@ -139,6 +145,21 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#")));
}
@Test
public void testWildCardAddBinding() throws Exception {
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.#", "one")));
}
@Test(expected = ActiveMQQueueExistsException.class)
public void testWildCardAddAlreadyExistingBindingShouldThrowException() throws Exception {
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
ad.addBinding(new BindingFake("Queue1.#", "one"));
ad.addBinding(new BindingFake("Queue1.#", "one"));
}
@Test
public void testWildCardAddressRemovalDifferentWildcard() throws Exception {
@ -198,6 +219,54 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
}
@Test
public void testConcurrentCalls() throws Exception {
final WildcardConfiguration configuration = new WildcardConfiguration();
configuration.setAnyWords('>');
final WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null);
final SimpleString wildCard = SimpleString.toSimpleString("Topic1.>");
ad.addAddressInfo(new AddressInfo(wildCard, RoutingType.MULTICAST));
AtomicReference<Throwable> oops = new AtomicReference<>();
int numSubs = 500;
int numThreads = 2;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numSubs; i++ ) {
final int id = i;
executorService.submit(() -> {
try {
// add/remove is externally sync via postOffice
synchronized (executorService) {
// subscribe as wildcard
ad.addBinding(new BindingFake(SimpleString.toSimpleString("Topic1.>"), SimpleString.toSimpleString("" + id)));
}
SimpleString pubAddr = SimpleString.toSimpleString("Topic1." + id );
// publish to new address, will create
Bindings binding = ad.getBindingsForRoutingAddress(pubAddr);
// publish again, read only
binding = ad.getBindingsForRoutingAddress(pubAddr);
// cluster consumer, concurrent access
ad.updateMessageLoadBalancingTypeForAddress(wildCard, MessageLoadBalancingType.ON_DEMAND);
} catch (Exception e) {
e.printStackTrace();
oops.set(e);
}
});
}
executorService.shutdown();
assertTrue("finished on time", executorService.awaitTermination(10, TimeUnit.MINUTES));
assertNull("no exceptions", oops.get());
}
class BindingFactoryFake implements BindingsFactory {
@Override
@ -304,23 +373,23 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
}
}
class BindingsFake implements Bindings {
static class BindingsFake implements Bindings {
ArrayList<Binding> bindings = new ArrayList<>();
ConcurrentHashMap<SimpleString, Binding> bindings = new ConcurrentHashMap<>();
@Override
public Collection<Binding> getBindings() {
return bindings;
return bindings.values();
}
@Override
public void addBinding(Binding binding) {
bindings.add(binding);
bindings.put(binding.getUniqueName(), binding);
}
@Override
public void removeBinding(Binding binding) {
bindings.remove(binding);
public Binding removeBindingByUniqueName(SimpleString uniqueName) {
return bindings.remove(uniqueName);
}
@Override