This closes #827

This commit is contained in:
Clebert Suconic 2016-10-07 18:32:17 -04:00
commit 1bb3c15362
3 changed files with 238 additions and 110 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@ -59,7 +60,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
private static final Symbol COPY = Symbol.valueOf("copy");
private static final Symbol TOPIC = Symbol.valueOf("topic");
@ -72,10 +72,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
protected final AMQPSessionCallback sessionSPI;
protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
public ProtonServerSenderContext(AMQPConnectionContext connection,
Sender sender,
AMQPSessionContext protonSession,
AMQPSessionCallback server) {
public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
super();
this.connection = connection;
this.sender = sender;
@ -98,20 +95,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
/*
* start the session
* */
* start the session
*/
public void start() throws ActiveMQAMQPException {
sessionSPI.start();
// protonSession.getServerSession().start();
//todo add flow control
// todo add flow control
try {
// to do whatever you need to make the broker start sending messages to the consumer
//this could be null if a link reattach has happened
// this could be null if a link reattach has happened
if (brokerConsumer != null) {
sessionSPI.startSender(brokerConsumer);
}
//protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
// protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
}
@ -120,20 +117,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/**
* create the actual underlying ActiveMQ Artemis Server Consumer
*/
@SuppressWarnings("unchecked")
@Override
public void initialise() throws Exception {
super.initialise();
Source source = (Source) sender.getRemoteSource();
String queue;
String selector = null;
final Map<Symbol, Object> supportedFilters = new HashMap<>();
/*
* even tho the filter is a map it will only return a single filter unless a nolocal is also provided
* */
if (source != null) {
// We look for message selectors on every receiver, while in other cases we might only
// consume the filter depending on the subscription type.
Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
if (filter != null) {
selector = filter.getValue().getDescribed().toString();
@ -144,17 +140,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
return;
}
supportedFilters.put(filter.getKey(), filter.getValue());
}
}
/*
* if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act
* like a subscription.
* */
// if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this
// address to act like a topic then act like a subscription.
boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
if (isPubSub) {
if (AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
if (filter != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector != null) {
@ -162,20 +159,25 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else {
selector = noLocalFilter;
}
supportedFilters.put(filter.getKey(), filter.getValue());
}
}
if (source == null) {
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
// Attempt to recover a previous subscription happens when a link reattach happens on a
// subscription queue
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
queue = createQueueName(clientId, pubId);
boolean exists = sessionSPI.queueQuery(queue, false).isExists();
/*
* If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a
* link remote close.
* */
// Once confirmed that the address exists we need to return a Source that reflects
// the lifetime policy and capabilities of the new subscription.
//
// TODO we are not applying selector or noLocal filters to the source we just
// looked up which would violate expectations if the client checked that they
// are present on subscription recovery (JMS Durable Re-subscribe) etc
if (exists) {
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(queue);
@ -187,23 +189,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else {
throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
}
} else {
if (source.getDynamic()) {
//if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
} else if (source.getDynamic()) {
// if dynamic we have to create the node (queue) and set the address on the target, the
// node is temporary and will be deleted on closing of the session
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(queue);
//protonSession.getServerSession().createQueue(queue, queue, null, true, false);
// protonSession.getServerSession().createQueue(queue, queue, null, true, false);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
source.setAddress(queue);
} else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to
//be a queue bound to it so we nee to check this.
// if not dynamic then we use the target's address as the address to forward the
// messages to, however there has to be a queue bound to it so we need to check this.
if (isPubSub) {
// if we are a subscription and durable create a durable queue using the container id and link name
// if we are a subscription and durable create a durable queue using the container
// id and link name
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
@ -211,10 +213,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local filter value, selector
// or address then we must recreate the queue (JMS semantics).
// If a client reattaches to a durable subscription with a different no-local
// filter value, selector or address then we must recreate the queue (JMS semantics).
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
(sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
@ -227,7 +230,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
source.setAddress(queue);
} else {
//otherwise we are a volatile subscription
// otherwise we are a volatile subscription
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
@ -239,6 +242,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else {
queue = source.getAddress();
}
if (queue == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
}
@ -254,6 +258,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
// We need to update the source with any filters we support otherwise the client
// is free to consider the attach as having failed if we don't send back what we
// do support or if we send something we don't support the client won't know we
// have not honored what it asked for.
source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
@ -261,7 +271,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
}
}
}
private boolean isPubSub(Source source) {
String pubSubPrefix = sessionSPI.getPubSubPrefix();
@ -270,7 +279,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/*
* close the session
* */
*/
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
closed = true;
@ -290,13 +299,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/*
* close the session
* */
*/
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
try {
sessionSPI.closeSender(brokerConsumer);
//if this is a link close rather than a connection close or detach, we need to delete any durable resources for
// say pub subs
// if this is a link close rather than a connection close or detach, we need to delete
// any durable resources for say pub subs
if (remoteLinkClose) {
Source source = (Source) sender.getSource();
if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
@ -324,7 +333,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
@Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
Object message = delivery.getContext();
@ -349,7 +357,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
delivery.disposition(txAccepted);
}
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
// we have to individual ack as we can't guarantee we will get the delivery
// updates (including acks) in order
// from dealer, a perf hit but a must
try {
sessionSPI.ack(tx, brokerConsumer, message);
@ -359,7 +368,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
} else if (remoteState instanceof Accepted) {
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order
// from dealer, a perf hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
@ -379,7 +389,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
}
//todo add tag caching
// todo add tag caching
if (!preSettle) {
protonSession.replaceTag(delivery.getTag());
}
@ -390,7 +400,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else {
//todo not sure if we need to do anything here
// todo not sure if we need to do anything here
}
}
@ -417,7 +427,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
// presettle means we can settle the message on the dealer side before we send it, i.e. for browsers
// presettle means we can settle the message on the dealer side before we send it, i.e.
// for browsers
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
// we only need a tag if we are going to settle later
@ -478,5 +489,4 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private static String createQueueName(String clientId, String pubId) {
return clientId + "." + pubId;
}
}

View File

@ -48,6 +48,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
@Override
public void setUp() throws Exception {
super.setUp();
server = createServer(true, true);
server.start();
}
@ -55,8 +56,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
for (AmqpConnection conn : connections) {
try {
conn.close();
@ -65,6 +64,8 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
}
}
server.stop();
super.tearDown();
}
public Queue getProxyToQueue(String queueName) {

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
/**
* Test various behaviors of AMQP receivers with the broker.
*/
public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
markAsInvalid("Broker should not return unsupported filter on attach.");
}
}
});
Map<Symbol, DescribedType> filters = new HashMap<>();
filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
Source source = new Source();
source.setAddress(getTestName());
source.setFilter(filters);
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(source);
assertEquals(1, server.getTotalConsumerCount());
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testSupportedFiltersAreListedAsSupported() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
if (findFilter(filters, AmqpSupport.JMS_SELECTOR_FILTER_IDS) == null) {
markAsInvalid("Broker should return selector filter on attach.");
}
}
});
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), "color = red");
connection.getStateInspector().assertValid();
connection.close();
}
}