ARTEMIS-776 Attach response should only list supported filters

The broker needs to return only the filters that are supported on a
receiver attach otherwise the remote is not aware that the broker is not
able to honor the requested configuration of the receiver.
This commit is contained in:
Timothy Bish 2016-10-07 17:29:46 -04:00
parent 736886fc13
commit 398da40f7d
3 changed files with 238 additions and 110 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton; package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -59,7 +60,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class); private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
private static final Symbol COPY = Symbol.valueOf("copy"); private static final Symbol COPY = Symbol.valueOf("copy");
private static final Symbol TOPIC = Symbol.valueOf("topic"); private static final Symbol TOPIC = Symbol.valueOf("topic");
@ -72,10 +72,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
protected final AMQPSessionCallback sessionSPI; protected final AMQPSessionCallback sessionSPI;
protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
public ProtonServerSenderContext(AMQPConnectionContext connection, public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
Sender sender,
AMQPSessionContext protonSession,
AMQPSessionCallback server) {
super(); super();
this.connection = connection; this.connection = connection;
this.sender = sender; this.sender = sender;
@ -98,20 +95,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
/* /*
* start the session * start the session
* */ */
public void start() throws ActiveMQAMQPException { public void start() throws ActiveMQAMQPException {
sessionSPI.start(); sessionSPI.start();
// protonSession.getServerSession().start(); // protonSession.getServerSession().start();
//todo add flow control // todo add flow control
try { try {
// to do whatever you need to make the broker start sending messages to the consumer // to do whatever you need to make the broker start sending messages to the consumer
//this could be null if a link reattach has happened // this could be null if a link reattach has happened
if (brokerConsumer != null) { if (brokerConsumer != null) {
sessionSPI.startSender(brokerConsumer); sessionSPI.startSender(brokerConsumer);
} }
//protonSession.getServerSession().receiveConsumerCredits(consumerID, -1); // protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
} }
@ -120,20 +117,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/** /**
* create the actual underlying ActiveMQ Artemis Server Consumer * create the actual underlying ActiveMQ Artemis Server Consumer
*/ */
@SuppressWarnings("unchecked")
@Override @Override
public void initialise() throws Exception { public void initialise() throws Exception {
super.initialise(); super.initialise();
Source source = (Source) sender.getRemoteSource(); Source source = (Source) sender.getRemoteSource();
String queue; String queue;
String selector = null; String selector = null;
final Map<Symbol, Object> supportedFilters = new HashMap<>();
/*
* even tho the filter is a map it will only return a single filter unless a nolocal is also provided
* */
if (source != null) { if (source != null) {
// We look for message selectors on every receiver, while in other cases we might only
// consume the filter depending on the subscription type.
Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS); Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
if (filter != null) { if (filter != null) {
selector = filter.getValue().getDescribed().toString(); selector = filter.getValue().getDescribed().toString();
@ -144,17 +140,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
return; return;
} }
supportedFilters.put(filter.getKey(), filter.getValue());
} }
} }
/* // if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this
* if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act // address to act like a topic then act like a subscription.
* like a subscription.
* */
boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source); boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
if (isPubSub) { if (isPubSub) {
if (AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) { Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
if (filter != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer(); String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector != null) { if (selector != null) {
@ -162,20 +159,25 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else { } else {
selector = noLocalFilter; selector = noLocalFilter;
} }
supportedFilters.put(filter.getKey(), filter.getValue());
} }
} }
if (source == null) { if (source == null) {
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue // Attempt to recover a previous subscription happens when a link reattach happens on a
// subscription queue
String clientId = connection.getRemoteContainer(); String clientId = connection.getRemoteContainer();
String pubId = sender.getName(); String pubId = sender.getName();
queue = createQueueName(clientId, pubId); queue = createQueueName(clientId, pubId);
boolean exists = sessionSPI.queueQuery(queue, false).isExists(); boolean exists = sessionSPI.queueQuery(queue, false).isExists();
/* // Once confirmed that the address exists we need to return a Source that reflects
* If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a // the lifetime policy and capabilities of the new subscription.
* link remote close. //
* */ // TODO we are not applying selector or noLocal filters to the source we just
// looked up which would violate expectations if the client checked that they
// are present on subscription recovery (JMS Durable Re-subscribe) etc
if (exists) { if (exists) {
source = new org.apache.qpid.proton.amqp.messaging.Source(); source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(queue); source.setAddress(queue);
@ -187,79 +189,86 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else { } else {
throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName()); throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
} }
} else { } else if (source.getDynamic()) {
if (source.getDynamic()) { // if dynamic we have to create the node (queue) and set the address on the target, the
//if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // node is temporary and will be deleted on closing of the session
// will be deleted on closing of the session queue = java.util.UUID.randomUUID().toString();
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(queue);
//protonSession.getServerSession().createQueue(queue, queue, null, true, false);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
source.setAddress(queue);
} else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to
//be a queue bound to it so we nee to check this.
if (isPubSub) {
// if we are a subscription and durable create a durable queue using the container id and link name
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
queue = createQueueName(clientId, pubId);
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local filter value, selector
// or address then we must recreate the queue (JMS semantics).
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
} else {
throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
}
}
} else {
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
}
source.setAddress(queue);
} else {
//otherwise we are a volatile subscription
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
source.setAddress(queue);
}
} else {
queue = source.getAddress();
}
if (queue == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
}
try {
if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}
} catch (ActiveMQAMQPNotFoundException e) {
throw e;
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
}
boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try { try {
brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly); sessionSPI.createTemporaryQueue(queue);
// protonSession.getServerSession().createQueue(queue, queue, null, true, false);
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
} }
source.setAddress(queue);
} else {
// if not dynamic then we use the target's address as the address to forward the
// messages to, however there has to be a queue bound to it so we need to check this.
if (isPubSub) {
// if we are a subscription and durable create a durable queue using the container
// id and link name
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
queue = createQueueName(clientId, pubId);
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local
// filter value, selector or address then we must recreate the queue (JMS semantics).
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
(sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
} else {
throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
}
}
} else {
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
}
source.setAddress(queue);
} else {
// otherwise we are a volatile subscription
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
source.setAddress(queue);
}
} else {
queue = source.getAddress();
}
if (queue == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
}
try {
if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}
} catch (ActiveMQAMQPNotFoundException e) {
throw e;
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
}
// We need to update the source with any filters we support otherwise the client
// is free to consider the attach as having failed if we don't send back what we
// do support or if we send something we don't support the client won't know we
// have not honored what it asked for.
source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
} }
} }
@ -269,8 +278,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
/* /*
* close the session * close the session
* */ */
@Override @Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException { public void close(ErrorCondition condition) throws ActiveMQAMQPException {
closed = true; closed = true;
@ -289,14 +298,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
/* /*
* close the session * close the session
* */ */
@Override @Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
try { try {
sessionSPI.closeSender(brokerConsumer); sessionSPI.closeSender(brokerConsumer);
//if this is a link close rather than a connection close or detach, we need to delete any durable resources for // if this is a link close rather than a connection close or detach, we need to delete
// say pub subs // any durable resources for say pub subs
if (remoteLinkClose) { if (remoteLinkClose) {
Source source = (Source) sender.getSource(); Source source = (Source) sender.getSource();
if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) { if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
@ -324,7 +333,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
@Override @Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException { public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
Object message = delivery.getContext(); Object message = delivery.getContext();
@ -349,7 +357,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
delivery.disposition(txAccepted); delivery.disposition(txAccepted);
} }
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order // we have to individual ack as we can't guarantee we will get the delivery
// updates (including acks) in order
// from dealer, a perf hit but a must // from dealer, a perf hit but a must
try { try {
sessionSPI.ack(tx, brokerConsumer, message); sessionSPI.ack(tx, brokerConsumer, message);
@ -359,7 +368,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
} else if (remoteState instanceof Accepted) { } else if (remoteState instanceof Accepted) {
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order // we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order
// from dealer, a perf hit but a must // from dealer, a perf hit but a must
try { try {
sessionSPI.ack(null, brokerConsumer, message); sessionSPI.ack(null, brokerConsumer, message);
@ -379,7 +389,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
} }
} }
//todo add tag caching // todo add tag caching
if (!preSettle) { if (!preSettle) {
protonSession.replaceTag(delivery.getTag()); protonSession.replaceTag(delivery.getTag());
} }
@ -390,7 +400,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} else { } else {
//todo not sure if we need to do anything here // todo not sure if we need to do anything here
} }
} }
@ -417,7 +427,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
// presettle means we can settle the message on the dealer side before we send it, i.e. for browsers // presettle means we can settle the message on the dealer side before we send it, i.e.
// for browsers
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
// we only need a tag if we are going to settle later // we only need a tag if we are going to settle later
@ -478,5 +489,4 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private static String createQueueName(String clientId, String pubId) { private static String createQueueName(String clientId, String pubId) {
return clientId + "." + pubId; return clientId + "." + pubId;
} }
} }

View File

@ -48,6 +48,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
server = createServer(true, true); server = createServer(true, true);
server.start(); server.start();
} }
@ -55,8 +56,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
@After @After
@Override @Override
public void tearDown() throws Exception { public void tearDown() throws Exception {
super.tearDown();
for (AmqpConnection conn : connections) { for (AmqpConnection conn : connections) {
try { try {
conn.close(); conn.close();
@ -65,6 +64,8 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
} }
} }
server.stop(); server.stop();
super.tearDown();
} }
public Queue getProxyToQueue(String queueName) { public Queue getProxyToQueue(String queueName) {

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
/**
* Test various behaviors of AMQP receivers with the broker.
*/
public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
markAsInvalid("Broker should not return unsupported filter on attach.");
}
}
});
Map<Symbol, DescribedType> filters = new HashMap<>();
filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
Source source = new Source();
source.setAddress(getTestName());
source.setFilter(filters);
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(source);
assertEquals(1, server.getTotalConsumerCount());
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testSupportedFiltersAreListedAsSupported() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
if (findFilter(filters, AmqpSupport.JMS_SELECTOR_FILTER_IDS) == null) {
markAsInvalid("Broker should return selector filter on attach.");
}
}
});
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), "color = red");
connection.getStateInspector().assertValid();
connection.close();
}
}