ARTEMIS-762 Reflect management changes in AMQP protocol

This commit is contained in:
Martyn Taylor 2016-09-30 16:36:00 +01:00 committed by Clebert Suconic
parent 20729e79fd
commit 95c4fdd408
14 changed files with 190 additions and 23 deletions

View File

@ -511,6 +511,13 @@ public interface ActiveMQServerControl {
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION) @Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name) throws Exception; void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name) throws Exception;
/**
* Destroys the queue corresponding to the specified name.
*/
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers) throws Exception;
/** /**
* Enables message counters for this server. * Enables message counters for this server.
*/ */

View File

@ -238,7 +238,7 @@ public class JMSTopicControlImpl extends StandardMBean implements TopicControl {
throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID); throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
} }
ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER); ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER);
serverControl.destroyQueue(queueName); serverControl.destroyQueue(queueName, true);
} }
@Override @Override

View File

@ -117,7 +117,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
server.removeClientConnection(remoteContainerId); server.removeClientConnection(remoteContainerId);
} }
connection.close(); connection.close();
amqpConnection.close(); amqpConnection.close(null);
} finally { } finally {
for (Transaction tx : transactions.values()) { for (Transaction tx : transactions.values()) {
try { try {

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternal
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
@ -59,12 +60,14 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.ProtonJMessage;
import org.jboss.logging.Logger;
public class AMQPSessionCallback implements SessionCallback { public class AMQPSessionCallback implements SessionCallback {
private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
private final AMQPConnectionCallback protonSPI; private final AMQPConnectionCallback protonSPI;
@ -467,9 +470,14 @@ public class AMQPSessionCallback implements SessionCallback {
@Override @Override
public void disconnect(ServerConsumer consumer, String queueName) { public void disconnect(ServerConsumer consumer, String queueName) {
synchronized (connection.getLock()) { ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
((Link) consumer.getProtocolContext()).close(); try {
connection.flush(); synchronized (connection.getLock()) {
((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
connection.flush();
}
} catch (ActiveMQAMQPException e) {
logger.error("Error closing link for " + consumer.getQueue().getAddress());
} }
} }
@ -504,5 +512,4 @@ public class AMQPSessionCallback implements SessionCallback {
protonSPI.removeTransaction(txid); protonSPI.removeTransaction(txid);
} }
} }

View File

@ -23,8 +23,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
/** /**
* This is a Server's Connection representation used by ActiveMQ Artemis. * This is a Server's Connection representation used by ActiveMQ Artemis.
@ -103,6 +105,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
@Override @Override
public void disconnect(boolean criticalError) { public void disconnect(boolean criticalError) {
ErrorCondition errorCondition = new ErrorCondition();
errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
amqpConnection.close(errorCondition);
getTransportConnection().close(); getTransportConnection().close();
} }
@ -111,7 +116,7 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
*/ */
@Override @Override
public void disconnect(String scaleDownNodeID, boolean criticalError) { public void disconnect(String scaleDownNodeID, boolean criticalError) {
getTransportConnection().close(); disconnect(criticalError);
} }
@Override @Override

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Link;
@ -132,8 +133,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
handler.flush(); handler.flush();
} }
public void close() { public void close(ErrorCondition errorCondition) {
handler.close(); handler.close(errorCondition);
} }
protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException { protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
@ -264,7 +265,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
if (!connectionCallback.isSupportsAnonymous()) { if (!connectionCallback.isSupportsAnonymous()) {
connectionCallback.sendSASLSupported(); connectionCallback.sendSASLSupported();
connectionCallback.close(); connectionCallback.close();
handler.close(); handler.close(null);
} }
} }
} }

View File

@ -53,6 +53,9 @@ public class AmqpSupport {
public static final Symbol PRODUCT = Symbol.valueOf("product"); public static final Symbol PRODUCT = Symbol.valueOf("product");
public static final Symbol VERSION = Symbol.valueOf("version"); public static final Symbol VERSION = Symbol.valueOf("version");
public static final Symbol PLATFORM = Symbol.valueOf("platform"); public static final Symbol PLATFORM = Symbol.valueOf("platform");
public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
// Symbols used in configuration of newly opened links. // Symbols used in configuration of newly opened links.
public static final Symbol COPY = Symbol.getSymbol("copy"); public static final Symbol COPY = Symbol.getSymbol("copy");

View File

@ -169,7 +169,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
String clientId = connection.getRemoteContainer(); String clientId = connection.getRemoteContainer();
String pubId = sender.getName(); String pubId = sender.getName();
queue = clientId + ":" + pubId; queue = createQueueName(clientId, pubId);
boolean exists = sessionSPI.queueQuery(queue, false).isExists(); boolean exists = sessionSPI.queueQuery(queue, false).isExists();
/* /*
@ -207,7 +207,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
String clientId = connection.getRemoteContainer(); String clientId = connection.getRemoteContainer();
String pubId = sender.getName(); String pubId = sender.getName();
queue = clientId + ":" + pubId; queue = createQueueName(clientId, pubId);
QueueQueryResult result = sessionSPI.queueQuery(queue, false); QueueQueryResult result = sessionSPI.queueQuery(queue, false);
if (result.isExists()) { if (result.isExists()) {
@ -307,7 +307,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else { } else {
String clientId = connection.getRemoteContainer(); String clientId = connection.getRemoteContainer();
String pubId = sender.getName(); String pubId = sender.getName();
String queue = clientId + ":" + pubId; String queue = createQueueName(clientId, pubId);
result = sessionSPI.queueQuery(queue, false); result = sessionSPI.queueQuery(queue, false);
if (result.isExists()) { if (result.isExists()) {
if (result.getConsumerCount() > 0) { if (result.getConsumerCount() > 0) {
@ -324,6 +324,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
@Override @Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException { public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
Object message = delivery.getContext(); Object message = delivery.getContext();
@ -478,4 +479,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
private static String createQueueName(String clientId, String pubId) {
return clientId + "." + pubId;
}
} }

View File

@ -248,6 +248,10 @@ public class ProtonHandler extends ProtonInitializable {
} }
public void flush() { public void flush() {
flush(false);
}
private void flush(boolean wait) {
synchronized (lock) { synchronized (lock) {
transport.process(); transport.process();
@ -255,14 +259,21 @@ public class ProtonHandler extends ProtonInitializable {
} }
dispatchExecutor.execute(dispatchRunnable); if (wait) {
dispatch();
} else {
dispatchExecutor.execute(dispatchRunnable);
}
} }
public void close() { public void close(ErrorCondition errorCondition) {
synchronized (lock) { synchronized (lock) {
if (errorCondition != null) {
connection.setCondition(errorCondition);
}
connection.close(); connection.close();
} }
flush(); flush(true);
} }
protected void checkServerSASL() { protected void checkServerSASL() {

View File

@ -697,19 +697,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
} }
@Override @Override
public void destroyQueue(final String name) throws Exception { public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
checkStarted(); checkStarted();
clearIO(); clearIO();
try { try {
SimpleString queueName = new SimpleString(name); SimpleString queueName = new SimpleString(name);
server.destroyQueue(queueName, null, !removeConsumers, removeConsumers);
server.destroyQueue(queueName, null, true);
} finally { } finally {
blockOnIO(); blockOnIO();
} }
} }
@Override
public void destroyQueue(final String name) throws Exception {
destroyQueue(name, false);
}
@Override @Override
public int getConnectionCount() { public int getConnectionCount() {
checkStarted(); checkStarted();

View File

@ -207,10 +207,22 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the receiver. * @throws Exception if an error occurs while creating the receiver.
*/ */
public AmqpReceiver createReceiver(Source source) throws Exception { public AmqpReceiver createReceiver(Source source) throws Exception {
return createReceiver(source, getNextReceiverId());
}
/**
* Create a receiver instance using the given Source
*
* @param source the caller created and configured Source used to create the receiver link.
* @param receiverId the receiver id to use.
* @return a newly created receiver that is ready for use.
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
checkClosed(); checkClosed();
final ClientFuture request = new ClientFuture(); final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
connection.getScheduler().execute(new Runnable() { connection.getScheduler().execute(new Runnable() {

View File

@ -58,7 +58,10 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -71,6 +74,8 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -156,6 +161,7 @@ public class ProtonTest extends ProtonTestBase {
server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
connection = createConnection(); connection = createConnection();
} }
@ -186,9 +192,9 @@ public class ProtonTest extends ProtonTestBase {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
myDurSub = session.createDurableSubscriber(topic, "myDurSub"); myDurSub = session.createDurableSubscriber(topic, "myDurSub");
myDurSub.close(); myDurSub.close();
Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub"))); Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
session.unsubscribe("myDurSub"); session.unsubscribe("myDurSub");
Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub"))); Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
session.close(); session.close();
connection.close(); connection.close();
} finally { } finally {
@ -740,6 +746,81 @@ public class ProtonTest extends ProtonTestBase {
assertTrue(expectedException.getMessage().contains("target address does not exist")); assertTrue(expectedException.getMessage().contains("target address does not exist"));
} }
@Test
public void testLinkDetachSentWhenQueueDeleted() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
final AmqpConnection amqpConnection = client.connect();
try {
AmqpSession session = amqpConnection.createSession();
AmqpReceiver receiver = session.createReceiver(coreAddress);
server.destroyQueue(new SimpleString(coreAddress), null, false, true);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return amqpConnection.isClosed();
}
});
assertTrue(receiver.isClosed());
} finally {
amqpConnection.close();
}
}
@Test
public void testCloseIsSentOnConnectionClose() throws Exception {
connection.close();
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
final AmqpConnection amqpConnection = client.connect();
try {
for (RemotingConnection connection : server.getRemotingService().getConnections()) {
server.getRemotingService().removeConnection(connection);
connection.disconnect(true);
}
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return amqpConnection.isClosed();
}
});
assertTrue(amqpConnection.isClosed());
assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition());
} finally {
amqpConnection.close();
}
}
@Test
public void testClientIdIsSetInSubscriptionList() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.createConnection();
amqpConnection.setContainerId("testClient");
amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
amqpConnection.connect();
try {
AmqpSession session = amqpConnection.createSession();
Source source = new Source();
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setCapabilities(Symbol.getSymbol("topic"));
source.setAddress("jms.topic.mytopic");
AmqpReceiver receiver = session.createReceiver(source, "testSub");
SimpleString fo = new SimpleString("testClient.testSub:jms.topic.mytopic");
assertNotNull(server.locateQueue(fo));
} catch (Exception e) {
e.printStackTrace();
} finally {
amqpConnection.close();
}
}
@Test @Test
public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception { public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {

View File

@ -249,6 +249,32 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
} }
@Test
public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString name = RandomUtil.randomSimpleString();
boolean durable = true;
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
serverControl.createQueue(address.toString(), name.toString(), durable);
ServerLocator receiveLocator = createInVMNonHALocator();
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
ClientConsumer consumer = receiveClientSession.createConsumer(name);
Assert.assertFalse(consumer.isClosed());
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
serverControl.destroyQueue(name.toString(), true);
Assert.assertTrue(consumer.isClosed());
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
}
@Test @Test
public void testCreateAndDestroyQueueWithNullFilter() throws Exception { public void testCreateAndDestroyQueueWithNullFilter() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();

View File

@ -127,6 +127,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
proxy.invokeOperation("destroyQueue", name); proxy.invokeOperation("destroyQueue", name);
} }
@Override
public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
proxy.invokeOperation("destroyQueue", name, removeConsumers);
}
@Override @Override
public void disableMessageCounters() throws Exception { public void disableMessageCounters() throws Exception {
proxy.invokeOperation("disableMessageCounters"); proxy.invokeOperation("disableMessageCounters");