This closes #941

This commit is contained in:
Justin Bertram 2017-01-04 09:44:03 -06:00
commit bdabcbcbea
16 changed files with 352 additions and 25 deletions

View File

@ -159,6 +159,8 @@ public final class ActiveMQDefaultConfiguration {
// true means that the server supports wild card routing
private static boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;
private static String DEFAULT_ADDRESS_PATH_SEPARATOR = ".";
private static SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("activemq.management");
// the name of the address that consumers bind to receive management notifications
@ -543,6 +545,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_WILDCARD_ROUTING_ENABLED;
}
public static String getDefaultAddressPathSeparator() {
return DEFAULT_ADDRESS_PATH_SEPARATOR;
}
/**
* the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it.
*/

View File

@ -819,6 +819,10 @@ public interface Configuration {
*/
Configuration setWildcardRoutingEnabled(boolean enabled);
WildcardConfiguration getWildcardConfiguration();
Configuration setWildCardConfiguration(WildcardConfiguration wildcardConfiguration);
/**
* Returns the timeout (in milliseconds) after which transactions is removed from the resource
* manager after it was created. <br>

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config;
import java.io.Serializable;
public class WildcardConfiguration implements Serializable {
static final char SINGLE_WORD = '*';
static final char ANY_WORDS = '#';
static final char DELIMITER = '.';
boolean enabled = true;
char singleWord = SINGLE_WORD;
char anyWords = ANY_WORDS;
char delimiter = DELIMITER;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof WildcardConfiguration)) return false;
WildcardConfiguration that = (WildcardConfiguration) o;
if (enabled != that.enabled) return false;
if (singleWord != that.singleWord) return false;
if (anyWords != that.anyWords) return false;
return delimiter == that.delimiter;
}
@Override
public int hashCode() {
int result = (enabled ? 1 : 0);
result = 31 * result + (int) singleWord;
result = 31 * result + (int) anyWords;
result = 31 * result + (int) delimiter;
return result;
}
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public char getAnyWords() {
return anyWords;
}
public void setAnyWords(char anyWords) {
this.anyWords = anyWords;
}
public char getDelimiter() {
return delimiter;
}
public void setDelimiter(char delimiter) {
this.delimiter = delimiter;
}
public char getSingleWord() {
return singleWord;
}
public void setSingleWord(char singleWord) {
this.singleWord = singleWord;
}
}

View File

@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.security.Role;
@ -196,7 +197,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected boolean runSyncSpeedTest = ActiveMQDefaultConfiguration.isDefaultRunSyncSpeedTest();
private boolean wildcardRoutingEnabled = ActiveMQDefaultConfiguration.isDefaultWildcardRoutingEnabled();
private WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
private boolean messageCounterEnabled = ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled();
@ -896,13 +897,27 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
@Deprecated
public boolean isWildcardRoutingEnabled() {
return wildcardRoutingEnabled;
return wildcardConfiguration.isEnabled();
}
@Override
@Deprecated
public ConfigurationImpl setWildcardRoutingEnabled(final boolean enabled) {
wildcardRoutingEnabled = enabled;
logger.info("Usage of wildcardRoutingEnabled configuration property is deprecated, please use wildCardConfiguration.enabled instead");
wildcardConfiguration.setEnabled(enabled);
return this;
}
@Override
public WildcardConfiguration getWildcardConfiguration() {
return wildcardConfiguration;
}
@Override
public Configuration setWildCardConfiguration(WildcardConfiguration wildcardConfiguration) {
this.wildcardConfiguration = wildcardConfiguration;
return this;
}
@ -1574,7 +1589,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
result = prime * result + threadPoolMaxSize;
result = prime * result + (int) (transactionTimeout ^ (transactionTimeout >>> 32));
result = prime * result + (int) (transactionTimeoutScanPeriod ^ (transactionTimeoutScanPeriod >>> 32));
result = prime * result + (wildcardRoutingEnabled ? 1231 : 1237);
result = prime * result + ((wildcardConfiguration == null) ? 0 : wildcardConfiguration.hashCode());
result = prime * result + (resolveProtocols ? 1231 : 1237);
result = prime * result + (int) (journalLockAcquisitionTimeout ^ (journalLockAcquisitionTimeout >>> 32));
result = prime * result + (int) (connectionTtlCheckInterval ^ (connectionTtlCheckInterval >>> 32));
@ -1800,7 +1815,10 @@ public class ConfigurationImpl implements Configuration, Serializable {
return false;
if (transactionTimeoutScanPeriod != other.transactionTimeoutScanPeriod)
return false;
if (wildcardRoutingEnabled != other.wildcardRoutingEnabled)
if (wildcardConfiguration == null) {
if (other.wildcardConfiguration != null)
return false;
} else if (!wildcardConfiguration.equals(other.wildcardConfiguration))
return false;
if (resolveProtocols != other.resolveProtocols)
return false;

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.ColocatedPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
@ -553,7 +554,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setRunSyncSpeedTest(getBoolean(e, "run-sync-speed-test", config.isRunSyncSpeedTest()));
if (e.hasAttribute("wild-card-routing-enabled")) {
config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled()));
}
config.setMessageCounterEnabled(getBoolean(e, "message-counter-enabled", config.isMessageCounterEnabled()));
@ -602,6 +605,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
config.setConnectorServiceConfigurations(configs);
NodeList wildCardConfiguration = e.getElementsByTagName("wildcard-addresses");
if (wildCardConfiguration.getLength() > 0) {
parseWildcardConfiguration((Element) wildCardConfiguration.item(0), config);
}
}
/**
@ -1612,6 +1621,21 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
mainConfig.getDivertConfigurations().add(config);
}
/**
* @param node
* @return
*/
protected void parseWildcardConfiguration(final Element e, final Configuration mainConfig) {
WildcardConfiguration conf = new WildcardConfiguration();
conf.setDelimiter(getString(e, "delimiter", Character.toString(conf.getDelimiter()), Validators.NO_CHECK).charAt(0));
conf.setAnyWords(getString(e, "any-words", Character.toString(conf.getAnyWords()), Validators.NO_CHECK).charAt(0));
conf.setSingleWord(getString(e, "single-word", Character.toString(conf.getSingleWord()), Validators.NO_CHECK).charAt(0));
conf.setEnabled(getBoolean(e, "enabled", conf.isEnabled()));
mainConfig.setWildCardConfiguration(conf);
}
private ConnectorServiceConfiguration parseConnectorService(final Element e) {
Node nameNode = e.getAttributes().getNamedItem("name");

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.postoffice.Address;
/**
@ -35,10 +36,17 @@ public class AddressImpl implements Address {
private final List<Address> linkedAddresses = new ArrayList<>();
private final WildcardConfiguration wildcardConfiguration;
public AddressImpl(final SimpleString address) {
this(address, new WildcardConfiguration());
}
public AddressImpl(final SimpleString address, WildcardConfiguration wildcardConfiguration) {
this.address = address;
addressParts = address.split(WildcardAddressManager.DELIM);
containsWildCard = address.contains(WildcardAddressManager.SINGLE_WORD) || address.contains(WildcardAddressManager.ANY_WORDS);
this.wildcardConfiguration = wildcardConfiguration;
addressParts = address.split(wildcardConfiguration.getDelimiter());
containsWildCard = address.contains(wildcardConfiguration.getSingleWord()) || address.contains(wildcardConfiguration.getAnyWords());
}
@Override

View File

@ -42,6 +42,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.NotificationType;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
@ -143,7 +144,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final ManagementService managementService,
final long reaperPeriod,
final int reaperPriority,
final boolean enableWildCardRouting,
final WildcardConfiguration wildcardConfiguration,
final int idCacheSize,
final boolean persistIDCache,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
@ -159,10 +160,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
this.reaperPriority = reaperPriority;
if (enableWildCardRouting) {
addressManager = new WildcardAddressManager(this);
if (wildcardConfiguration.isEnabled()) {
addressManager = new WildcardAddressManager(this, wildcardConfiguration);
} else {
addressManager = new SimpleAddressManager(this);
addressManager = new SimpleAddressManager(this, wildcardConfiguration);
}
this.idCacheSize = idCacheSize;

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.AddressManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -58,7 +59,14 @@ public class SimpleAddressManager implements AddressManager {
private final BindingsFactory bindingsFactory;
protected final WildcardConfiguration wildcardConfiguration;
public SimpleAddressManager(final BindingsFactory bindingsFactory) {
this(bindingsFactory, new WildcardConfiguration());
}
public SimpleAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration) {
this.wildcardConfiguration = wildcardConfiguration;
this.bindingsFactory = bindingsFactory;
}
@ -105,12 +113,12 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Bindings getMatchingBindings(final SimpleString address) throws Exception {
Address add = new AddressImpl(address);
Address add = new AddressImpl(address, wildcardConfiguration);
Bindings bindings = bindingsFactory.createBindings(address);
for (Binding binding : nameMap.values()) {
Address addCheck = new AddressImpl(binding.getAddress());
Address addCheck = new AddressImpl(binding.getAddress(), wildcardConfiguration);
if (addCheck.matches(add)) {
bindings.addBinding(binding);

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -51,6 +52,10 @@ public class WildcardAddressManager extends SimpleAddressManager {
private final Map<SimpleString, Address> wildCardAddresses = new ConcurrentHashMap<>();
public WildcardAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration) {
super(bindingsFactory, wildcardConfiguration);
}
public WildcardAddressManager(final BindingsFactory bindingsFactory) {
super(bindingsFactory);
}
@ -136,7 +141,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
}
private Address getAddress(final SimpleString address) {
Address add = new AddressImpl(address);
Address add = new AddressImpl(address, wildcardConfiguration);
Address actualAddress;
if (add.containsWildCard()) {
actualAddress = wildCardAddresses.get(address);
@ -147,7 +152,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
}
private synchronized Address addAndUpdateAddressMap(final SimpleString address) {
Address add = new AddressImpl(address);
Address add = new AddressImpl(address, wildcardConfiguration);
Address actualAddress;
if (add.containsWildCard()) {
actualAddress = wildCardAddresses.get(address);

View File

@ -2141,7 +2141,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
pagingManager = createPagingManager();
resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);
postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getMessageExpiryThreadPriority(), configuration.isWildcardRoutingEnabled(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);
postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getMessageExpiryThreadPriority(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);
// This can't be created until node id is set
clusterManager = new ClusterManager(executorFactory, this, postOffice, scheduledPool, managementService, configuration, nodeManager, haPolicy.isBackup());

View File

@ -938,6 +938,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="wildcard-addresses" type="wildcardType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Wildcard addresses format
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>
@ -2741,4 +2749,42 @@
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="wildcardType">
<xsd:annotation>
<xsd:documentation>
Complex type element to configure wildcard address format.
</xsd:documentation>
</xsd:annotation>
<xsd:all>
<xsd:element maxOccurs="1" minOccurs="0" name="enabled" type="xsd:boolean">
<xsd:annotation>
<xsd:documentation>
are wildcard addresses enabled
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="delimiter" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
wildcard address parts delimiter. Default '.'
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="any-words" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
wildcard address any words character. Default '#'
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="single-word" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
wildcard address single word character. Default '*'
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>
</xsd:schema>

View File

@ -246,10 +246,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
conf.setLargeMessagesDirectory(s);
Assert.assertEquals(s, conf.getLargeMessagesDirectory());
b = RandomUtil.randomBoolean();
conf.setWildcardRoutingEnabled(b);
Assert.assertEquals(b, conf.isWildcardRoutingEnabled());
l = RandomUtil.randomLong();
conf.setTransactionTimeout(l);
Assert.assertEquals(l, conf.getTransactionTimeout());

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
@ -86,6 +87,21 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
Assert.assertEquals(333, config.getClusterConfigurations().get(0).getRetryInterval());
}
@Test
public void testWildcardConfiguration() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
String configStr = firstPart + "<wildcard-addresses>\n<enabled>true</enabled>\n<delimiter>/</delimiter>\n<any-words>></any-words></wildcard-addresses>" + lastPart;
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration config = parser.parseMainConfig(input);
WildcardConfiguration wildCard = config.getWildcardConfiguration();
assertEquals('/', wildCard.getDelimiter());
assertTrue(wildCard.isEnabled());
assertEquals('>', wildCard.getAnyWords());
assertEquals('*', wildCard.getSingleWord());
}
@Test
public void testParsingDefaultServerConfig() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();

View File

@ -12,9 +12,11 @@ messages which are sent to a *hierarchy* of addresses.
>
> In JMS terminology this allows "topic hierarchies" to be created.
To enable this functionality set the property
`wild-card-routing-enabled` in the `broker.xml` file to
`true`. This is `true` by default.
This functionality is enabled by default. To turn it off add the following to the `broker.xml` configuration.
For more information on the wild card syntax take a look at [wildcard syntax](wildcard-syntax.md) chapter,
<wildcard-addresses>
<enabled>false</enabled>
</wildcard-addresses>
For more information on the wild card syntax and how to configure it, take a look at [wildcard syntax](wildcard-syntax.md) chapter,
also see the topic hierarchy example in the [examples](examples.md).

View File

@ -25,3 +25,17 @@ The wildcard 'news.\*' would match 'news.europe', but not
The wildcard 'news.\*.sport' would match 'news.europe.sport' and also
'news.usa.sport', but not 'news.europe.politics'.
## Configuring Wildcard syntax
It's possible to further configure the syntax of the wildcard addresses using the broker configuration.
For that, the `<wildcard-addresses>` configuration tag is used.
<wildcard-addresses>
<enabled>true</enabled>
<delimiter>.</delimiter>
<any-words>#</any-words>
<single-word>*</single-word>
</wildcard-addresses>
The example above shows the default configuration.

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class WildcardConfigurationTest extends ActiveMQTestBase {
private ActiveMQServer server;
private ServerLocator locator;
private ClientSession clientSession;
private ClientSessionFactory sf;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
wildcardConfiguration.setDelimiter('/');
Configuration configuration = createDefaultInVMConfig().setWildcardRoutingEnabled(true).setTransactionTimeoutScanPeriod(500).setWildCardConfiguration(wildcardConfiguration);
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
server.start();
server.getManagementService().enableNotifications(false);
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
clientSession = addClientSession(sf.createSession(false, true, true));
}
@Test
public void testBasicWildcardRouting() throws Exception {
SimpleString addressAB = new SimpleString("a/b");
SimpleString addressAC = new SimpleString("a/c");
SimpleString address = new SimpleString("a/*");
SimpleString queueName1 = new SimpleString("Q1");
SimpleString queueName2 = new SimpleString("Q2");
SimpleString queueName = new SimpleString("Q");
clientSession.createQueue(addressAB, queueName1, null, false);
clientSession.createQueue(addressAC, queueName2, null, false);
clientSession.createQueue(address, queueName, null, false);
ClientProducer producer = clientSession.createProducer(addressAB);
ClientProducer producer2 = clientSession.createProducer(addressAC);
ClientConsumer clientConsumer = clientSession.createConsumer(queueName);
clientSession.start();
producer.send(createTextMessage(clientSession, "m1"));
producer2.send(createTextMessage(clientSession, "m2"));
ClientMessage m = clientConsumer.receive(500);
Assert.assertNotNull(m);
Assert.assertEquals("m1", m.getBodyBuffer().readString());
m.acknowledge();
m = clientConsumer.receive(500);
Assert.assertNotNull(m);
Assert.assertEquals("m2", m.getBodyBuffer().readString());
m.acknowledge();
m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
}
}