diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java new file mode 100644 index 0000000000..f57fd81267 --- /dev/null +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.proton.plug; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; + +import java.util.AbstractMap; +import java.util.Map; + +/** + * Set of useful methods and definitions used in the AMQP protocol handling + */ +public class AmqpSupport { + + // Identification values used to locating JMS selector types. + public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L); + public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string"); + public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[]{JMS_SELECTOR_CODE, JMS_SELECTOR_NAME}; + public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L); + public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list"); + public static final Object[] NO_LOCAL_FILTER_IDS = new Object[]{NO_LOCAL_CODE, NO_LOCAL_NAME}; + + // Capabilities used to identify destination type in some requests. + public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); + public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); + + // Symbols used to announce connection information to remote peer. + public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field"); + public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id"); + + // Symbols used to announce connection information to remote peer. + public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); + public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); + public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); + public static final Symbol PRODUCT = Symbol.valueOf("product"); + public static final Symbol VERSION = Symbol.valueOf("version"); + public static final Symbol PLATFORM = Symbol.valueOf("platform"); + + // Symbols used in configuration of newly opened links. + public static final Symbol COPY = Symbol.getSymbol("copy"); + + // Lifetime policy symbols + public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + + /** + * Search for a given Symbol in a given array of Symbol object. + * + * @param symbols + * the set of Symbols to search. + * @param key + * the value to try and find in the Symbol array. + * + * @return true if the key is found in the given Symbol array. + */ + public static boolean contains(Symbol[] symbols, Symbol key) { + if (symbols == null || symbols.length == 0) { + return false; + } + + for (Symbol symbol : symbols) { + if (symbol.equals(key)) { + return true; + } + } + + return false; + } + + /** + * Search for a particular filter using a set of known indentification values + * in the Map of filters. + * + * @param filters + * The filters map that should be searched. + * @param filterIds + * The aliases for the target filter to be located. + * + * @return the filter if found in the mapping or null if not found. + */ + public static Map.Entry findFilter(Map filters, Object[] filterIds) { + + if (filterIds == null || filterIds.length == 0) { + StringBuilder ids = new StringBuilder(); + if (filterIds != null) { + for (Object filterId : filterIds) { + ids.append(filterId).append(" "); + } + } + throw new IllegalArgumentException("Invalid Filter Ids array passed: " + ids); + } + + if (filters == null || filters.isEmpty()) { + return null; + } + + for (Map.Entry filter : filters.entrySet()) { + if (filter.getValue() instanceof DescribedType) { + DescribedType describedType = ((DescribedType) filter.getValue()); + Object descriptor = describedType.getDescriptor(); + + for (Object filterId : filterIds) { + if (descriptor.equals(filterId)) { + return new AbstractMap.SimpleImmutableEntry<>(filter.getKey(), describedType); + } + } + } + } + + return null; + } + +} diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java index baf0710c00..7a4d295340 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java @@ -18,6 +18,7 @@ package org.proton.plug.context; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; @@ -76,6 +77,16 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im connection.flush(); } + /* + * close the session + * */ + @Override + public void close(ErrorCondition condition) throws ActiveMQAMQPException { + closed = true; + sender.setCondition(condition); + close(); + } + @Override /* * handle an incoming Ack from Proton, basically pass to ActiveMQ Artemis to handle diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java index 2cb38b3c9f..8481853a34 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -16,6 +16,7 @@ */ package org.proton.plug.context; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Receiver; import org.proton.plug.AMQPSessionCallback; import org.proton.plug.exceptions.ActiveMQAMQPException; @@ -56,6 +57,12 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable protonSession.removeReceiver(receiver); } + @Override + public void close(ErrorCondition condition) throws ActiveMQAMQPException { + receiver.setCondition(condition); + close(); + } + public void flow(int credits) { synchronized (connection.getLock()) { receiver.flow(credits); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java index 63bc277ed9..128ea6564a 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java @@ -16,6 +16,7 @@ */ package org.proton.plug.context; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.proton.plug.exceptions.ActiveMQAMQPException; @@ -29,4 +30,6 @@ public interface ProtonDeliveryHandler { void onMessage(Delivery delivery) throws ActiveMQAMQPException; void close() throws ActiveMQAMQPException; + + void close(ErrorCondition condition) throws ActiveMQAMQPException; } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java index 5e5115f15e..6a9ad6ae45 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java @@ -119,4 +119,9 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { public void close() throws ActiveMQAMQPException { //noop } + + @Override + public void close(ErrorCondition condition) throws ActiveMQAMQPException { + //noop + } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index db8b40923b..dfc69dfadd 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -18,13 +18,17 @@ package org.proton.plug.context.server; import java.util.Map; +import org.apache.activemq.artemis.selector.filter.FilterException; +import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; @@ -39,6 +43,9 @@ import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle; import org.proton.plug.context.ProtonPlugSender; import org.apache.qpid.proton.amqp.messaging.Source; +import static org.proton.plug.AmqpSupport.JMS_SELECTOR_FILTER_IDS; +import static org.proton.plug.AmqpSupport.findFilter; + public class ProtonServerSenderContext extends AbstractProtonContextSender implements ProtonPlugSender { private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector"); @@ -94,14 +101,29 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple String queue; String selector = null; - Map filter = source == null ? null : source.getFilter(); + + /* + * even tho the filter is a map it will only return a single filter unless a nolocal is also provided + * */ + Map.Entry filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); if (filter != null) { - DescribedType value = (DescribedType) filter.get(SELECTOR); - if (value != null) { - selector = value.getDescribed().toString(); + selector = filter.getValue().getDescribed().toString(); + // Validate the Selector. + try { + SelectorParser.parse(selector); + } + catch (FilterException e) { + close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); + return; } } + //filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); + + //if (filter != null) { + //todo implement nolocal filter + //} + if (source != null) { if (source.getDynamic()) { //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and @@ -144,6 +166,21 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple } } + /* + * close the session + * */ + @Override + public void close(ErrorCondition condition) throws ActiveMQAMQPException { + super.close(condition); + try { + sessionSPI.closeSender(brokerConsumer); + } + catch (Exception e) { + e.printStackTrace(); + throw new ActiveMQAMQPInternalErrorException(e.getMessage()); + } + } + /* * close the session * */