This closes #476
This commit is contained in:
commit
d142c2b52c
|
@ -0,0 +1,129 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.proton.plug;
|
||||
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
|
||||
import java.util.AbstractMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Set of useful methods and definitions used in the AMQP protocol handling
|
||||
*/
|
||||
public class AmqpSupport {
|
||||
|
||||
// Identification values used to locating JMS selector types.
|
||||
public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
|
||||
public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
|
||||
public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[]{JMS_SELECTOR_CODE, JMS_SELECTOR_NAME};
|
||||
public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
|
||||
public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list");
|
||||
public static final Object[] NO_LOCAL_FILTER_IDS = new Object[]{NO_LOCAL_CODE, NO_LOCAL_NAME};
|
||||
|
||||
// Capabilities used to identify destination type in some requests.
|
||||
public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
|
||||
public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
|
||||
|
||||
// Symbols used to announce connection information to remote peer.
|
||||
public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
|
||||
public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
|
||||
|
||||
// Symbols used to announce connection information to remote peer.
|
||||
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
|
||||
public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
|
||||
public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
|
||||
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
|
||||
public static final Symbol PRODUCT = Symbol.valueOf("product");
|
||||
public static final Symbol VERSION = Symbol.valueOf("version");
|
||||
public static final Symbol PLATFORM = Symbol.valueOf("platform");
|
||||
|
||||
// Symbols used in configuration of newly opened links.
|
||||
public static final Symbol COPY = Symbol.getSymbol("copy");
|
||||
|
||||
// Lifetime policy symbols
|
||||
public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
|
||||
|
||||
/**
|
||||
* Search for a given Symbol in a given array of Symbol object.
|
||||
*
|
||||
* @param symbols
|
||||
* the set of Symbols to search.
|
||||
* @param key
|
||||
* the value to try and find in the Symbol array.
|
||||
*
|
||||
* @return true if the key is found in the given Symbol array.
|
||||
*/
|
||||
public static boolean contains(Symbol[] symbols, Symbol key) {
|
||||
if (symbols == null || symbols.length == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (Symbol symbol : symbols) {
|
||||
if (symbol.equals(key)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Search for a particular filter using a set of known indentification values
|
||||
* in the Map of filters.
|
||||
*
|
||||
* @param filters
|
||||
* The filters map that should be searched.
|
||||
* @param filterIds
|
||||
* The aliases for the target filter to be located.
|
||||
*
|
||||
* @return the filter if found in the mapping or null if not found.
|
||||
*/
|
||||
public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
|
||||
|
||||
if (filterIds == null || filterIds.length == 0) {
|
||||
StringBuilder ids = new StringBuilder();
|
||||
if (filterIds != null) {
|
||||
for (Object filterId : filterIds) {
|
||||
ids.append(filterId).append(" ");
|
||||
}
|
||||
}
|
||||
throw new IllegalArgumentException("Invalid Filter Ids array passed: " + ids);
|
||||
}
|
||||
|
||||
if (filters == null || filters.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
for (Map.Entry<Symbol, Object> filter : filters.entrySet()) {
|
||||
if (filter.getValue() instanceof DescribedType) {
|
||||
DescribedType describedType = ((DescribedType) filter.getValue());
|
||||
Object descriptor = describedType.getDescriptor();
|
||||
|
||||
for (Object filterId : filterIds) {
|
||||
if (descriptor.equals(filterId)) {
|
||||
return new AbstractMap.SimpleImmutableEntry<>(filter.getKey(), describedType);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -18,6 +18,7 @@ package org.proton.plug.context;
|
|||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
import org.apache.qpid.proton.engine.Sender;
|
||||
|
@ -76,6 +77,16 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
|
|||
connection.flush();
|
||||
}
|
||||
|
||||
/*
|
||||
* close the session
|
||||
* */
|
||||
@Override
|
||||
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
|
||||
closed = true;
|
||||
sender.setCondition(condition);
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
/*
|
||||
* handle an incoming Ack from Proton, basically pass to ActiveMQ Artemis to handle
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.proton.plug.context;
|
||||
|
||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
import org.apache.qpid.proton.engine.Receiver;
|
||||
import org.proton.plug.AMQPSessionCallback;
|
||||
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||
|
@ -56,6 +57,12 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
|
|||
protonSession.removeReceiver(receiver);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
|
||||
receiver.setCondition(condition);
|
||||
close();
|
||||
}
|
||||
|
||||
public void flow(int credits) {
|
||||
synchronized (connection.getLock()) {
|
||||
receiver.flow(credits);
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.proton.plug.context;
|
||||
|
||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||
|
||||
|
@ -29,4 +30,6 @@ public interface ProtonDeliveryHandler {
|
|||
void onMessage(Delivery delivery) throws ActiveMQAMQPException;
|
||||
|
||||
void close() throws ActiveMQAMQPException;
|
||||
|
||||
void close(ErrorCondition condition) throws ActiveMQAMQPException;
|
||||
}
|
||||
|
|
|
@ -119,4 +119,9 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
public void close() throws ActiveMQAMQPException {
|
||||
//noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
|
||||
//noop
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,13 +18,17 @@ package org.proton.plug.context.server;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.selector.filter.FilterException;
|
||||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||
import org.apache.qpid.proton.amqp.messaging.Released;
|
||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||
import org.apache.qpid.proton.amqp.transport.DeliveryState;
|
||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
import org.apache.qpid.proton.engine.Sender;
|
||||
|
@ -39,6 +43,9 @@ import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
|
|||
import org.proton.plug.context.ProtonPlugSender;
|
||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||
|
||||
import static org.proton.plug.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
|
||||
import static org.proton.plug.AmqpSupport.findFilter;
|
||||
|
||||
public class ProtonServerSenderContext extends AbstractProtonContextSender implements ProtonPlugSender {
|
||||
|
||||
private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
|
||||
|
@ -94,14 +101,29 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
|||
String queue;
|
||||
|
||||
String selector = null;
|
||||
Map filter = source == null ? null : source.getFilter();
|
||||
|
||||
/*
|
||||
* even tho the filter is a map it will only return a single filter unless a nolocal is also provided
|
||||
* */
|
||||
Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
|
||||
if (filter != null) {
|
||||
DescribedType value = (DescribedType) filter.get(SELECTOR);
|
||||
if (value != null) {
|
||||
selector = value.getDescribed().toString();
|
||||
selector = filter.getValue().getDescribed().toString();
|
||||
// Validate the Selector.
|
||||
try {
|
||||
SelectorParser.parse(selector);
|
||||
}
|
||||
catch (FilterException e) {
|
||||
close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
//filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
|
||||
|
||||
//if (filter != null) {
|
||||
//todo implement nolocal filter
|
||||
//}
|
||||
|
||||
if (source != null) {
|
||||
if (source.getDynamic()) {
|
||||
//if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
|
||||
|
@ -144,6 +166,21 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* close the session
|
||||
* */
|
||||
@Override
|
||||
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
|
||||
super.close(condition);
|
||||
try {
|
||||
sessionSPI.closeSender(brokerConsumer);
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw new ActiveMQAMQPInternalErrorException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* close the session
|
||||
* */
|
||||
|
|
Loading…
Reference in New Issue