diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 70e4fd086b..1f193eb071 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -24,7 +24,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; @@ -46,6 +45,8 @@ import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; + public class AMQPConnectionContext extends ProtonInitializable { private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); @@ -181,7 +182,7 @@ public class AMQPConnectionContext extends ProtonInitializable { protected void remoteLinkOpened(Link link) throws Exception { - AMQPSessionContext protonSession = (AMQPSessionContext) getSessionExtension(link.getSession()); + AMQPSessionContext protonSession = getSessionExtension(link.getSession()); link.setSource(link.getRemoteSource()); link.setTarget(link.getRemoteTarget()); @@ -321,6 +322,7 @@ public class AMQPConnectionContext extends ProtonInitializable { public void onRemoteClose(Connection connection) { synchronized (getLock()) { connection.close(); + connection.free(); for (AMQPSessionContext protonSession : sessions.values()) { protonSession.close(); } @@ -352,6 +354,7 @@ public class AMQPConnectionContext extends ProtonInitializable { public void onRemoteClose(Session session) throws Exception { synchronized (getLock()) { session.close(); + session.free(); } AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); @@ -375,6 +378,7 @@ public class AMQPConnectionContext extends ProtonInitializable { @Override public void onRemoteClose(Link link) throws Exception { link.close(); + link.free(); ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); if (linkContext != null) { linkContext.close(true); @@ -384,10 +388,11 @@ public class AMQPConnectionContext extends ProtonInitializable { @Override public void onRemoteDetach(Link link) throws Exception { link.detach(); + link.free(); } @Override - public void onDetach(Link link) throws Exception { + public void onLocalDetach(Link link) throws Exception { Object context = link.getContext(); if (context instanceof ProtonServerSenderContext) { ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context; @@ -402,10 +407,8 @@ public class AMQPConnectionContext extends ProtonInitializable { handler.onMessage(delivery); } else { // TODO: logs - System.err.println("Handler is null, can't delivery " + delivery); } } } - } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java new file mode 100644 index 0000000000..2e6ec2f5b9 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.proton; + +import org.apache.qpid.proton.amqp.DescribedType; + +/** + * A Described Type wrapper for JMS selector values. + */ +public class AmqpJmsSelectorFilter implements DescribedType { + + private final String selector; + + public AmqpJmsSelectorFilter(String selector) { + this.selector = selector; + } + + @Override + public Object getDescriptor() { + return AmqpSupport.JMS_SELECTOR_CODE; + } + + @Override + public Object getDescribed() { + return this.selector; + } + + @Override + public String toString() { + return "AmqpJmsSelectorType{" + selector + "}"; + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java new file mode 100644 index 0000000000..24f3eadc6d --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.proton; + +import org.apache.qpid.proton.amqp.DescribedType; + +/** + * A Described Type wrapper for JMS no local option for MessageConsumer. + */ +public class AmqpNoLocalFilter implements DescribedType { + + public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter(); + + private final String noLocal; + + public AmqpNoLocalFilter() { + this.noLocal = "NoLocalFilter{}"; + } + + @Override + public Object getDescriptor() { + return AmqpSupport.NO_LOCAL_CODE; + } + + @Override + public Object getDescribed() { + return this.noLocal; + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 1ed57458d2..76279c5e1a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -170,21 +170,46 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr String clientId = connection.getRemoteContainer(); String pubId = sender.getName(); queue = createQueueName(clientId, pubId); - boolean exists = sessionSPI.queueQuery(queue, false).isExists(); + QueueQueryResult result = sessionSPI.queueQuery(queue, false); // Once confirmed that the address exists we need to return a Source that reflects // the lifetime policy and capabilities of the new subscription. - // - // TODO we are not applying selector or noLocal filters to the source we just - // looked up which would violate expectations if the client checked that they - // are present on subscription recovery (JMS Durable Re-subscribe) etc - if (exists) { + if (result.isExists()) { source = new org.apache.qpid.proton.amqp.messaging.Source(); source.setAddress(queue); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); source.setDistributionMode(COPY); source.setCapabilities(TOPIC); + + SimpleString filterString = result.getFilterString(); + if (filterString != null) { + selector = filterString.toString(); + boolean noLocal = false; + + String remoteContainerId = sender.getSession().getConnection().getRemoteContainer(); + String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; + + if (selector.endsWith(noLocalFilter)) { + if (selector.length() > noLocalFilter.length()) { + noLocalFilter = " AND " + noLocalFilter; + selector = selector.substring(0, selector.length() - noLocalFilter.length()); + } else { + selector = null; + } + + noLocal = true; + } + + if (noLocal) { + supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL); + } + + if (selector != null && !selector.trim().isEmpty()) { + supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector)); + } + } + sender.setSource(source); } else { throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName()); @@ -228,7 +253,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else { sessionSPI.createDurableQueue(source.getAddress(), queue, selector); } - source.setAddress(queue); } else { // otherwise we are a volatile subscription queue = java.util.UUID.randomUUID().toString(); @@ -237,7 +261,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } - source.setAddress(queue); } } else { queue = source.getAddress(); @@ -308,7 +331,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // any durable resources for say pub subs if (remoteLinkClose) { Source source = (Source) sender.getSource(); - if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) { + if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) { String queueName = source.getAddress(); QueueQueryResult result = sessionSPI.queueQuery(queueName, false); if (result.isExists() && source.getDynamic()) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java index 91c9a67cfb..00bd27a100 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java @@ -69,7 +69,7 @@ public interface EventHandler { void onRemoteDetach(Link link) throws Exception; - void onDetach(Link link) throws Exception; + void onLocalDetach(Link link) throws Exception; void onDelivery(Delivery delivery) throws Exception; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java index 6552f64ee3..405491af6e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java @@ -85,7 +85,7 @@ public final class Events { handler.onFinal(event.getLink()); break; case LINK_LOCAL_DETACH: - handler.onDetach(event.getLink()); + handler.onLocalDetach(event.getLink()); break; case LINK_REMOTE_DETACH: handler.onRemoteDetach(event.getLink()); @@ -96,7 +96,8 @@ public final class Events { case DELIVERY: handler.onDelivery(event.getDelivery()); break; + default: + break; } } - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java new file mode 100644 index 0000000000..e0c6b6cbad --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.engine.Receiver; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for broker side support of the Durable Subscription mapping for JMS. + */ +public class AmqpDurableReceiverTest extends AmqpClientTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpDurableReceiverTest.class); + + private final String SELECTOR_STRING = "color = red"; + + @Override + public void setUp() throws Exception { + super.setUp(); + server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false); + } + + @Test(timeout = 60000) + public void testCreateDurableReceiver() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName()); + receiver.flow(1); + + assertEquals(getTopicName(), lookupSubscription()); + + AmqpSender sender = session.createSender(getTopicName()); + AmqpMessage message = new AmqpMessage(); + message.setMessageId("message:1"); + sender.send(message); + + message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + + connection.close(); + + assertEquals(getTopicName(), lookupSubscription()); + } + + @Test(timeout = 60000) + public void testDetachedDurableReceiverRemainsActive() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName()); + + assertEquals(getTopicName(), lookupSubscription()); + + receiver.detach(); + + assertEquals(getTopicName(), lookupSubscription()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCloseDurableReceiverRemovesSubscription() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName()); + + assertEquals(getTopicName(), lookupSubscription()); + + receiver.close(); + + assertNull(lookupSubscription()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testReattachToDurableNode() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName()); + + receiver.detach(); + + receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName()); + + receiver.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupExistingSubscription() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName()); + + receiver.detach(); + + receiver = session.lookupSubscription(getSubscriptionName()); + + assertNotNull(receiver); + + Receiver protonReceiver = receiver.getReceiver(); + assertNotNull(protonReceiver.getRemoteSource()); + Source remoteSource = (Source) protonReceiver.getRemoteSource(); + + if (remoteSource.getFilter() != null) { + assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + } + + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); + assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); + assertEquals(COPY, remoteSource.getDistributionMode()); + + receiver.close(); + + try { + receiver = session.lookupSubscription(getSubscriptionName()); + fail("Should not be able to lookup the subscription"); + } catch (Exception e) { + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupExistingSubscriptionWithSelector() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), SELECTOR_STRING, false); + + receiver.detach(); + + receiver = session.lookupSubscription(getSubscriptionName()); + + assertNotNull(receiver); + + Receiver protonReceiver = receiver.getReceiver(); + assertNotNull(protonReceiver.getRemoteSource()); + Source remoteSource = (Source) protonReceiver.getRemoteSource(); + + assertNotNull(remoteSource.getFilter()); + assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + String selector = (String) ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed(); + assertEquals(SELECTOR_STRING, selector); + + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); + assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); + assertEquals(COPY, remoteSource.getDistributionMode()); + + receiver.close(); + + try { + receiver = session.lookupSubscription(getSubscriptionName()); + fail("Should not be able to lookup the subscription"); + } catch (Exception e) { + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupExistingSubscriptionWithNoLocal() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), null, true); + + receiver.detach(); + + receiver = session.lookupSubscription(getSubscriptionName()); + + assertNotNull(receiver); + + Receiver protonReceiver = receiver.getReceiver(); + assertNotNull(protonReceiver.getRemoteSource()); + Source remoteSource = (Source) protonReceiver.getRemoteSource(); + + assertNotNull(remoteSource.getFilter()); + assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); + assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); + assertEquals(COPY, remoteSource.getDistributionMode()); + + receiver.close(); + + try { + receiver = session.lookupSubscription(getSubscriptionName()); + fail("Should not be able to lookup the subscription"); + } catch (Exception e) { + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), SELECTOR_STRING, true); + + receiver.detach(); + + receiver = session.lookupSubscription(getSubscriptionName()); + + assertNotNull(receiver); + + Receiver protonReceiver = receiver.getReceiver(); + assertNotNull(protonReceiver.getRemoteSource()); + Source remoteSource = (Source) protonReceiver.getRemoteSource(); + + assertNotNull(remoteSource.getFilter()); + assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + String selector = (String) ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed(); + assertEquals(SELECTOR_STRING, selector); + + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); + assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); + assertEquals(COPY, remoteSource.getDistributionMode()); + + receiver.close(); + + try { + receiver = session.lookupSubscription(getSubscriptionName()); + fail("Should not be able to lookup the subscription"); + } catch (Exception e) { + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupNonExistingSubscription() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setContainerId(getContainerID()); + connection.connect(); + + AmqpSession session = connection.createSession(); + + try { + session.lookupSubscription(getSubscriptionName()); + fail("Should throw an exception since there is not subscription"); + } catch (Exception e) { + LOG.info("Error on lookup: {}", e.getMessage()); + } + + connection.close(); + } + + public String lookupSubscription() { + Binding binding = server.getPostOffice().getBinding(new SimpleString(getContainerID() + "." + getSubscriptionName())); + if (binding != null) { + return binding.getAddress().toString(); + } + + return null; + } + + public String getContainerID() { + return "myContainerID"; + } + + public String getSubscriptionName() { + return "mySubscription"; + } + + public String getTopicName() { + return "jms.topic.myTopic"; + } +}