diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 6800854760..a2a5ee5de3 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -38,6 +39,7 @@ import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.Command; @@ -517,12 +519,21 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { static abstract class AmqpDeliveryListener { + protected ActiveMQDestination destination; + protected List closeActions = new ArrayList(); + abstract public void onDelivery(Delivery delivery) throws Exception; public void onDetach() throws Exception { } public void onClose() throws Exception { + + for (Runnable action : closeActions) { + action.run(); + } + + closeActions.clear(); } public void drainCheck() { @@ -531,6 +542,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { abstract void doCommit() throws Exception; abstract void doRollback() throws Exception; + + public void addCloseAction(Runnable action) { + closeActions.add(action); + } + + public ActiveMQDestination getDestination() { + return destination; + } + + public void setDestination(ActiveMQDestination destination) { + this.destination = destination; + } } private void onConnectionOpen() throws AmqpProtocolException { @@ -683,14 +706,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { class ProducerContext extends BaseProducerContext { private final ProducerId producerId; private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); - private final ActiveMQDestination destination; private boolean closed; - private final boolean anonymous; + private boolean anonymous; - public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) { + public ProducerContext(ProducerId producerId) { this.producerId = producerId; - this.destination = destination; - this.anonymous = anonymous; } @Override @@ -797,6 +817,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (!closed) { sendToActiveMQ(new RemoveInfo(producerId), null); } + + super.onClose(); } public void close() { @@ -914,6 +936,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { // Client is producing to this receiver object org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget(); int flow = producerCredit; + try { if (remoteTarget instanceof Coordinator) { pumpProtonToSocket(); @@ -924,29 +947,35 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } else { Target target = (Target) remoteTarget; ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++); + final ProducerContext producerContext = new ProducerContext(producerId); ActiveMQDestination destination = null; - boolean anonymous = false; String targetNodeName = target.getAddress(); if ((targetNodeName == null || targetNodeName.length() == 0) && !target.getDynamic()) { - anonymous = true; + producerContext.anonymous = true; } else if (target.getDynamic()) { destination = createTemporaryDestination(receiver, target.getCapabilities()); Target actualTarget = new Target(); actualTarget.setAddress(destination.getQualifiedName()); actualTarget.setDynamic(true); receiver.setTarget(actualTarget); + producerContext.addCloseAction(new Runnable() { + + @Override + public void run() { + deleteTemporaryDestination((ActiveMQTempDestination) producerContext.getDestination()); + } + }); } else { destination = createDestination(remoteTarget); } - final ProducerContext producerContext = new ProducerContext(producerId, destination, anonymous); - receiver.setContext(producerContext); receiver.flow(flow); ProducerInfo producerInfo = new ProducerInfo(producerId); producerInfo.setDestination(destination); + producerContext.setDestination(destination); sendToActiveMQ(producerInfo, new ResponseHandler() { @Override public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { @@ -1005,7 +1034,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private boolean closed; public ConsumerInfo info; private boolean endOfBrowse = false; - public ActiveMQDestination destination; public int credit; public int consumerPrefetch = 0; private long lastDeliveredSequenceId; @@ -1068,28 +1096,32 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { @Override public void onClose() throws Exception { - if (!closed) { - closed = true; - sender.setContext(null); - subscriptionsByConsumerId.remove(consumerId); + try { + if (!closed) { + closed = true; + sender.setContext(null); + subscriptionsByConsumerId.remove(consumerId); - AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext(); - if (session != null) { - session.consumers.remove(info.getConsumerId()); - } - - RemoveInfo removeCommand = new RemoveInfo(consumerId); - removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); - sendToActiveMQ(removeCommand, null); - - if (info.isDurable()) { - RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); - rsi.setConnectionId(connectionId); - rsi.setSubscriptionName(sender.getName()); - rsi.setClientId(connectionInfo.getClientId()); - - sendToActiveMQ(rsi, null); + AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext(); + if (session != null) { + session.consumers.remove(info.getConsumerId()); + } + + RemoveInfo removeCommand = new RemoveInfo(consumerId); + removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); + sendToActiveMQ(removeCommand, null); + + if (info.isDurable()) { + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); + rsi.setConnectionId(connectionId); + rsi.setSubscriptionName(sender.getName()); + rsi.setClientId(connectionInfo.getClientId()); + + sendToActiveMQ(rsi, null); + } } + } finally { + super.onClose(); } } @@ -1415,6 +1447,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { source.setAddress(destination.getQualifiedName()); source.setDynamic(true); sender.setSource(source); + consumerContext.addCloseAction(new Runnable() { + + @Override + public void run() { + deleteTemporaryDestination((ActiveMQTempDestination) consumerContext.getDestination()); + } + }); } else { destination = createDestination(source); } @@ -1425,7 +1464,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setSelector(selector); consumerInfo.setNoRangeAcks(true); consumerInfo.setDestination(destination); - consumerContext.destination = destination; + consumerContext.setDestination(destination); int senderCredit = sender.getRemoteCredit(); if (prefetch != 0) { // use the value configured on the transport connector @@ -1551,6 +1590,24 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { return rc; } + private void deleteTemporaryDestination(ActiveMQTempDestination destination) { + DestinationInfo info = new DestinationInfo(); + info.setConnectionId(connectionId); + info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); + info.setDestination(destination); + + sendToActiveMQ(info, new ResponseHandler() { + + @Override + public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { + if (response.isException()) { + Throwable exception = ((ExceptionResponse) response).getException(); + LOG.debug("Error during temp destination removeal: {}", exception.getMessage()); + } + } + }); + } + private boolean contains(Symbol[] symbols, Symbol key) { if (symbols == null) { return false; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 87553e27e8..552d8287ae 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -1067,7 +1067,6 @@ public class JMSClientTest extends JMSClientTestSupport { } } - @Ignore("Broker cannot currently tell if it should delete a temp destination") @Test(timeout=30000) public void testDeleteTemporaryQueue() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); @@ -1112,7 +1111,7 @@ public class JMSClientTest extends JMSClientTestSupport { } } - @Ignore("Broker cannot currently tell if it should delete a temp destination") + @Ignore("Legacy QPid client does not support creation of TemporaryTopics correctly") @Test(timeout=30000) public void testDeleteTemporaryTopic() throws Exception { ActiveMQAdmin.enableJMSFrameTracing();