ARTEMIS-877 Add Consumer support for AMQP for new addressing schema

This commit is contained in:
Andy Taylor 2016-12-03 09:03:43 +00:00 committed by Martyn Taylor
parent a182a135e9
commit 224f62b295
39 changed files with 1618 additions and 210 deletions

View File

@ -44,30 +44,6 @@ import static org.junit.Assert.fail;
public class FileBrokerTest {
@Test
public void startWithJMS() throws Exception {
ServerDTO serverDTO = new ServerDTO();
serverDTO.configuration = "broker.xml";
FileBroker broker = null;
try {
broker = new FileBroker(serverDTO, new ActiveMQJAASSecurityManager());
broker.start();
JMSServerManagerImpl jmsServerManager = (JMSServerManagerImpl) broker.getComponents().get("jms");
Assert.assertNotNull(jmsServerManager);
Assert.assertTrue(jmsServerManager.isStarted());
//this tells us the jms server is activated
Assert.assertTrue(jmsServerManager.getJMSStorageManager().isStarted());
ActiveMQServerImpl activeMQServer = (ActiveMQServerImpl) broker.getComponents().get("core");
Assert.assertNotNull(activeMQServer);
Assert.assertTrue(activeMQServer.isStarted());
Assert.assertTrue(broker.isStarted());
} finally {
if (broker != null) {
broker.stop();
}
}
}
@Test
public void startWithoutJMS() throws Exception {
ServerDTO serverDTO = new ServerDTO();

View File

@ -451,10 +451,29 @@ public interface ActiveMQServerControl {
* @param address address to bind the queue to
* @param name name of the queue
*/
@Deprecated
@Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
/**
* Create a durable queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
* @param name name of the queue
* @param routingType The routing type used for this address, MULTICAST or ANYCAST
*/
@Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
/**
* Create a queue.
* <br>
@ -466,11 +485,30 @@ public interface ActiveMQServerControl {
* @param name name of the queue
* @param durable whether the queue is durable
*/
@Deprecated
@Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
/**
* Create a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
* @param name name of the queue
* @param durable whether the queue is durable
* @param routingType The routing type used for this address, MULTICAST or ANYCAST
*/
@Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
/**
* Create a queue.
* <br>
@ -489,6 +527,27 @@ public interface ActiveMQServerControl {
@Parameter(name = "filter", desc = "Filter of the queue") String filter,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
/**
* Create a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
* @param name name of the queue
* @param filter of the queue
* @param durable whether the queue is durable
* @param routingType The routing type used for this address, MULTICAST or ANYCAST
*/
@Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filter,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
/**
* Create a queue.
* <br>

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.SimpleString;
import java.util.Set;
public class AddressQueryResult {
private final SimpleString name;
private final Set<RoutingType> routingTypes;
private final long id;
private final boolean autoCreated;
private final boolean exists;
private final boolean autoCreateAddresses;
public AddressQueryResult(SimpleString name, Set<RoutingType> routingTypes, long id, boolean autoCreated, boolean exists, boolean autoCreateAddresses) {
this.name = name;
this.routingTypes = routingTypes;
this.id = id;
this.autoCreated = autoCreated;
this.exists = exists;
this.autoCreateAddresses = autoCreateAddresses;
}
public SimpleString getName() {
return name;
}
public Set<RoutingType> getRoutingTypes() {
return routingTypes;
}
public long getId() {
return id;
}
public boolean isAutoCreated() {
return autoCreated;
}
public boolean isExists() {
return exists;
}
public boolean isAutoCreateAddresses() {
return autoCreateAddresses;
}
}

View File

@ -28,12 +28,15 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
@ -192,28 +195,40 @@ public class AMQPSessionCallback implements SessionCallback {
serverConsumer.receiveCredits(-1);
}
public void createTemporaryQueue(String queueName) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
}
public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
}
public void createDurableQueue(String address, String queueName, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
}
public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
}
public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
}
public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception {
QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
try {
serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateQueues(), true, queueQueryResult.isAutoCreated(), queueQueryResult.isDeleteOnNoConsumers(), queueQueryResult.getRoutingType(), queueQueryResult.getMaxConsumers());
queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
}
if (queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
}
return queueQueryResult;
}
@ -231,6 +246,20 @@ public class AMQPSessionCallback implements SessionCallback {
return bindingQueryResult.isExists();
}
public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
try {
serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
}
return addressQueryResult;
}
public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
@ -522,4 +551,21 @@ public class AMQPSessionCallback implements SessionCallback {
protonSPI.removeTransaction(txid);
}
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, routingType);
}
public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, queueName, routingType);
}
public AddressInfo getAddress(SimpleString address) {
return serverSession.getAddress(address);
}
public void removeTemporaryQueue(String address) throws Exception {
serverSession.deleteQueue(SimpleString.toSimpleString(address));
}
}

View File

@ -55,6 +55,7 @@ public class AmqpSupport {
public static final Symbol PLATFORM = Symbol.valueOf("platform");
public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
// Symbols used in configuration of newly opened links.

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@ -27,6 +28,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
@ -55,6 +57,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
private static int minCreditRefresh = 30;
private TerminusExpiryPolicy expiryPolicy;
public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
@ -83,10 +86,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
address = sessionSPI.tempQueueName();
try {
sessionSPI.createTemporaryQueue(address);
sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST);
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
target.setAddress(address);
} else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to
@ -165,6 +169,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
protonSession.removeReceiver(receiver);
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
try {
sessionSPI.removeTemporaryQueue(target.getAddress());
} catch (Exception e) {
//ignore on close, its temp anyway and will be removed later
}
}
}
@Override

View File

@ -19,10 +19,16 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@ -30,6 +36,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@ -66,6 +73,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private static final Symbol COPY = Symbol.valueOf("copy");
private static final Symbol TOPIC = Symbol.valueOf("topic");
private static final Symbol QUEUE = Symbol.valueOf("queue");
private static final Symbol SHARED = Symbol.valueOf("shared");
private static final Symbol GLOBAL = Symbol.valueOf("global");
private Object brokerConsumer;
@ -74,7 +84,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
protected final AMQPConnectionContext connection;
protected boolean closed = false;
protected final AMQPSessionCallback sessionSPI;
private boolean multicast;
//todo get this from somewhere
private RoutingType defaultRoutingType = RoutingType.ANYCAST;
protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
private RoutingType routingTypeToUse = defaultRoutingType;
private boolean shared = false;
private boolean global = false;
private boolean isVolatile = false;
public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
super();
@ -127,7 +144,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
super.initialise();
Source source = (Source) sender.getRemoteSource();
String queue;
String queue = null;
String selector = null;
final Map<Symbol, Object> supportedFilters = new HashMap<>();
@ -148,32 +165,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
// if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this
// address to act like a topic then act like a subscription.
boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
if (isPubSub) {
Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
if (filter != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector != null) {
selector += " AND " + noLocalFilter;
} else {
selector = noLocalFilter;
}
supportedFilters.put(filter.getKey(), filter.getValue());
}
}
if (source == null) {
// Attempt to recover a previous subscription happens when a link reattach happens on a
// subscription queue
String clientId = getClientId();
String pubId = sender.getName();
queue = createQueueName(clientId, pubId);
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
queue = createQueueName(clientId, pubId, true, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
multicast = true;
routingTypeToUse = RoutingType.MULTICAST;
// Once confirmed that the address exists we need to return a Source that reflects
// the lifetime policy and capabilities of the new subscription.
@ -222,23 +222,86 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// node is temporary and will be deleted on closing of the session
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(queue);
sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
// protonSession.getServerSession().createQueue(queue, queue, null, true, false);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
source.setAddress(queue);
} else {
SimpleString addressToUse;
SimpleString queueNameToUse = null;
shared = hasCapabilities(SHARED, source);
global = hasCapabilities(GLOBAL, source);
//find out if we have an address made up of the address and queue name, if yes then set queue name
if (CompositeAddress.isFullyQualified(source.getAddress())) {
CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress());
addressToUse = new SimpleString(compositeAddress.getAddress());
queueNameToUse = new SimpleString(compositeAddress.getQueueName());
} else {
addressToUse = new SimpleString(source.getAddress());
}
//check to see if the client has defined how we act
boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
if (clientDefined) {
multicast = hasCapabilities(TOPIC, source);
AddressInfo addressInfo = sessionSPI.getAddress(addressToUse);
Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
//if the client defines 1 routing type and the broker another then throw an exception
if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
throw new ActiveMQAMQPIllegalStateException("Address is not configured for topic support");
} else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
throw new ActiveMQAMQPIllegalStateException("Address is not configured for queue support");
}
} else {
//if not we look up the address
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
if (!addressQueryResult.isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}
Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();
if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) {
multicast = true;
} else {
//todo add some checks if both routing types are supported
multicast = false;
}
}
routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
// if not dynamic then we use the target's address as the address to forward the
// messages to, however there has to be a queue bound to it so we need to check this.
if (isPubSub) {
// if we are a subscription and durable create a durable queue using the container
// id and link name
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
if (multicast) {
Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
if (filter != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector != null) {
selector += " AND " + noLocalFilter;
} else {
selector = noLocalFilter;
}
supportedFilters.put(filter.getKey(), filter.getValue());
}
if (queueNameToUse != null) {
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST );
queue = matchingAnycastQueue.toString();
}
//if the address specifies a broker configured queue then we always use this, treat it as a queue
if (queue != null) {
multicast = false;
} else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
// if we are a subscription and durable create a durable queue using the container
// id and link name
String clientId = getClientId();
String pubId = sender.getName();
queue = createQueueName(clientId, pubId);
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
queue = createQueueName(clientId, pubId, shared, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local
@ -248,25 +311,54 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
} else {
throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
}
}
} else {
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
if (shared) {
sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
} else {
sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
}
}
} else {
// otherwise we are a volatile subscription
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
isVolatile = true;
if (shared && sender.getName() != null) {
queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile);
try {
sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
} catch (ActiveMQQueueExistsException e) {
//this is ok, just means its shared
}
} else {
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
}
}
} else {
queue = source.getAddress();
if (queueNameToUse != null) {
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue.toString();
} else {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}
} else {
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue.toString();
} else {
queue = addressToUse.toString();
}
}
}
if (queue == null) {
@ -274,7 +366,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
try {
if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}
} catch (ActiveMQAMQPNotFoundException e) {
@ -290,9 +382,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// have not honored what it asked for.
source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
}
@ -302,10 +396,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return connection.getRemoteContainer();
}
private boolean isPubSub(Source source) {
String pubSubPrefix = sessionSPI.getPubSubPrefix();
return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
}
/*
* close the session
@ -341,23 +431,30 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// any durable resources for say pub subs
if (remoteLinkClose) {
Source source = (Source) sender.getSource();
if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) {
if (source != null && source.getAddress() != null && multicast) {
String queueName = source.getAddress();
QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
if (result.isExists() && source.getDynamic()) {
sessionSPI.deleteQueue(queueName);
} else {
String clientId = getClientId();
String pubId = sender.getName();
String queue = createQueueName(clientId, pubId);
result = sessionSPI.queueQuery(queue, false);
if (result.isExists()) {
if (result.getConsumerCount() > 0) {
System.out.println("error");
}
if (pubId.contains("|")) {
pubId = pubId.split("\\|")[0];
}
String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
//only delete if it isn't volatile and has no consumers
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
}
}
} else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
try {
sessionSPI.removeTemporaryQueue(source.getAddress());
} catch (Exception e) {
//ignore on close, its temp anyway and will be removed later
}
}
}
} catch (Exception e) {
@ -521,7 +618,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return false;
}
private static String createQueueName(String clientId, String pubId) {
return clientId + "." + pubId;
private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
if (shared) {
if (queue.contains("|")) {
queue = queue.split("\\|")[0];
}
if (isVolatile) {
queue += ":shared-volatile";
}
if (global) {
queue += ":global";
}
}
return queue;
}
}

View File

@ -22,7 +22,7 @@ import org.apache.qpid.proton.engine.Connection;
public class ExtCapability {
public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY};
public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY, AmqpSupport.SHARED_SUBS};
public static Symbol[] getCapabilities() {
return capabilities;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.rest.test;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.jboss.resteasy.client.ClientRequest;
import org.jboss.resteasy.client.ClientResponse;
import org.jboss.resteasy.spi.Link;
@ -30,6 +31,7 @@ public class FindDestinationTest extends MessageTestBase {
@Test
public void testFindQueue() throws Exception {
String testName = "testFindQueue";
server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST));
server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false);
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName));
@ -60,6 +62,7 @@ public class FindDestinationTest extends MessageTestBase {
@Test
public void testFindTopic() throws Exception {
server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST));
server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false);
ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic"));

View File

@ -619,7 +619,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Deprecated
@Override
public void createQueue(final String address, final String name) throws Exception {
checkStarted();
@ -632,6 +631,18 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public void createQueue(final String address, final String name, final String routingType) throws Exception {
checkStarted();
clearIO();
try {
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, true, false);
} finally {
blockOnIO();
}
}
@Override
public void createQueue(final String address, final String name, final boolean durable) throws Exception {
checkStarted();
@ -644,6 +655,60 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public void createQueue(final String address, final String name, final boolean durable, final String routingType) throws Exception {
checkStarted();
clearIO();
try {
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, durable, false);
} finally {
blockOnIO();
}
}
@Override
public void createQueue(final String address,
final String name,
final String filterStr,
final boolean durable) throws Exception {
checkStarted();
clearIO();
try {
SimpleString filter = null;
if (filterStr != null && !filterStr.trim().equals("")) {
filter = new SimpleString(filterStr);
}
server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
} finally {
blockOnIO();
}
}
@Override
public void createQueue(final String address,
final String name,
final String filterStr,
final boolean durable,
final String routingType) throws Exception {
checkStarted();
clearIO();
try {
SimpleString filter = null;
if (filterStr != null && !filterStr.trim().equals("")) {
filter = new SimpleString(filterStr);
}
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false);
} finally {
blockOnIO();
}
}
@Override
public void createQueue(String address,
String routingType,
@ -669,25 +734,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public void createQueue(final String address,
final String name,
final String filterStr,
final boolean durable) throws Exception {
checkStarted();
clearIO();
try {
SimpleString filter = null;
if (filterStr != null && !filterStr.trim().equals("")) {
filter = new SimpleString(filterStr);
}
server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
} finally {
blockOnIO();
}
}
@Override
public String[] getQueueNames() {
@ -1704,30 +1750,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
settings.add("expiryAddress", addressSettings.getExpiryAddress().toString());
}
return settings.add("expiryDelay", addressSettings.getExpiryDelay())
.add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
.add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
.add("maxSizeBytes", addressSettings.getMaxSizeBytes())
.add("pageSizeBytes", addressSettings.getPageSizeBytes())
.add("redeliveryDelay", addressSettings.getRedeliveryDelay())
.add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier())
.add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay())
.add("redistributionDelay", addressSettings.getRedistributionDelay())
.add("lastValueQueue", addressSettings.isLastValueQueue())
.add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
.add("addressFullMessagePolicy", policy)
.add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
.add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
.add("slowConsumerPolicy", consumerPolicy)
.add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())
.add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics())
.add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues())
.add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues())
.add("autoCreateQueues", addressSettings.isAutoCreateQueues())
.add("autoDeleteQueues", addressSettings.isAutoDeleteQueues())
.add("autoCreateAddress", addressSettings.isAutoCreateAddresses())
.add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses())
.build()
.toString();
.add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
.add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
.add("maxSizeBytes", addressSettings.getMaxSizeBytes())
.add("pageSizeBytes", addressSettings.getPageSizeBytes())
.add("redeliveryDelay", addressSettings.getRedeliveryDelay())
.add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier())
.add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay())
.add("redistributionDelay", addressSettings.getRedistributionDelay())
.add("lastValueQueue", addressSettings.isLastValueQueue())
.add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
.add("addressFullMessagePolicy", policy)
.add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
.add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
.add("slowConsumerPolicy", consumerPolicy)
.add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())
.add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics())
.add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues())
.add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues())
.add("autoCreateQueues", addressSettings.isAutoCreateQueues())
.add("autoDeleteQueues", addressSettings.isAutoDeleteQueues())
.add("autoCreateAddress", addressSettings.isAutoCreateAddresses())
.add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses())
.build()
.toString();
}
@Override

View File

@ -20,6 +20,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -44,6 +45,10 @@ public interface AddressManager {
Bindings getMatchingBindings(SimpleString address) throws Exception;
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
void clear();
Binding getBinding(SimpleString queueName);
@ -59,4 +64,5 @@ public interface AddressManager {
AddressInfo removeAddressInfo(SimpleString address);
AddressInfo getAddressInfo(SimpleString address);
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -79,6 +80,10 @@ public interface PostOffice extends ActiveMQComponent {
Map<SimpleString, Binding> getAllBindings();
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
RoutingStatus route(ServerMessage message, boolean direct) throws Exception;
RoutingStatus route(ServerMessage message,
@ -119,6 +124,4 @@ public interface PostOffice extends ActiveMQComponent {
boolean isAddressBound(final SimpleString address) throws Exception;
Set<SimpleString> getAddresses();
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
public class CompositeAddress {
public static String SEPARATOR = "::";
private final String address;
private final String queueName;
public String getAddress() {
return address;
}
public String getQueueName() {
return queueName;
}
public CompositeAddress(String address, String queueName) {
this.address = address;
this.queueName = queueName;
}
public static boolean isFullyQualified(String address) {
return address.toString().contains(SEPARATOR);
}
public static CompositeAddress getQueueName(String address) {
String[] split = address.split(SEPARATOR);
if (split.length <= 0) {
throw new IllegalStateException("Nott A Fully Qualified Name");
}
return new CompositeAddress(split[0], split[1]);
}
}

View File

@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@ -865,6 +866,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return addressManager.getAddresses();
}
@Override
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return addressManager.getMatchingQueue(address, routingType);
}
@Override
public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
return addressManager.getMatchingQueue(address, queueName, routingType);
}
@Override
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception {
// We send direct to the queue so we can send it to the same queue that is bound to the notifications address -

View File

@ -118,6 +118,37 @@ public class SimpleAddressManager implements AddressManager {
return bindings;
}
@Override
public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception {
Binding binding = nameMap.get(address);
if (binding == null || !(binding instanceof LocalQueueBinding)
|| !binding.getAddress().equals(address)) {
Bindings bindings = mappings.get(address);
if (bindings != null) {
for (Binding theBinding : bindings.getBindings()) {
if (theBinding instanceof LocalQueueBinding) {
binding = theBinding;
break;
}
}
}
}
return binding != null ? binding.getUniqueName() : null;
}
@Override
public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception {
Binding binding = nameMap.get(queueName);
if (binding != null && !binding.getAddress().equals(address)) {
throw new IllegalStateException("queue belongs to address" + binding.getAddress());
}
return binding != null ? binding.getUniqueName() : null;
}
@Override
public void clear() {
nameMap.clear();

View File

@ -344,6 +344,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
QueueQueryResult queueQuery(SimpleString name) throws Exception;
AddressQueryResult addressQuery(SimpleString name) throws Exception;
Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,

View File

@ -163,6 +163,8 @@ public interface ServerSession extends SecurityAuth {
QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
AddressQueryResult executeAddressQuery(SimpleString name) throws Exception;
BindingQueryResult executeBindingQuery(SimpleString address) throws Exception;
void closeConsumer(long consumerID) throws Exception;
@ -237,4 +239,10 @@ public interface ServerSession extends SecurityAuth {
List<MessageReference> getInTXMessagesForConsumer(long consumerId);
String getValidatedUser();
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
AddressInfo getAddress(SimpleString address);
}

View File

@ -720,7 +720,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
} else {
// Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
// actually routed to at that address though
queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false);
queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false, -1, false, true);
}
// There are a few things that will behave differently when it's an internal queue

View File

@ -108,6 +108,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Divert;
@ -747,6 +748,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return response;
}
@Override
public AddressQueryResult addressQuery(SimpleString name) throws Exception {
if (name == null) {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
boolean autoCreateAddresses = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateAddresses();
AddressInfo addressInfo = postOffice.getAddressInfo(name);
AddressQueryResult response;
if (addressInfo != null) {
response = new AddressQueryResult(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.getId(), addressInfo.isAutoCreated(), true, autoCreateAddresses);
} else {
response = new AddressQueryResult(name, null, -1, false, false, autoCreateAddresses);
}
return response;
}
@Override
public void threadDump() {
StringWriter str = new StringWriter();
@ -1468,7 +1487,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
}
@Override

View File

@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@ -707,6 +708,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return server.queueQuery(name);
}
@Override
public AddressQueryResult executeAddressQuery(SimpleString name) throws Exception {
return server.addressQuery(name);
}
@Override
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
return server.bindingQuery(address);
@ -1483,6 +1489,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return validatedUser;
}
@Override
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return server.getPostOffice().getMatchingQueue(address, routingType);
}
@Override
public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
return server.getPostOffice().getMatchingQueue(address, queueName, routingType);
}
@Override
public AddressInfo getAddress(SimpleString address) {
return server.getPostOffice().getAddressInfo(address);
}
@Override
public String toString() {
StringBuffer buffer = new StringBuffer();

View File

@ -75,6 +75,44 @@ public class AmqpClient {
return connection;
}
/**
* Creates a connection with the broker at the given location, this method initiates a
* connect attempt immediately and will fail if the remote peer cannot be reached.
*
* @throws Exception if an error occurs attempting to connect to the Broker.
* @return a new connection object used to interact with the connected peer.
*/
public AmqpConnection connect(boolean noContainerId) throws Exception {
AmqpConnection connection = createConnection();
connection.setNoContainerID();
LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
connection.connect();
return connection;
}
/**
* Creates a connection with the broker at the given location, this method initiates a
* connect attempt immediately and will fail if the remote peer cannot be reached.
*
* @throws Exception if an error occurs attempting to connect to the Broker.
* @return a new connection object used to interact with the connected peer.
*/
public AmqpConnection connect(String containerId) throws Exception {
AmqpConnection connection = createConnection();
connection.setContainerId(containerId);
LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
connection.connect();
return connection;
}
/**
* Creates a connection object using the configured values for user, password, remote URI
* etc. This method does not immediately initiate a connection to the remote leaving that

View File

@ -104,6 +104,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
private boolean trace;
private boolean noContainerID = false;
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
String username,
@ -139,7 +140,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
serializer.execute(new Runnable() {
@Override
public void run() {
getEndpoint().setContainer(safeGetContainerId());
if (!noContainerID) {
getEndpoint().setContainer(safeGetContainerId());
}
getEndpoint().setHostname(remoteURI.getHost());
if (!getOfferedCapabilities().isEmpty()) {
getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0]));
@ -735,4 +738,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
public String toString() {
return "AmqpConnection { " + connectionId + " }";
}
public void setNoContainerID() {
noContainerID = true;
}
}

View File

@ -271,7 +271,68 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport(request);
}
});
request.sync();
return receiver;
}
/**
* Create a receiver instance using the given Source
*
* @param source the caller created and configured Source used to create the receiver link.
* @return a newly created receiver that is ready for use.
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createMulticastReceiver(Source source, String receiverId, String receiveName) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
receiver.setSubscriptionName(receiveName);
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport(request);
}
});
request.sync();
return receiver;
}
/**
* Create a receiver instance using the given Source
*
* @param source the caller created and configured Source used to create the receiver link.
* @return a newly created receiver that is ready for use.
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createMulticastReceiver(String receiverId, String address, String receiveName) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, receiverId);
receiver.setSubscriptionName(receiveName);
connection.getScheduler().execute(new Runnable() {

View File

@ -22,14 +22,21 @@ import java.util.LinkedList;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.junit.After;
import org.junit.Before;
@ -39,6 +46,10 @@ import org.junit.Before;
*/
public class AmqpClientTestSupport extends ActiveMQTestBase {
protected static Symbol SHARED = Symbol.getSymbol("shared");
protected static Symbol GLOBAL = Symbol.getSymbol("global");
private boolean useSSL;
protected JMSServerManager serverManager;
@ -86,6 +97,12 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
ActiveMQServer server = createServer(true, true);
serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration();
CoreAddressConfiguration address = new CoreAddressConfiguration();
address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig = new CoreQueueConfiguration();
queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST);
address.getQueueConfigurations().add(queueConfig);
serverConfig.addAddressConfiguration(address);
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
serverManager.start();
@ -179,4 +196,19 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
return new AmqpClient(brokerURI, username, password);
}
protected void sendMessages(int numMessages, String address) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
for (int i = 0; i < numMessages; i++) {
AmqpMessage message = new AmqpMessage();
message.setText("message-" + i);
sender.send(message);
}
sender.close();
connection.connect();
}
}

View File

@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
@ -54,7 +56,8 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
@Override
public void setUp() throws Exception {
super.setUp();
server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false);
}
@Test(timeout = 60000)
@ -371,6 +374,6 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
}
public String getTopicName() {
return "topic://myTopic";
return "myTopic";
}
}

View File

@ -111,8 +111,6 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
sender.close();
Thread.sleep(200);
queueView = getProxyToQueue(remoteTarget.getAddress());
assertNull(queueView);

View File

@ -20,7 +20,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -28,7 +27,6 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Before;
import org.junit.Test;
/**
@ -36,11 +34,6 @@ import org.junit.Test;
*/
public class AmqpTransactionTest extends AmqpClientTestSupport {
@Before
public void createQueue() throws Exception {
server.createQueue(SimpleString.toSimpleString(getTestName()), SimpleString.toSimpleString(getTestName()), null, true, false);
}
@Test(timeout = 30000)
public void testBeginAndCommitTransaction() throws Exception {
AmqpClient client = createAmqpClient();

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
SimpleString address = new SimpleString("testAddress");
SimpleString queue1 = new SimpleString("queue1");
SimpleString queue2 = new SimpleString("queue2");
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
sendMessages(1, address.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString());
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameNameMultipleQueues() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
sendMessages(2, address.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString());
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressDifferentName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
sendMessages(1, address.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString());
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressDifferentNameMultipleQueues() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false);
sendMessages(1, address.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString());
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue2).getBindable()).getConsumerCount());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testConsumeFromSingleQualifiedQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
sendMessages(1, address.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + queue1.toString());
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testConsumeWhenOnlyMulticast() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
sendMessages(1, address.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
Source jmsSource = createJmsSource(false);
jmsSource.setAddress(address.toString());
try {
session.createReceiver(jmsSource);
fail("should throw exception");
} catch (Exception e) {
//ignore
}
connection.close();
}
@Test(timeout = 60000)
public void testConsumeWhenNoAddressCreatedNoAutoCreate() throws Exception {
AddressSettings settings = new AddressSettings();
settings.setAutoCreateAddresses(false);
server.getAddressSettingsRepository().addMatch(address.toString(), settings);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
try {
session.createReceiver(address.toString());
fail("should throw exception");
} catch (Exception e) {
//ignore
}
connection.close();
}
@Test(timeout = 60000)
public void testConsumeWhenNoAddressCreatedAutoCreate() throws Exception {
AddressSettings settings = new AddressSettings();
settings.setAutoCreateAddresses(true);
server.getAddressSettingsRepository().addMatch(address.toString(), settings);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString());
sendMessages(1, address.toString());
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
connection.close();
}
@Test(timeout = 60000)
public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsMultiCast() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
try {
session.createReceiver(address.toString());
fail("expected exception");
} catch (Exception e) {
//ignore
}
connection.close();
}
protected Source createJmsSource(boolean topic) {
Source source = new Source();
// Set the capability to indicate the node type being created
if (!topic) {
source.setCapabilities(QUEUE_CAPABILITY);
} else {
source.setCapabilities(TOPIC_CAPABILITY);
}
return source;
}
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
SimpleString address = new SimpleString("testAddress");
SimpleString queue1 = new SimpleString("queue1");
SimpleString queue2 = new SimpleString("queue2");
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
sendMessages(1, address.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + address.toString());
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testConsumeWhenOnlyAnycast() throws Exception {
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
sendMessages(1, address.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
Source jmsSource = createJmsSource(true);
jmsSource.setAddress(address.toString());
try {
session.createReceiver(jmsSource);
fail("should throw exception");
} catch (Exception e) {
//ignore
}
connection.close();
}
@Test(timeout = 60000)
public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsAnyCast() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
server.createAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
try {
session.createReceiver(address.toString());
fail("expected exception");
} catch (Exception e) {
//ignore
}
connection.close();
}
protected Source createJmsSource(boolean topic) {
Source source = new Source();
// Set the capability to indicate the node type being created
if (!topic) {
source.setCapabilities(QUEUE_CAPABILITY);
} else {
source.setCapabilities(TOPIC_CAPABILITY);
}
return source;
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport {
SimpleString address = new SimpleString("testAddress");
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString());
sendMessages(1, address.toString());
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
receiver.close();
connection.close();
}
}

View File

@ -0,0 +1,327 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
SimpleString address = new SimpleString("testAddress");
@Test(timeout = 60000)
public void test2ConsumersOnSharedVolatileAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
AmqpSession session = connection.createSession();
Source source = createSharedSource(TerminusDurability.NONE);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
receiver.flow(1);
receiver2.flow(1);
sendMessages(2, address.toString());
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
connection.close();
}
@Test(timeout = 60000)
public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
AmqpSession session = connection.createSession();
Source source = createSharedSource(TerminusDurability.NONE);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|1");
receiver.flow(1);
receiver2.flow(1);
sendMessages(2, address.toString());
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
receiver2.close();
//check its **Hasn't** been deleted
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
connection.close();
}
@Test(timeout = 60000)
public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
AmqpSession session = connection.createSession();
Source source = createSharedSource(TerminusDurability.NONE);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
receiver.flow(1);
receiver2.flow(1);
sendMessages(2, address.toString());
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
//check its been deleted
connection.close();
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
}
@Test(timeout = 60000)
public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect(false));
AmqpSession session = connection.createSession();
Source source = createSharedGlobalSource(TerminusDurability.NONE);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
receiver.flow(1);
receiver2.flow(1);
sendMessages(2, address.toString());
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
connection.close();
}
@Test(timeout = 60000)
public void test2ConsumersOnSharedDurableAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
AmqpSession session = connection.createSession();
Source source = createSharedSource(TerminusDurability.CONFIGURATION);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
receiver.flow(1);
receiver2.flow(1);
sendMessages(2, address.toString());
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
connection.close();
}
@Test(timeout = 60000)
public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
AmqpSession session = connection.createSession();
Source source = createSharedSource(TerminusDurability.CONFIGURATION);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
receiver.flow(1);
receiver2.flow(1);
sendMessages(2, address.toString());
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
connection.close();
connection = addConnection(client.connect("myClientId"));
session = connection.createSession();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
connection.close();
}
@Test(timeout = 60000)
public void test2ConsumersOnSharedDurableAddressReconnectwithNull() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
AmqpSession session = connection.createSession();
Source source = createSharedSource(TerminusDurability.CONFIGURATION);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
receiver.flow(1);
receiver2.flow(1);
sendMessages(2, address.toString());
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
connection.close();
connection = addConnection(client.connect("myClientId"));
session = connection.createSession();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
receiver = session.createDurableReceiver(null, "mySub");
receiver2 = session.createDurableReceiver(null, "mySub|2");
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
connection.close();
}
@Test(timeout = 60000)
public void test2ConsumersOnSharedDurableAddressGlobal() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect(false));
AmqpSession session = connection.createSession();
Source source = createSharedGlobalSource(TerminusDurability.CONFIGURATION);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
receiver.flow(1);
receiver2.flow(1);
sendMessages(2, address.toString());
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
connection.close();
}
@Test(timeout = 60000)
public void test2ConsumersOnNonSharedDurableAddress() throws Exception {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.createAddressInfo(addressInfo);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
AmqpSession session = connection.createSession();
Source source = createNonSharedSource(TerminusDurability.CONFIGURATION);
Source source1 = createSharedSource(TerminusDurability.CONFIGURATION);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
try {
session.createMulticastReceiver(source1, "myReceiverID", "mySub|2");
fail("Exception expected");
} catch (Exception e) {
//expected
}
connection.close();
}
private Source createNonSharedSource(TerminusDurability terminusDurability) {
Source source = new Source();
source.setAddress(address.toString());
source.setCapabilities(TOPIC_CAPABILITY);
source.setDurable(terminusDurability);
return source;
}
private Source createSharedSource(TerminusDurability terminusDurability) {
Source source = new Source();
source.setAddress(address.toString());
source.setCapabilities(TOPIC_CAPABILITY, SHARED);
source.setDurable(terminusDurability);
return source;
}
private Source createSharedGlobalSource(TerminusDurability terminusDurability) {
Source source = new Source();
source.setAddress(address.toString());
source.setCapabilities(TOPIC_CAPABILITY, SHARED, GLOBAL);
source.setDurable(terminusDurability);
return source;
}
}

View File

@ -30,6 +30,8 @@ import javax.jms.TopicSubscriber;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
@ -55,6 +57,8 @@ public class ProtonPubSubTest extends ProtonTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
server.createAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST));
server.createAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST));
server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
factory = new JmsConnectionFactory("amqp://localhost:5672");

View File

@ -70,6 +70,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
@ -151,20 +153,31 @@ public class ProtonTest extends ProtonTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false);
server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "3"), new SimpleString(coreAddress + "3"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "4"), new SimpleString(coreAddress + "4"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "5"), new SimpleString(coreAddress + "5"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "6"), new SimpleString(coreAddress + "6"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "7"), new SimpleString(coreAddress + "7"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "8"), new SimpleString(coreAddress + "8"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "9"), new SimpleString(coreAddress + "9"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "10"), new SimpleString(coreAddress + "10"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic"), new SimpleString("amqp_testtopic"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST));
server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false);
server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "3"), RoutingType.ANYCAST, new SimpleString(coreAddress + "3"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "4"), RoutingType.ANYCAST, new SimpleString(coreAddress + "4"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "5"), RoutingType.ANYCAST, new SimpleString(coreAddress + "5"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "6"), RoutingType.ANYCAST, new SimpleString(coreAddress + "6"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "7"), RoutingType.ANYCAST, new SimpleString(coreAddress + "7"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false);
server.createAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST));
server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false);
/* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "3"), new SimpleString("amqp_testtopic" + "3"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "4"), new SimpleString("amqp_testtopic" + "4"), null, true, false);
@ -173,7 +186,7 @@ public class ProtonTest extends ProtonTestBase {
server.createQueue(new SimpleString("amqp_testtopic" + "7"), new SimpleString("amqp_testtopic" + "7"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);*/
connection = createConnection();
@ -769,6 +782,12 @@ public class ProtonTest extends ProtonTestBase {
@Test
public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
AddressSettings value = new AddressSettings();
value.setAutoCreateJmsQueues(false);
value.setAutoCreateQueues(false);
value.setAutoCreateAddresses(false);
value.setAutoCreateJmsTopics(false);
server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
AmqpSession session = amqpConnection.createSession();
@ -784,6 +803,7 @@ public class ProtonTest extends ProtonTestBase {
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains("amqp:not-found"));
assertTrue(expectedException.getMessage().contains("target address does not exist"));
amqpConnection.close();
}
@Test
@ -838,6 +858,7 @@ public class ProtonTest extends ProtonTestBase {
@Test
public void testClientIdIsSetInSubscriptionList() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
AmqpConnection amqpConnection = client.createConnection();
amqpConnection.setContainerId("testClient");
amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
@ -866,14 +887,14 @@ public class ProtonTest extends ProtonTestBase {
String queueName = "TestQueueName";
String address = "TestAddress";
server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(queueName);
AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(1);
AmqpMessage message = new AmqpMessage();
@ -882,6 +903,7 @@ public class ProtonTest extends ProtonTestBase {
AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(receivedMessage);
amqpConnection.close();
}
@Test

View File

@ -25,7 +25,10 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Random;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
@ -42,6 +45,7 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server = createServer(true, true);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST));
server.start();
}

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
@ -185,12 +186,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
serverControl.createQueue(address.toString(), name.toString());
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@ -211,12 +212,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false);
serverControl.createQueue(address.toString(), name.toString(), filter, durable);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertEquals(filter, queueControl.getFilter());
@ -236,12 +237,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
serverControl.createQueue(address.toString(), name.toString(), durable);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@ -264,12 +265,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@ -297,8 +298,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
serverControl.createQueue(address.toString(), name.toString(), durable);
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
ServerLocator receiveLocator = createInVMNonHALocator();
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
@ -307,7 +308,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertFalse(consumer.isClosed());
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.destroyQueue(name.toString(), true);
Wait.waitFor(new Wait.Condition() {
@Override
@ -329,12 +330,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false);
serverControl.createQueue(address.toString(), name.toString(), filter, durable);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@ -355,12 +356,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false);
serverControl.createQueue(address.toString(), name.toString(), filter, durable);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@ -383,8 +384,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
// management operations
Assert.assertFalse(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames()));
serverControl.createQueue(address.toString(), name.toString());
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
Assert.assertTrue(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames()));
serverControl.destroyQueue(name.toString());
@ -402,8 +403,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
// management operations
Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
serverControl.createQueue(address.toString(), name.toString());
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
serverControl.destroyQueue(name.toString());
@ -1212,7 +1213,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = createSessionFactory(locator);
ClientSession session = addClientSession(factory.createSession());
server.createQueue(queueName, queueName, null, false, false);
server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
addClientConsumer(session.createConsumer(queueName));
Thread.sleep(100); // We check the timestamp for the creation time. We need to make sure it's different
addClientConsumer(session.createConsumer(queueName, SimpleString.toSimpleString(filter), true));
@ -1269,8 +1271,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator locator2 = createInVMNonHALocator();
ClientSessionFactory factory2 = createSessionFactory(locator2);
ClientSession session2 = addClientSession(factory2.createSession());
server.createQueue(queueName, queueName, null, false, false);
serverControl.createAddress(queueName.toString(), "ANYCAST");
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
addClientConsumer(session.createConsumer(queueName));
Thread.sleep(200);
@ -1335,7 +1337,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
@Test
public void testListSessionsAsJSON() throws Exception {
SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
server.createQueue(queueName, queueName, null, false, false);
server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
ActiveMQServerControl serverControl = createManagementControl();
ServerLocator locator = createInVMNonHALocator();
@ -1400,8 +1403,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params));
server2.start();
server.createQueue(address, address, null, true, false);
server2.createQueue(address, address, null, true, false);
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
server2.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server2.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession();

View File

@ -126,6 +126,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
proxy.invokeOperation("createQueue", address, name, filter, durable);
}
@Override
public void createQueue(String address, String name, String routingType) throws Exception {
proxy.invokeOperation("createQueue", address, name, routingType);
}
@Override
public void createQueue(String address, String name, boolean durable, String routingType) throws Exception {
proxy.invokeOperation("createQueue", address, name, durable, routingType);
}
@Override
public void createQueue(String address,String name, String filter, boolean durable, String routingType) throws Exception {
proxy.invokeOperation("createQueue", address, name, filter, durable, routingType);
}
@Override
public void createQueue(final String address, final String name, final boolean durable) throws Exception {
proxy.invokeOperation("createQueue", address, name, durable);

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.server.RoutingType;
public class ManagementControlHelper {
@ -73,6 +74,13 @@ public class ManagementControlHelper {
return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, ActiveMQDefaultConfiguration.getDefaultRoutingType()), QueueControl.class, mbeanServer);
}
public static QueueControl createQueueControl(final SimpleString address,
final SimpleString name,
final RoutingType routingType,
final MBeanServer mbeanServer) throws Exception {
return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, routingType), QueueControl.class, mbeanServer);
}
public static AddressControl createAddressControl(final SimpleString address,
final MBeanServer mbeanServer) throws Exception {
return (AddressControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getAddressObjectName(address), AddressControl.class, mbeanServer);

View File

@ -33,7 +33,6 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Before;

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
@ -50,6 +51,17 @@ public class FakePostOffice implements PostOffice {
return null;
}
@Override
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) {
return null;
}
@Override
public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) {
return null;
}
@Override
public void start() throws Exception {