diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 816008fc9e..1e28fefac7 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -140,8 +140,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); private static final Symbol COPY = Symbol.getSymbol("copy"); - private static final UnsignedLong JMS_SELECTOR = UnsignedLong.valueOf(0x0000468C00000004L); - private static final UnsignedLong NO_LOCAL = UnsignedLong.valueOf(0x0000468C00000003L); + private static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L); + private static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string"); + private static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[] { JMS_SELECTOR_CODE, JMS_SELECTOR_NAME }; + private static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L); + private static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:selector-filter:string"); + private static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME }; private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); @@ -1414,7 +1418,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { String selector = null; if (source != null) { - DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR); + DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); if (filter != null) { selector = filter.getDescribed().toString(); // Validate the Selector. @@ -1503,7 +1507,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setSubscriptionName(sender.getName()); } - DescribedType filter = findFilter(source.getFilter(), NO_LOCAL); + DescribedType filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); if (filter != null) { consumerInfo.setNoLocal(true); } @@ -1629,16 +1633,25 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { return false; } - private DescribedType findFilter(Map filters, UnsignedLong filterId) { + private DescribedType findFilter(Map filters, Object[] filterIds) { + + if (filterIds == null || filterIds.length == 0) { + throw new IllegalArgumentException("Invliad Filter Ids array passed: " + filterIds); + } + if (filters == null || filters.isEmpty()) { return null; } for (Object value : filters.values()) { if (value instanceof DescribedType) { - DescribedType describedType = (DescribedType) value; - if (describedType.getDescriptor().equals(filterId)) { - return describedType; + DescribedType describedType = ((DescribedType) value); + Object descriptor = ((DescribedType) value).getDescriptor(); + + for (Object filterId : filterIds) { + if (descriptor.equals(filterId)) { + return describedType; + } } } }