diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 94e6a47741..1ed57458d2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -59,7 +60,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class); - private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector"); private static final Symbol COPY = Symbol.valueOf("copy"); private static final Symbol TOPIC = Symbol.valueOf("topic"); @@ -72,10 +72,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr protected final AMQPSessionCallback sessionSPI; protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); - public ProtonServerSenderContext(AMQPConnectionContext connection, - Sender sender, - AMQPSessionContext protonSession, - AMQPSessionCallback server) { + public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) { super(); this.connection = connection; this.sender = sender; @@ -98,20 +95,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } /* -* start the session -* */ + * start the session + */ public void start() throws ActiveMQAMQPException { sessionSPI.start(); // protonSession.getServerSession().start(); - //todo add flow control + // todo add flow control try { // to do whatever you need to make the broker start sending messages to the consumer - //this could be null if a link reattach has happened + // this could be null if a link reattach has happened if (brokerConsumer != null) { sessionSPI.startSender(brokerConsumer); } - //protonSession.getServerSession().receiveConsumerCredits(consumerID, -1); + // protonSession.getServerSession().receiveConsumerCredits(consumerID, -1); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage()); } @@ -120,20 +117,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr /** * create the actual underlying ActiveMQ Artemis Server Consumer */ + @SuppressWarnings("unchecked") @Override public void initialise() throws Exception { super.initialise(); Source source = (Source) sender.getRemoteSource(); - String queue; - String selector = null; + final Map supportedFilters = new HashMap<>(); - /* - * even tho the filter is a map it will only return a single filter unless a nolocal is also provided - * */ if (source != null) { + // We look for message selectors on every receiver, while in other cases we might only + // consume the filter depending on the subscription type. Map.Entry filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS); if (filter != null) { selector = filter.getValue().getDescribed().toString(); @@ -144,17 +140,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); return; } + + supportedFilters.put(filter.getKey(), filter.getValue()); } } - /* - * if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act - * like a subscription. - * */ + // if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this + // address to act like a topic then act like a subscription. boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source); if (isPubSub) { - if (AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) { + Map.Entry filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS); + if (filter != null) { String remoteContainerId = sender.getSession().getConnection().getRemoteContainer(); String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; if (selector != null) { @@ -162,20 +159,25 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else { selector = noLocalFilter; } + + supportedFilters.put(filter.getKey(), filter.getValue()); } } if (source == null) { - // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue + // Attempt to recover a previous subscription happens when a link reattach happens on a + // subscription queue String clientId = connection.getRemoteContainer(); String pubId = sender.getName(); queue = createQueueName(clientId, pubId); boolean exists = sessionSPI.queueQuery(queue, false).isExists(); - /* - * If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a - * link remote close. - * */ + // Once confirmed that the address exists we need to return a Source that reflects + // the lifetime policy and capabilities of the new subscription. + // + // TODO we are not applying selector or noLocal filters to the source we just + // looked up which would violate expectations if the client checked that they + // are present on subscription recovery (JMS Durable Re-subscribe) etc if (exists) { source = new org.apache.qpid.proton.amqp.messaging.Source(); source.setAddress(queue); @@ -187,79 +189,86 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else { throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName()); } - } else { - if (source.getDynamic()) { - //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and - // will be deleted on closing of the session - queue = java.util.UUID.randomUUID().toString(); - try { - sessionSPI.createTemporaryQueue(queue); - //protonSession.getServerSession().createQueue(queue, queue, null, true, false); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); - } - source.setAddress(queue); - } else { - //if not dynamic then we use the targets address as the address to forward the messages to, however there has to - //be a queue bound to it so we nee to check this. - if (isPubSub) { - // if we are a subscription and durable create a durable queue using the container id and link name - if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { - String clientId = connection.getRemoteContainer(); - String pubId = sender.getName(); - queue = createQueueName(clientId, pubId); - QueueQueryResult result = sessionSPI.queueQuery(queue, false); - - if (result.isExists()) { - // If a client reattaches to a durable subscription with a different no-local filter value, selector - // or address then we must recreate the queue (JMS semantics). - - if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { - if (result.getConsumerCount() == 0) { - sessionSPI.deleteQueue(queue); - sessionSPI.createDurableQueue(source.getAddress(), queue, selector); - } else { - throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); - } - } - } else { - sessionSPI.createDurableQueue(source.getAddress(), queue, selector); - } - source.setAddress(queue); - } else { - //otherwise we are a volatile subscription - queue = java.util.UUID.randomUUID().toString(); - try { - sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); - } - source.setAddress(queue); - } - } else { - queue = source.getAddress(); - } - if (queue == null) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet(); - } - - try { - if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); - } - } catch (ActiveMQAMQPNotFoundException e) { - throw e; - } catch (Exception e) { - throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); - } - } - - boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); + } else if (source.getDynamic()) { + // if dynamic we have to create the node (queue) and set the address on the target, the + // node is temporary and will be deleted on closing of the session + queue = java.util.UUID.randomUUID().toString(); try { - brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly); + sessionSPI.createTemporaryQueue(queue); + // protonSession.getServerSession().createQueue(queue, queue, null, true, false); } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } + source.setAddress(queue); + } else { + // if not dynamic then we use the target's address as the address to forward the + // messages to, however there has to be a queue bound to it so we need to check this. + if (isPubSub) { + // if we are a subscription and durable create a durable queue using the container + // id and link name + if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { + String clientId = connection.getRemoteContainer(); + String pubId = sender.getName(); + queue = createQueueName(clientId, pubId); + QueueQueryResult result = sessionSPI.queueQuery(queue, false); + + if (result.isExists()) { + // If a client reattaches to a durable subscription with a different no-local + // filter value, selector or address then we must recreate the queue (JMS semantics). + if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || + (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { + + if (result.getConsumerCount() == 0) { + sessionSPI.deleteQueue(queue); + sessionSPI.createDurableQueue(source.getAddress(), queue, selector); + } else { + throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); + } + } + } else { + sessionSPI.createDurableQueue(source.getAddress(), queue, selector); + } + source.setAddress(queue); + } else { + // otherwise we are a volatile subscription + queue = java.util.UUID.randomUUID().toString(); + try { + sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); + } + source.setAddress(queue); + } + } else { + queue = source.getAddress(); + } + + if (queue == null) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet(); + } + + try { + if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); + } + } catch (ActiveMQAMQPNotFoundException e) { + throw e; + } catch (Exception e) { + throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); + } + } + + // We need to update the source with any filters we support otherwise the client + // is free to consider the attach as having failed if we don't send back what we + // do support or if we send something we don't support the client won't know we + // have not honored what it asked for. + source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); + + boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); + try { + brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); } } @@ -269,8 +278,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } /* - * close the session - * */ + * close the session + */ @Override public void close(ErrorCondition condition) throws ActiveMQAMQPException { closed = true; @@ -289,14 +298,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } /* - * close the session - * */ + * close the session + */ @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { try { sessionSPI.closeSender(brokerConsumer); - //if this is a link close rather than a connection close or detach, we need to delete any durable resources for - // say pub subs + // if this is a link close rather than a connection close or detach, we need to delete + // any durable resources for say pub subs if (remoteLinkClose) { Source source = (Source) sender.getSource(); if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) { @@ -324,7 +333,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { Object message = delivery.getContext(); @@ -349,7 +357,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr delivery.disposition(txAccepted); } - //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order + // we have to individual ack as we can't guarantee we will get the delivery + // updates (including acks) in order // from dealer, a perf hit but a must try { sessionSPI.ack(tx, brokerConsumer, message); @@ -359,7 +368,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } } else if (remoteState instanceof Accepted) { - //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order + // we have to individual ack as we can't guarantee we will get the delivery updates + // (including acks) in order // from dealer, a perf hit but a must try { sessionSPI.ack(null, brokerConsumer, message); @@ -379,7 +389,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } } - //todo add tag caching + // todo add tag caching if (!preSettle) { protonSession.replaceTag(delivery.getTag()); } @@ -390,7 +400,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { - //todo not sure if we need to do anything here + // todo not sure if we need to do anything here } } @@ -417,7 +427,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - // presettle means we can settle the message on the dealer side before we send it, i.e. for browsers + // presettle means we can settle the message on the dealer side before we send it, i.e. + // for browsers boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; // we only need a tag if we are going to settle later @@ -478,5 +489,4 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private static String createQueueName(String clientId, String pubId) { return clientId + "." + pubId; } - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index f0f52bae66..2c7ce6fef5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -48,6 +48,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { @Override public void setUp() throws Exception { super.setUp(); + server = createServer(true, true); server.start(); } @@ -55,8 +56,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { @After @Override public void tearDown() throws Exception { - super.tearDown(); - for (AmqpConnection conn : connections) { try { conn.close(); @@ -65,6 +64,8 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { } } server.stop(); + + super.tearDown(); } public Queue getProxyToQueue(String queueName) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java new file mode 100644 index 0000000000..2c2438292c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.engine.Receiver; +import org.junit.Test; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map filters = source.getFilter(); + + if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) { + markAsInvalid("Broker should not return unsupported filter on attach."); + } + } + }); + + Map filters = new HashMap<>(); + filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER); + + Source source = new Source(); + source.setAddress(getTestName()); + source.setFilter(filters); + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + session.createReceiver(source); + + assertEquals(1, server.getTotalConsumerCount()); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testSupportedFiltersAreListedAsSupported() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map filters = source.getFilter(); + + if (findFilter(filters, AmqpSupport.JMS_SELECTOR_FILTER_IDS) == null) { + markAsInvalid("Broker should return selector filter on attach."); + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + session.createReceiver(getTestName(), "color = red"); + + connection.getStateInspector().assertValid(); + connection.close(); + } +}