This closes #814
This commit is contained in:
commit
6f6d9845fe
|
@ -511,6 +511,13 @@ public interface ActiveMQServerControl {
|
||||||
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
|
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
|
||||||
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name) throws Exception;
|
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroys the queue corresponding to the specified name.
|
||||||
|
*/
|
||||||
|
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
|
||||||
|
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
|
||||||
|
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enables message counters for this server.
|
* Enables message counters for this server.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -238,7 +238,7 @@ public class JMSTopicControlImpl extends StandardMBean implements TopicControl {
|
||||||
throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
|
throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
|
||||||
}
|
}
|
||||||
ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER);
|
ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER);
|
||||||
serverControl.destroyQueue(queueName);
|
serverControl.destroyQueue(queueName, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
|
||||||
server.removeClientConnection(remoteContainerId);
|
server.removeClientConnection(remoteContainerId);
|
||||||
}
|
}
|
||||||
connection.close();
|
connection.close();
|
||||||
amqpConnection.close();
|
amqpConnection.close(null);
|
||||||
} finally {
|
} finally {
|
||||||
for (Transaction tx : transactions.values()) {
|
for (Transaction tx : transactions.values()) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternal
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
|
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
||||||
|
@ -59,12 +60,14 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
import org.apache.qpid.proton.engine.Link;
|
|
||||||
import org.apache.qpid.proton.engine.Receiver;
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
public class AMQPSessionCallback implements SessionCallback {
|
public class AMQPSessionCallback implements SessionCallback {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
|
||||||
|
|
||||||
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
|
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
|
||||||
|
|
||||||
private final AMQPConnectionCallback protonSPI;
|
private final AMQPConnectionCallback protonSPI;
|
||||||
|
@ -467,9 +470,14 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disconnect(ServerConsumer consumer, String queueName) {
|
public void disconnect(ServerConsumer consumer, String queueName) {
|
||||||
synchronized (connection.getLock()) {
|
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
|
||||||
((Link) consumer.getProtocolContext()).close();
|
try {
|
||||||
connection.flush();
|
synchronized (connection.getLock()) {
|
||||||
|
((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
|
||||||
|
connection.flush();
|
||||||
|
}
|
||||||
|
} catch (ActiveMQAMQPException e) {
|
||||||
|
logger.error("Error closing link for " + consumer.getQueue().getAddress());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -504,5 +512,4 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
protonSPI.removeTransaction(txid);
|
protonSPI.removeTransaction(txid);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,8 +23,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a Server's Connection representation used by ActiveMQ Artemis.
|
* This is a Server's Connection representation used by ActiveMQ Artemis.
|
||||||
|
@ -103,6 +105,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disconnect(boolean criticalError) {
|
public void disconnect(boolean criticalError) {
|
||||||
|
ErrorCondition errorCondition = new ErrorCondition();
|
||||||
|
errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
|
||||||
|
amqpConnection.close(errorCondition);
|
||||||
getTransportConnection().close();
|
getTransportConnection().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +116,7 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void disconnect(String scaleDownNodeID, boolean criticalError) {
|
public void disconnect(String scaleDownNodeID, boolean criticalError) {
|
||||||
getTransportConnection().close();
|
disconnect(criticalError);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
||||||
|
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||||
import org.apache.qpid.proton.engine.Connection;
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
import org.apache.qpid.proton.engine.Link;
|
import org.apache.qpid.proton.engine.Link;
|
||||||
|
@ -132,8 +133,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
||||||
handler.flush();
|
handler.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close(ErrorCondition errorCondition) {
|
||||||
handler.close();
|
handler.close(errorCondition);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
|
protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
|
||||||
|
@ -264,7 +265,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
||||||
if (!connectionCallback.isSupportsAnonymous()) {
|
if (!connectionCallback.isSupportsAnonymous()) {
|
||||||
connectionCallback.sendSASLSupported();
|
connectionCallback.sendSASLSupported();
|
||||||
connectionCallback.close();
|
connectionCallback.close();
|
||||||
handler.close();
|
handler.close(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,9 @@ public class AmqpSupport {
|
||||||
public static final Symbol PRODUCT = Symbol.valueOf("product");
|
public static final Symbol PRODUCT = Symbol.valueOf("product");
|
||||||
public static final Symbol VERSION = Symbol.valueOf("version");
|
public static final Symbol VERSION = Symbol.valueOf("version");
|
||||||
public static final Symbol PLATFORM = Symbol.valueOf("platform");
|
public static final Symbol PLATFORM = Symbol.valueOf("platform");
|
||||||
|
public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
|
||||||
|
public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
|
||||||
|
|
||||||
|
|
||||||
// Symbols used in configuration of newly opened links.
|
// Symbols used in configuration of newly opened links.
|
||||||
public static final Symbol COPY = Symbol.getSymbol("copy");
|
public static final Symbol COPY = Symbol.getSymbol("copy");
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
|
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
|
||||||
String clientId = connection.getRemoteContainer();
|
String clientId = connection.getRemoteContainer();
|
||||||
String pubId = sender.getName();
|
String pubId = sender.getName();
|
||||||
queue = clientId + ":" + pubId;
|
queue = createQueueName(clientId, pubId);
|
||||||
boolean exists = sessionSPI.queueQuery(queue, false).isExists();
|
boolean exists = sessionSPI.queueQuery(queue, false).isExists();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -207,7 +207,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
|
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
|
||||||
String clientId = connection.getRemoteContainer();
|
String clientId = connection.getRemoteContainer();
|
||||||
String pubId = sender.getName();
|
String pubId = sender.getName();
|
||||||
queue = clientId + ":" + pubId;
|
queue = createQueueName(clientId, pubId);
|
||||||
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
|
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
|
||||||
|
|
||||||
if (result.isExists()) {
|
if (result.isExists()) {
|
||||||
|
@ -307,7 +307,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
} else {
|
} else {
|
||||||
String clientId = connection.getRemoteContainer();
|
String clientId = connection.getRemoteContainer();
|
||||||
String pubId = sender.getName();
|
String pubId = sender.getName();
|
||||||
String queue = clientId + ":" + pubId;
|
String queue = createQueueName(clientId, pubId);
|
||||||
result = sessionSPI.queueQuery(queue, false);
|
result = sessionSPI.queueQuery(queue, false);
|
||||||
if (result.isExists()) {
|
if (result.isExists()) {
|
||||||
if (result.getConsumerCount() > 0) {
|
if (result.getConsumerCount() > 0) {
|
||||||
|
@ -324,6 +324,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
|
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
|
||||||
Object message = delivery.getContext();
|
Object message = delivery.getContext();
|
||||||
|
@ -478,4 +479,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String createQueueName(String clientId, String pubId) {
|
||||||
|
return clientId + "." + pubId;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -248,6 +248,10 @@ public class ProtonHandler extends ProtonInitializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flush() {
|
public void flush() {
|
||||||
|
flush(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void flush(boolean wait) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
transport.process();
|
transport.process();
|
||||||
|
|
||||||
|
@ -255,14 +259,21 @@ public class ProtonHandler extends ProtonInitializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatchExecutor.execute(dispatchRunnable);
|
if (wait) {
|
||||||
|
dispatch();
|
||||||
|
} else {
|
||||||
|
dispatchExecutor.execute(dispatchRunnable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close(ErrorCondition errorCondition) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
|
if (errorCondition != null) {
|
||||||
|
connection.setCondition(errorCondition);
|
||||||
|
}
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
flush();
|
flush(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void checkServerSASL() {
|
protected void checkServerSASL() {
|
||||||
|
|
|
@ -697,19 +697,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroyQueue(final String name) throws Exception {
|
public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
|
||||||
checkStarted();
|
checkStarted();
|
||||||
|
|
||||||
clearIO();
|
clearIO();
|
||||||
try {
|
try {
|
||||||
SimpleString queueName = new SimpleString(name);
|
SimpleString queueName = new SimpleString(name);
|
||||||
|
server.destroyQueue(queueName, null, !removeConsumers, removeConsumers);
|
||||||
server.destroyQueue(queueName, null, true);
|
|
||||||
} finally {
|
} finally {
|
||||||
blockOnIO();
|
blockOnIO();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroyQueue(final String name) throws Exception {
|
||||||
|
destroyQueue(name, false);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getConnectionCount() {
|
public int getConnectionCount() {
|
||||||
checkStarted();
|
checkStarted();
|
||||||
|
|
|
@ -207,10 +207,22 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
* @throws Exception if an error occurs while creating the receiver.
|
* @throws Exception if an error occurs while creating the receiver.
|
||||||
*/
|
*/
|
||||||
public AmqpReceiver createReceiver(Source source) throws Exception {
|
public AmqpReceiver createReceiver(Source source) throws Exception {
|
||||||
|
return createReceiver(source, getNextReceiverId());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a receiver instance using the given Source
|
||||||
|
*
|
||||||
|
* @param source the caller created and configured Source used to create the receiver link.
|
||||||
|
* @param receiverId the receiver id to use.
|
||||||
|
* @return a newly created receiver that is ready for use.
|
||||||
|
* @throws Exception if an error occurs while creating the receiver.
|
||||||
|
*/
|
||||||
|
public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
|
|
||||||
final ClientFuture request = new ClientFuture();
|
final ClientFuture request = new ClientFuture();
|
||||||
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
|
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
|
||||||
|
|
||||||
connection.getScheduler().execute(new Runnable() {
|
connection.getScheduler().execute(new Runnable() {
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,10 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
|
||||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
@ -71,6 +74,8 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
|
||||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -156,6 +161,7 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
|
server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
|
||||||
server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
|
server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
|
||||||
server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
|
server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
|
||||||
|
|
||||||
connection = createConnection();
|
connection = createConnection();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -186,9 +192,9 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
myDurSub = session.createDurableSubscriber(topic, "myDurSub");
|
myDurSub = session.createDurableSubscriber(topic, "myDurSub");
|
||||||
myDurSub.close();
|
myDurSub.close();
|
||||||
Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
|
Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
|
||||||
session.unsubscribe("myDurSub");
|
session.unsubscribe("myDurSub");
|
||||||
Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
|
Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
|
||||||
session.close();
|
session.close();
|
||||||
connection.close();
|
connection.close();
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -740,6 +746,81 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
assertTrue(expectedException.getMessage().contains("target address does not exist"));
|
assertTrue(expectedException.getMessage().contains("target address does not exist"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLinkDetachSentWhenQueueDeleted() throws Exception {
|
||||||
|
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
|
||||||
|
final AmqpConnection amqpConnection = client.connect();
|
||||||
|
try {
|
||||||
|
AmqpSession session = amqpConnection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(coreAddress);
|
||||||
|
server.destroyQueue(new SimpleString(coreAddress), null, false, true);
|
||||||
|
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return amqpConnection.isClosed();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(receiver.isClosed());
|
||||||
|
} finally {
|
||||||
|
amqpConnection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCloseIsSentOnConnectionClose() throws Exception {
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
|
||||||
|
final AmqpConnection amqpConnection = client.connect();
|
||||||
|
try {
|
||||||
|
for (RemotingConnection connection : server.getRemotingService().getConnections()) {
|
||||||
|
server.getRemotingService().removeConnection(connection);
|
||||||
|
connection.disconnect(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return amqpConnection.isClosed();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertTrue(amqpConnection.isClosed());
|
||||||
|
assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition());
|
||||||
|
} finally {
|
||||||
|
amqpConnection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientIdIsSetInSubscriptionList() throws Exception {
|
||||||
|
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
|
||||||
|
AmqpConnection amqpConnection = client.createConnection();
|
||||||
|
amqpConnection.setContainerId("testClient");
|
||||||
|
amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
|
||||||
|
amqpConnection.connect();
|
||||||
|
try {
|
||||||
|
AmqpSession session = amqpConnection.createSession();
|
||||||
|
|
||||||
|
Source source = new Source();
|
||||||
|
source.setDurable(TerminusDurability.UNSETTLED_STATE);
|
||||||
|
source.setCapabilities(Symbol.getSymbol("topic"));
|
||||||
|
source.setAddress("jms.topic.mytopic");
|
||||||
|
AmqpReceiver receiver = session.createReceiver(source, "testSub");
|
||||||
|
|
||||||
|
SimpleString fo = new SimpleString("testClient.testSub:jms.topic.mytopic");
|
||||||
|
assertNotNull(server.locateQueue(fo));
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
amqpConnection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
|
public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
|
||||||
|
|
||||||
|
|
|
@ -249,6 +249,32 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
|
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString name = RandomUtil.randomSimpleString();
|
||||||
|
boolean durable = true;
|
||||||
|
|
||||||
|
ActiveMQServerControl serverControl = createManagementControl();
|
||||||
|
|
||||||
|
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
|
||||||
|
|
||||||
|
serverControl.createQueue(address.toString(), name.toString(), durable);
|
||||||
|
|
||||||
|
ServerLocator receiveLocator = createInVMNonHALocator();
|
||||||
|
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
|
||||||
|
ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
|
||||||
|
ClientConsumer consumer = receiveClientSession.createConsumer(name);
|
||||||
|
|
||||||
|
Assert.assertFalse(consumer.isClosed());
|
||||||
|
|
||||||
|
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
|
||||||
|
serverControl.destroyQueue(name.toString(), true);
|
||||||
|
Assert.assertTrue(consumer.isClosed());
|
||||||
|
|
||||||
|
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateAndDestroyQueueWithNullFilter() throws Exception {
|
public void testCreateAndDestroyQueueWithNullFilter() throws Exception {
|
||||||
SimpleString address = RandomUtil.randomSimpleString();
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
|
|
@ -127,6 +127,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
||||||
proxy.invokeOperation("destroyQueue", name);
|
proxy.invokeOperation("destroyQueue", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
|
||||||
|
proxy.invokeOperation("destroyQueue", name, removeConsumers);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disableMessageCounters() throws Exception {
|
public void disableMessageCounters() throws Exception {
|
||||||
proxy.invokeOperation("disableMessageCounters");
|
proxy.invokeOperation("disableMessageCounters");
|
||||||
|
|
Loading…
Reference in New Issue