This commit is contained in:
Martyn Taylor 2017-04-12 09:59:28 +01:00
commit 0937e75414
16 changed files with 1036 additions and 65 deletions

View File

@ -14,13 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
package org.apache.activemq.artemis.utils;
import org.apache.activemq.artemis.api.core.SimpleString;
public class CompositeAddress {
public static String SEPARATOR = "::";
private final String address;
private final String queueName;
private final boolean fqqn;
public String getAddress() {
return address;
@ -34,17 +37,52 @@ public class CompositeAddress {
this.address = address;
this.queueName = queueName;
this.fqqn = address != null;
}
public CompositeAddress(String singleName) {
String[] split = singleName.split(SEPARATOR);
if (split.length == 1) {
this.fqqn = false;
this.address = null;
this.queueName = split[0];
} else {
this.fqqn = true;
this.address = split[0];
this.queueName = split[1];
}
}
public boolean isFqqn() {
return fqqn;
}
public static boolean isFullyQualified(String address) {
return address.toString().contains(SEPARATOR);
return address.contains(SEPARATOR);
}
public static CompositeAddress getQueueName(String address) {
String[] split = address.split(SEPARATOR);
if (split.length <= 0) {
throw new IllegalStateException("Nott A Fully Qualified Name");
throw new IllegalStateException("Not A Fully Qualified Name");
}
if (split.length == 1) {
return new CompositeAddress(null, split[0]);
}
return new CompositeAddress(split[0], split[1]);
}
public static String extractQueueName(String name) {
String[] split = name.split(SEPARATOR);
return split[split.length - 1];
}
public static SimpleString extractQueueName(SimpleString name) {
return new SimpleString(extractQueueName(name.toString()));
}
public static String extractAddressName(String address) {
String[] split = address.split(SEPARATOR);
return split[0];
}
}

View File

@ -28,7 +28,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -46,6 +45,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransa
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.jboss.logging.Logger;
/**
@ -103,7 +104,7 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Binding getBinding(final SimpleString bindableName) {
return nameMap.get(bindableName);
return nameMap.get(CompositeAddress.extractQueueName(bindableName));
}
@Override
@ -131,7 +132,7 @@ public class SimpleAddressManager implements AddressManager {
@Override
public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception {
Binding binding = nameMap.get(address);
Binding binding = getBinding(address);
if (binding == null || !(binding instanceof LocalQueueBinding)
|| !binding.getAddress().equals(address)) {
@ -151,7 +152,7 @@ public class SimpleAddressManager implements AddressManager {
@Override
public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception {
Binding binding = nameMap.get(queueName);
Binding binding = getBinding(queueName);
if (binding != null && !binding.getAddress().equals(address)) {
throw new IllegalStateException("queue belongs to address" + binding.getAddress());
@ -188,9 +189,8 @@ public class SimpleAddressManager implements AddressManager {
Binding theBinding = null;
for (Binding binding : bindings.getBindings()) {
if (binding.getUniqueName().equals(bindableName)) {
if (binding.getUniqueName().equals(CompositeAddress.extractQueueName(bindableName))) {
theBinding = binding;
break;
}
}

View File

@ -162,6 +162,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
@ -702,7 +703,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
}
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(address.toString());
CompositeAddress addressKey = new CompositeAddress(address.toString());
String realAddress = addressKey.isFqqn() ? addressKey.getAddress() : addressKey.getQueueName();
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(realAddress);
boolean autoCreateQeueus = addressSettings.isAutoCreateQueues();
boolean autoCreateAddresses = addressSettings.isAutoCreateAddresses();
@ -714,20 +717,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// make an exception for the management address (see HORNETQ-29)
ManagementService managementService = getManagementService();
if (managementService != null) {
if (address.equals(managementService.getManagementAddress())) {
if (realAddress.equals(managementService.getManagementAddress())) {
return new BindingQueryResult(true, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
}
}
Bindings bindings = getPostOffice().getMatchingBindings(address);
SimpleString bindAddress = new SimpleString(realAddress);
Bindings bindings = getPostOffice().getMatchingBindings(bindAddress);
for (Binding binding : bindings.getBindings()) {
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
names.add(binding.getUniqueName());
if (addressKey.isFqqn()) {
names.add(new SimpleString(addressKey.getAddress()).concat(CompositeAddress.SEPARATOR).concat(binding.getUniqueName()));
} else {
names.add(binding.getUniqueName());
}
}
}
return new BindingQueryResult(getAddressInfo(address) != null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
return new BindingQueryResult(getAddressInfo(bindAddress) != null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
}
@Override

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class CompositeAddressTest {
@Test
public void testSplit() throws Exception {
String name = "someQueue";
String name2 = "someAddress::someQueue";
String qname = CompositeAddress.extractQueueName(name);
assertEquals(name, qname);
qname = CompositeAddress.extractQueueName(name2);
assertEquals(name, qname);
}
}

View File

@ -289,6 +289,9 @@ The example below configures an address-setting to be automatically deleted by t
Internally the broker maps a clients request for an address to specific queues. The broker decides on behalf of the client which queues to send messages to or from which queue to receive messages. However, more advanced use cases might require that the client specify a queue directly. In these situations the client and use a fully qualified queue name, by specifying both the address name and the queue name, separated by a ::.
Currently Artemis supports fully qualified queue names on Core, AMQP, JMS, OpenWire, MQTT and Stomp protocols for
receiving messages only.
### Specifying a Fully Qualified Queue Name
In this example, the address foo is configured with two queues q1, q2 as shown in the configuration below.

View File

@ -22,6 +22,8 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -39,6 +41,7 @@ import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.FQQN;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -53,38 +56,60 @@ import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.jgroups.util.UUID;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test basic send and receive scenarios using only AMQP sender and receiver links.
*/
@RunWith(Parameterized.class)
public class AmqpSendReceiveTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
@Parameterized.Parameters(name = "useFQQN={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{{false}, {true}});
}
private boolean useFQQN;
public AmqpSendReceiveTest(boolean useFQQN) {
this.useFQQN = useFQQN;
}
@Test(timeout = 60000)
public void testCreateQueueReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getDestName(getTestName()));
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getDestName(getTestName()));
assertNotNull(queue);
receiver.close();
connection.close();
}
private String getDestName(String bareName) {
String destName = bareName;
if (useFQQN) {
destName = FQQN.toFullQN(bareName, bareName);
}
return destName;
}
@Test(timeout = 60000)
public void testAcceptWithoutSettling() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getDestName(getTestName()));
sendMessages(getTestName(), 10);
@ -98,7 +123,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getDestName(getTestName()));
assertNotNull(queue);
assertEquals(0, queue.getMessageCount());
}
@ -130,7 +155,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), "JMSPriority > 8");
session.createReceiver(getDestName(getTestName()), "JMSPriority > 8");
connection.getStateInspector().assertValid();
connection.close();
@ -163,7 +188,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), null, true);
session.createReceiver(getDestName(getTestName()), null, true);
connection.getStateInspector().assertValid();
connection.close();
@ -177,7 +202,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
try {
session.createReceiver(getTestName(), "null = 'f''", true);
session.createReceiver(getDestName(getTestName()), "null = 'f''", true);
fail("should throw exception");
} catch (Exception e) {
assertTrue(e.getCause() instanceof JMSException);
@ -195,9 +220,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getDestName(getTestName()));
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getDestName(getTestName()));
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -221,9 +246,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(forwardingAddress);
AmqpReceiver receiver = session.createReceiver(getDestName(forwardingAddress));
Queue queueView = getProxyToQueue(forwardingAddress);
Queue queueView = getProxyToQueue(getDestName(forwardingAddress));
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -250,8 +275,16 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sendMessages("anycast://" + addressA, 1);
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueA))).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueC))).getMessageCount());
}
private String getDestName(String address, String qName) {
String dest = qName;
if (useFQQN) {
dest = FQQN.toFullQN(address, qName);
}
return dest;
}
@Test(timeout = 60000)
@ -269,8 +302,8 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sendMessages(addressA, 1, RoutingType.ANYCAST);
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueA))).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueB))).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueC))).getMessageCount());
}
@Test
@ -288,8 +321,8 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sendMessages("multicast://" + addressA, 1);
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueA))).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueC))).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueB))).getMessageCount());
}
@Test
@ -307,8 +340,8 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sendMessages(addressA, 1, RoutingType.MULTICAST);
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueA))).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueC))).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(getDestName(addressA, queueB))).getMessageCount());
}
@Test(timeout = 60000)
@ -319,9 +352,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getDestName(getTestName()));
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getDestName(getTestName()));
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -343,9 +376,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getDestName(getTestName()));
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getDestName(getTestName()));
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -368,16 +401,16 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getDestName(getTestName()));
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getDestName(getTestName()));
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2);
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getDestName(getTestName()));
assertEquals(2, server.getTotalConsumerCount());
@ -404,9 +437,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getDestName(getTestName()));
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getDestName(getTestName()));
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2);
@ -425,7 +458,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getDestName(getTestName()));
assertEquals(2, server.getTotalConsumerCount());
@ -462,9 +495,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getDestName(getTestName()));
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getDestName(getTestName()));
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(20);
@ -479,7 +512,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver1.close();
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getDestName(getTestName()));
assertEquals(1, server.getTotalConsumerCount());
@ -525,7 +558,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
LOG.info("Attempting to read message with receiver");
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getDestName(getTestName()));
receiver.flow(2);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
@ -560,17 +593,17 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getDestName(getTestName()));
assertEquals(MSG_COUNT, queue.getMessageCount());
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getDestName(getTestName()));
receiver1.flow(MSG_COUNT);
AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have got a message", received);
assertEquals("msg0", received.getMessageId());
receiver1.close();
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getDestName(getTestName()));
receiver2.flow(200);
for (int i = 0; i < MSG_COUNT; ++i) {
received = receiver2.receive(5, TimeUnit.SECONDS);
@ -602,7 +635,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.send(message2);
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100");
AmqpReceiver receiver = session.createReceiver(getDestName(getTestName()), "sn = 100");
receiver.flow(2);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received);
@ -639,7 +672,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
LOG.info("Attempting to read first two messages with receiver #1");
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getDestName(getTestName()));
receiver1.flow(2);
AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
@ -651,7 +684,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message2.accept();
LOG.info("Attempting to read next two messages with receiver #2");
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getDestName(getTestName()));
receiver2.flow(2);
AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
@ -699,10 +732,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getDestName(getTestName()));
receiver1.flow(1);
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getDestName(getTestName()));
receiver2.flow(1);
AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
@ -761,7 +794,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final String address = getTestName();
AmqpReceiver receiver = session.createReceiver(address);
AmqpReceiver receiver = session.createReceiver(getDestName(address));
AmqpSender sender = session.createSender(address);
for (int i = 0; i < 2; i++) {
@ -802,7 +835,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
LOG.info("Starting consumer connection");
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address);
AmqpReceiver receiver = session.createReceiver(getDestName(address));
receiver.flow(1);
receiverReady.countDown();
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
@ -859,9 +892,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getDestName(getTestName()));
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getDestName(getTestName()));
// Create default message that should be sent as non-durable
AmqpMessage message1 = new AmqpMessage();
@ -906,10 +939,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final String address = getTestName();
AmqpReceiver receiver = session.createReceiver(address);
AmqpReceiver receiver = session.createReceiver(getDestName(address));
AmqpSender sender = session.createSender(address);
final Queue destinationView = getProxyToQueue(address);
final Queue destinationView = getProxyToQueue(getDestName(address));
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
@ -960,8 +993,8 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final String address = getTestName();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
AmqpReceiver receiver2 = session.createReceiver(address, null, false, true);
AmqpReceiver receiver1 = session.createReceiver(getDestName(address), null, false, true);
AmqpReceiver receiver2 = session.createReceiver(getDestName(address), null, false, true);
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
@ -1084,9 +1117,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message.setMessageId("msg:1");
sender.send(message);
AmqpReceiver receiver = session.createReceiver(address);
AmqpReceiver receiver = session.createReceiver(getDestName(address));
Queue queueView = getProxyToQueue(address);
Queue queueView = getProxyToQueue(getDestName(address));
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);

View File

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.FQQN;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.HashMap;
import java.util.Map;
public class ProtonFullQualifiedNameTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
private SimpleString anycastAddress = new SimpleString("address.anycast");
private SimpleString multicastAddress = new SimpleString("address.multicast");
private SimpleString anycastQ1 = new SimpleString("q1");
private SimpleString anycastQ2 = new SimpleString("q2");
private SimpleString anycastQ3 = new SimpleString("q3");
JmsConnectionFactory factory = new JmsConnectionFactory(amqpConnectionUri);
private ServerLocator locator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
Configuration serverConfig = server.getConfiguration();
Map<String, AddressSettings> settings = serverConfig.getAddressesSettings();
assertNotNull(settings);
AddressSettings addressSetting = settings.get("#");
if (addressSetting == null) {
addressSetting = new AddressSettings();
settings.put("#", addressSetting);
}
addressSetting.setAutoCreateQueues(true);
locator = createNettyNonHALocator();
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
}
@Override
protected void configureServer(Configuration serverConfig) {
serverConfig.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
}
@Test
//there isn't much use of FQQN for topics
//however we can test query functionality
public void testTopic() throws Exception {
Connection connection = factory.createConnection();
try {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(multicastAddress.toString());
MessageConsumer consumer1 = session.createConsumer(topic);
MessageConsumer consumer2 = session.createConsumer(topic);
MessageConsumer consumer3 = session.createConsumer(topic);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
//each consumer receives one
Message m = consumer1.receive(2000);
assertNotNull(m);
m = consumer2.receive(2000);
assertNotNull(m);
m = consumer3.receive(2000);
assertNotNull(m);
Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress);
for (Binding b : bindings.getBindings()) {
System.out.println("checking binidng " + b.getUniqueName() + " " + ((LocalQueueBinding)b).getQueue().getDeliveringMessages());
SimpleString qName = b.getUniqueName();
//do FQQN query
QueueQueryResult result = server.queueQuery(FQQN.toFullQN(multicastAddress, qName));
assertTrue(result.isExists());
assertEquals(result.getName(), FQQN.toFullQN(multicastAddress, qName));
//do qname query
result = server.queueQuery(qName);
assertTrue(result.isExists());
assertEquals(result.getName(), qName);
}
} finally {
connection.close();
}
}
@Test
public void testQueue() throws Exception {
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ2, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ3, null, true, false, -1, false, true);
Connection connection = factory.createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q1 = session.createQueue(FQQN.toFullQN(anycastAddress, anycastQ1).toString());
Queue q2 = session.createQueue(FQQN.toFullQN(anycastAddress, anycastQ2).toString());
Queue q3 = session.createQueue(FQQN.toFullQN(anycastAddress, anycastQ3).toString());
//send 3 messages to anycastAddress
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession coreSession = cf.createSession();
//send 3 messages
ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
sendMessages(coreSession, coreProducer, 3);
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);
MessageConsumer consumer3 = session.createConsumer(q3);
//each consumer receives one
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer3.receive(2000));
connection.close();
//queues are empty now
for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3}) {
//FQQN query
QueueQueryResult query = server.queueQuery(FQQN.toFullQN(anycastAddress, q));
assertTrue(query.isExists());
assertEquals(anycastAddress, query.getAddress());
assertEquals(FQQN.toFullQN(anycastAddress, q), query.getName());
assertEquals(0, query.getMessageCount());
//try query again using qName
query = server.queueQuery(q);
assertEquals(q, query.getName());
}
} finally {
connection.close();
}
}
}

View File

@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@ -68,9 +69,13 @@ public class ProtonTestBase extends ActiveMQTestBase {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
amqpServer.getConfiguration().getAddressesSettings().put("#", addressSettings);
configureServer(amqpServer.getConfiguration());
return amqpServer;
}
protected void configureServer(Configuration serverConfig) {
}
protected void configureAmqp(Map<String, Object> params) {
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class FullQualifiedQueueTest extends ActiveMQTestBase {
private SimpleString anycastAddress = new SimpleString("address.anycast");
private SimpleString multicastAddress = new SimpleString("address.multicast");
private SimpleString mixedAddress = new SimpleString("address.mixed");
private SimpleString anycastQ1 = new SimpleString("q1");
private SimpleString anycastQ2 = new SimpleString("q2");
private SimpleString anycastQ3 = new SimpleString("q3");
private SimpleString multicastQ1 = new SimpleString("q4");
private SimpleString multicastQ2 = new SimpleString("q5");
private SimpleString multicastQ3 = new SimpleString("q6");
private ActiveMQServer server;
private ServerLocator locator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(false, true);
server.start();
locator = createNettyNonHALocator();
}
@Test
public void testMixedQueues() throws Exception {
server.createQueue(mixedAddress, RoutingType.MULTICAST, multicastQ1, null, true, false, -1, false, true);
server.createQueue(mixedAddress, RoutingType.MULTICAST, multicastQ2, null, true, false, -1, false, true);
server.createQueue(mixedAddress, RoutingType.MULTICAST, multicastQ3, null, true, false, -1, false, true);
server.createQueue(mixedAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
server.createQueue(mixedAddress, RoutingType.ANYCAST, anycastQ2, null, true, false, -1, false, true);
server.createQueue(mixedAddress, RoutingType.ANYCAST, anycastQ3, null, true, false, -1, false, true);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession();
session.start();
//send 3 messages
ClientProducer producer = session.createProducer(mixedAddress);
final int num = 3;
sendMessages(session, producer, num);
ClientConsumer consumer1 = session.createConsumer(toFullQN(mixedAddress, anycastQ1));
ClientConsumer consumer2 = session.createConsumer(toFullQN(mixedAddress, anycastQ2));
ClientConsumer consumer3 = session.createConsumer(toFullQN(mixedAddress, anycastQ3));
ClientConsumer consumer4 = session.createConsumer(toFullQN(mixedAddress, multicastQ1));
ClientConsumer consumer5 = session.createConsumer(toFullQN(mixedAddress, multicastQ2));
ClientConsumer consumer6 = session.createConsumer(toFullQN(mixedAddress, multicastQ3));
session.start();
//each anycast consumer receives one, each multicast receives three.
ClientMessage m = consumer1.receive(2000);
assertNotNull(m);
System.out.println("consumer1 : " + m);
m.acknowledge();
m = consumer2.receive(2000);
assertNotNull(m);
System.out.println("consumer2 : " + m);
m.acknowledge();
m = consumer3.receive(2000);
assertNotNull(m);
System.out.println("consumer3 : " + m);
m.acknowledge();
for (int i = 0; i < num; i++) {
m = consumer4.receive(2000);
assertNotNull(m);
System.out.println("consumer4 : " + m);
m.acknowledge();
m = consumer5.receive(2000);
assertNotNull(m);
System.out.println("consumer5 : " + m);
m.acknowledge();
m = consumer6.receive(2000);
assertNotNull(m);
System.out.println("consumer6 : " + m);
m.acknowledge();
}
session.commit();
//queues are empty now
for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3, multicastQ1, multicastQ2, multicastQ3}) {
QueueQueryResult query = server.queueQuery(toFullQN(mixedAddress, q));
assertTrue(query.isExists());
assertEquals(mixedAddress, query.getAddress());
assertEquals(toFullQN(mixedAddress, q), query.getName());
assertEquals(0, query.getMessageCount());
}
}
@Test
public void testMulticastQueues() throws Exception {
server.createQueue(multicastAddress, RoutingType.MULTICAST, multicastQ1, null, true, false, -1, false, true);
server.createQueue(multicastAddress, RoutingType.MULTICAST, multicastQ2, null, true, false, -1, false, true);
server.createQueue(multicastAddress, RoutingType.MULTICAST, multicastQ3, null, true, false, -1, false, true);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession();
session.start();
//send 3 messages
ClientProducer producer = session.createProducer(multicastAddress);
sendMessages(session, producer, 1);
ClientConsumer consumer1 = session.createConsumer(toFullQN(multicastAddress, multicastQ1));
ClientConsumer consumer2 = session.createConsumer(toFullQN(multicastAddress, multicastQ2));
ClientConsumer consumer3 = session.createConsumer(toFullQN(multicastAddress, multicastQ3));
session.start();
//each consumer receives one
ClientMessage m = consumer1.receive(2000);
assertNotNull(m);
m.acknowledge();
m = consumer2.receive(2000);
assertNotNull(m);
m.acknowledge();
m = consumer3.receive(2000);
assertNotNull(m);
m.acknowledge();
session.commit();
//queues are empty now
for (SimpleString q : new SimpleString[]{multicastQ1, multicastQ2, multicastQ3}) {
QueueQueryResult query = server.queueQuery(toFullQN(multicastAddress, q));
assertTrue(query.isExists());
assertEquals(multicastAddress, query.getAddress());
assertEquals(toFullQN(multicastAddress, q), query.getName());
assertEquals(0, query.getMessageCount());
}
}
@Test
public void testAnycastQueues() throws Exception {
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ2, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ3, null, true, false, -1, false, true);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession();
session.start();
//send 3 messages
ClientProducer producer = session.createProducer(anycastAddress);
sendMessages(session, producer, 3);
ClientConsumer consumer1 = session.createConsumer(toFullQN(anycastAddress, anycastQ1));
ClientConsumer consumer2 = session.createConsumer(toFullQN(anycastAddress, anycastQ2));
ClientConsumer consumer3 = session.createConsumer(toFullQN(anycastAddress, anycastQ3));
session.start();
//each consumer receives one
ClientMessage m = consumer1.receive(2000);
assertNotNull(m);
m.acknowledge();
m = consumer2.receive(2000);
assertNotNull(m);
m.acknowledge();
m = consumer3.receive(2000);
assertNotNull(m);
m.acknowledge();
session.commit();
//queues are empty now
for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3}) {
QueueQueryResult query = server.queueQuery(toFullQN(anycastAddress, q));
assertTrue(query.isExists());
assertEquals(anycastAddress, query.getAddress());
assertEquals(toFullQN(anycastAddress, q), query.getName());
assertEquals(0, query.getMessageCount());
}
}
private SimpleString toFullQN(SimpleString address, SimpleString qName) {
return address.concat("::").concat(qName);
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTFQQNTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTFQQNTest.class);
@Override
@Before
public void setUp() throws Exception {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
connectedClients.setAccessible(true);
connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
}
@Test
public void testMQTTSubNames() throws Exception {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
try {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
Map<SimpleString, Binding> allBindings = server.getPostOffice().getAllBindings();
assertEquals(1, allBindings.size());
Binding b = allBindings.values().iterator().next();
//check that query using bare queue name works as before
QueueQueryResult result = server.queueQuery(b.getUniqueName());
assertTrue(result.isExists());
assertEquals(result.getAddress(), new SimpleString("foo.bah"));
assertEquals(b.getUniqueName(), result.getName());
//check that queue query using FQQN returns FQQN
result = server.queueQuery(new SimpleString("foo.bah::" + b.getUniqueName()));
assertTrue(result.isExists());
assertEquals(new SimpleString("foo.bah"), result.getAddress());
assertEquals(new SimpleString("foo.bah::" + b.getUniqueName()), result.getName());
} finally {
subscriptionProvider.disconnect();
}
}
}

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.tests.util.FQQN;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
/**
* Verify FQQN queues work with openwire/artemis JMS API
*/
@RunWith(Parameterized.class)
public class FQQNOpenWireTest extends OpenWireTestBase {
protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{{"OpenWire"}, {"Artemis"}});
}
private SimpleString anycastAddress = new SimpleString("address.anycast");
private SimpleString multicastAddress = new SimpleString("address.multicast");
private SimpleString anycastQ1 = new SimpleString("q1");
private SimpleString anycastQ2 = new SimpleString("q2");
private SimpleString anycastQ3 = new SimpleString("q3");
private ConnectionFactory factory;
private ServerLocator locator;
public FQQNOpenWireTest(String factoryType) {
if ("OpenWire".equals(factoryType)) {
factory = new ActiveMQConnectionFactory(urlString);
} else if ("Artemis".equals(factoryType)) {
factory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(urlString);
}
}
@Test
//there isn't much use of FQQN for topics
//however we can test query functionality
public void testTopic() throws Exception {
Connection connection = factory.createConnection();
try {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(multicastAddress.toString());
MessageConsumer consumer1 = session.createConsumer(topic);
MessageConsumer consumer2 = session.createConsumer(topic);
MessageConsumer consumer3 = session.createConsumer(topic);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
//each consumer receives one
Message m = consumer1.receive(2000);
assertNotNull(m);
m = consumer2.receive(2000);
assertNotNull(m);
m = consumer3.receive(2000);
assertNotNull(m);
Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress);
for (Binding b : bindings.getBindings()) {
System.out.println("checking binidng " + b.getUniqueName() + " " + ((LocalQueueBinding)b).getQueue().getDeliveringMessages());
SimpleString qName = b.getUniqueName();
//do FQQN query
QueueQueryResult result = server.queueQuery(FQQN.toFullQN(multicastAddress, qName));
assertTrue(result.isExists());
assertEquals(result.getName(), FQQN.toFullQN(multicastAddress, qName));
//do qname query
result = server.queueQuery(qName);
assertTrue(result.isExists());
assertEquals(result.getName(), qName);
}
} finally {
connection.close();
}
}
@Test
//jms queues know no addresses, this test only shows
//that it is possible for jms clients to receive from
//core queues by its FQQN.
public void testQueue() throws Exception {
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ2, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ3, null, true, false, -1, false, true);
Connection connection = factory.createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q1 = session.createQueue(FQQN.toFullQN(anycastAddress, anycastQ1).toString());
Queue q2 = session.createQueue(FQQN.toFullQN(anycastAddress, anycastQ2).toString());
Queue q3 = session.createQueue(FQQN.toFullQN(anycastAddress, anycastQ3).toString());
//send 3 messages to anycastAddress
locator = createNonHALocator(true);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession coreSession = cf.createSession();
//send 3 messages
ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
sendMessages(coreSession, coreProducer, 3);
System.out.println("Queue is: " + q1);
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);
MessageConsumer consumer3 = session.createConsumer(q3);
//each consumer receives one
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer3.receive(2000));
connection.close();
//queues are empty now
for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3}) {
//FQQN query
QueueQueryResult query = server.queueQuery(FQQN.toFullQN(anycastAddress, q));
assertTrue(query.isExists());
assertEquals(anycastAddress, query.getAddress());
assertEquals(FQQN.toFullQN(anycastAddress, q), query.getName());
assertEquals(0, query.getMessageCount());
//try query again using qName
query = server.queueQuery(q);
assertEquals(q, query.getName());
}
} finally {
connection.close();
if (locator != null) {
locator.close();
}
}
}
@Test
public void testFQNConsumer() throws Exception {
Connection exConn = null;
SimpleString durableQueue = new SimpleString("myqueue");
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false, -1, false, true);
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(durableQueue.toString());
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);
Destination destinationFQN = session.createQueue(FQQN.toFullQN(durableQueue, durableQueue).toString());
MessageConsumer messageConsumer = session.createConsumer(destinationFQN);
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
assertEquals("This is a text message", messageReceived.getText());
} finally {
if (exConn != null) {
exConn.close();
}
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.stomp;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class FQQNStompTest extends StompTestBase {
private StompClientConnection conn;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
QueueQueryResult result = server.getActiveMQServer().queueQuery(new SimpleString(getQueueName()));
assertTrue(result.isExists());
System.out.println("address: " + result.getAddress() + " queu " + result.getName());
}
@Override
@After
public void tearDown() throws Exception {
try {
boolean connected = conn != null && conn.isConnected();
if (connected) {
try {
conn.disconnect();
} catch (Exception e) {
}
}
} finally {
super.tearDown();
}
}
@Test
//to receive from a FQQN queue like testQueue::testQueue
//special care is needed as ":" is a reserved character
//in STOMP. Clients need to escape it.
public void testReceiveFQQN() throws Exception {
conn.connect(defUser, defPass);
subscribeQueue(conn, "sub-01", getQueueName() + "\\c\\c" + getQueueName());
sendJmsMessage("Hello World!");
ClientStompFrame frame = conn.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
System.out.println("frame: " + frame);
unsubscribe(conn, "sub-01");
}
}

View File

@ -355,6 +355,10 @@ public abstract class StompTestBase extends ActiveMQTestBase {
return subscribe(conn, subscriptionId, ack, durableId, selector, getQueuePrefix() + getQueueName(), receipt);
}
public void subscribeQueue(StompClientConnection conn, String subId, String destination) throws IOException, InterruptedException {
subscribe(conn, subId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, destination, false);
}
public ClientStompFrame subscribe(StompClientConnection conn,
String subscriptionId,
String ack,

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.util;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.CompositeAddress;
public final class FQQN {
public static SimpleString toFullQN(SimpleString address, SimpleString qName) {
return address.concat(CompositeAddress.SEPARATOR).concat(qName);
}
public static String toFullQN(String address, String qName) {
return address + CompositeAddress.SEPARATOR + qName;
}
}

View File

@ -69,6 +69,36 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertEquals("Exception happened during the process", 0, errors);
}
@Test
public void testUnitOnWildCardFailingScenarioFQQN() throws Exception {
int errors = 0;
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake());
ad.addBinding(new BindingFake("Topic1", "Topic1"));
ad.addBinding(new BindingFake("Topic1", "one"));
ad.addBinding(new BindingFake("*", "two"));
ad.removeBinding(SimpleString.toSimpleString("Topic1::one"), null);
try {
ad.removeBinding(SimpleString.toSimpleString("*::two"), null);
} catch (Throwable e) {
// We are not failing the test here as this test is replicating the exact scenario
// that was happening under https://issues.jboss.org/browse/HORNETQ-988
// In which this would be ignored
errors++;
e.printStackTrace();
}
try {
ad.addBinding(new BindingFake("Topic1", "three"));
} catch (Throwable e) {
// We are not failing the test here as this test is replicating the exact scenario
// that was happening under https://issues.jboss.org/browse/HORNETQ-988
// In which this would be ignored
errors++;
e.printStackTrace();
}
assertEquals("Exception happened during the process", 0, errors);
}
class BindingFactoryFake implements BindingsFactory {
@Override