diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 3661f3db5e..5a73a25a76 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -1417,15 +1417,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource(); try { + final Map supportedFilters = new HashMap(); final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++); final ConsumerContext consumerContext = new ConsumerContext(id, sender); sender.setContext(consumerContext); + boolean noLocal = false; String selector = null; + if (source != null) { - DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); + Map.Entry filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); if (filter != null) { - selector = filter.getDescribed().toString(); + selector = filter.getValue().getDescribed().toString(); // Validate the Selector. try { SelectorParser.parse(selector); @@ -1436,6 +1439,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerContext.closed = true; return; } + + supportedFilters.put(filter.getKey(), filter.getValue()); + } + + filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); + if (filter != null) { + noLocal = true; + supportedFilters.put(filter.getKey(), filter.getValue()); } } @@ -1449,7 +1460,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { source.setAddress(destination.getQualifiedName()); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); - sender.setSource(source); } else { consumerContext.closed = true; sender.setSource(null); @@ -1465,7 +1475,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { source = new org.apache.qpid.proton.amqp.messaging.Source(); source.setAddress(destination.getQualifiedName()); source.setDynamic(true); - sender.setSource(source); consumerContext.addCloseAction(new Runnable() { @Override @@ -1477,6 +1486,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { destination = createDestination(source); } + source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); + sender.setSource(source); + int senderCredit = sender.getRemoteCredit(); subscriptionsByConsumerId.put(id, consumerContext); @@ -1486,6 +1498,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setDestination(destination); consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0); consumerInfo.setDispatchAsync(true); + consumerInfo.setNoLocal(noLocal); if (source.getDistributionMode() == COPY && destination.isQueue()) { consumerInfo.setBrowser(true); @@ -1495,11 +1508,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setSubscriptionName(sender.getName()); } - DescribedType filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); - if (filter != null) { - consumerInfo.setNoLocal(true); - } - consumerContext.info = consumerInfo; consumerContext.setDestination(destination); consumerContext.credit = senderCredit; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index c0cfb945f6..7af4c2c91a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp; import java.nio.ByteBuffer; +import java.util.AbstractMap; import java.util.Map; import org.apache.qpid.proton.amqp.Binary; @@ -86,7 +87,7 @@ public class AmqpSupport { * * @return the filter if found in the mapping or null if not found. */ - public static DescribedType findFilter(Map filters, Object[] filterIds) { + public static Map.Entry findFilter(Map filters, Object[] filterIds) { if (filterIds == null || filterIds.length == 0) { throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds); @@ -96,14 +97,14 @@ public class AmqpSupport { return null; } - for (Object value : filters.values()) { - if (value instanceof DescribedType) { - DescribedType describedType = ((DescribedType) value); + for (Map.Entry filter : filters.entrySet()) { + if (filter.getValue() instanceof DescribedType) { + DescribedType describedType = ((DescribedType) filter.getValue()); Object descriptor = describedType.getDescriptor(); for (Object filterId : filterIds) { if (descriptor.equals(filterId)) { - return describedType; + return new AbstractMap.SimpleImmutableEntry(filter.getKey(), describedType); } } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index e7d3eaf0d5..27627324a4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -187,7 +187,7 @@ public class AmqpClient { * @param stateInspector * the new state inspector to use. */ - public void setStateInspector(AmqpValidator stateInspector) { + public void setValidator(AmqpValidator stateInspector) { if (stateInspector == null) { stateInspector = new AmqpValidator(); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java similarity index 85% rename from activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java rename to activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java index d93e0527a3..9fad2efc77 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java @@ -16,23 +16,24 @@ */ package org.apache.activemq.transport.amqp.client; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_CODE; + import org.apache.qpid.proton.amqp.DescribedType; -import org.apache.qpid.proton.amqp.UnsignedLong; /** * A Described Type wrapper for JMS selector values. */ -public class AmqpJmsSelectorType implements DescribedType { +public class AmqpJmsSelectorFilter implements DescribedType { private final String selector; - public AmqpJmsSelectorType(String selector) { + public AmqpJmsSelectorFilter(String selector) { this.selector = selector; } @Override public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000004L); + return JMS_SELECTOR_CODE; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java similarity index 81% rename from activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java rename to activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java index 2d61b83e8f..0bdd71ee6b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java @@ -16,25 +16,26 @@ */ package org.apache.activemq.transport.amqp.client; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_CODE; + import org.apache.qpid.proton.amqp.DescribedType; -import org.apache.qpid.proton.amqp.UnsignedLong; /** * A Described Type wrapper for JMS no local option for MessageConsumer. */ -public class AmqpNoLocalType implements DescribedType { +public class AmqpNoLocalFilter implements DescribedType { - public static final AmqpNoLocalType NO_LOCAL = new AmqpNoLocalType(); + public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter(); private final String noLocal; - public AmqpNoLocalType() { + public AmqpNoLocalFilter() { this.noLocal = "NoLocalFilter{}"; } @Override public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000003L); + return NO_LOCAL_CODE; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index ff530b9250..1290d270cd 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -76,6 +76,7 @@ public class AmqpReceiver extends AmqpAbstractResource { private String selector; private boolean presettle; private boolean noLocal; + private Source userSpecifiedSource; /** * Create a new receiver instance. @@ -93,6 +94,28 @@ public class AmqpReceiver extends AmqpAbstractResource { this.receiverId = receiverId; } + /** + * Create a new receiver instance. + * + * @param session + * The parent session that created the receiver. + * @param source + * The Source instance to use instead of creating and configuring one. + * @param receiverId + * The unique ID assigned to this receiver. + */ + public AmqpReceiver(AmqpSession session, Source source, String receiverId) { + + if (source == null) { + throw new IllegalArgumentException("User specified Source cannot be null"); + } + + this.session = session; + this.userSpecifiedSource = source; + this.address = source.getAddress(); + this.receiverId = receiverId; + } + /** * Close the receiver, a closed receiver will throw exceptions if any further send * calls are made. @@ -423,11 +446,14 @@ public class AmqpReceiver extends AmqpAbstractResource { @Override protected void doOpen() { - Source source = new Source(); - source.setAddress(address); + Source source = userSpecifiedSource; Target target = new Target(); - configureSource(source); + if (userSpecifiedSource == null) { + source = new Source(); + source.setAddress(address); + configureSource(source); + } String receiverName = receiverId + ":" + address; @@ -523,11 +549,11 @@ public class AmqpReceiver extends AmqpAbstractResource { source.setDefaultOutcome(modified); if (isNoLocal()) { - filters.put(NO_LOCAL_NAME, AmqpNoLocalType.NO_LOCAL); + filters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL); } if (getSelector() != null && !getSelector().trim().equals("")) { - filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(getSelector())); + filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector())); } if (!filters.isEmpty()) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index c747dc694d..8b039b682f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -151,6 +152,40 @@ public class AmqpSession extends AmqpAbstractResource { return receiver; } + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param source + * the caller created and configured Source used to create the receiver link. + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(Source source) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return receiver; + } + /** * Create a receiver instance using the given address that creates a durable subscription. * diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java index c86a2c9926..9f3c84050b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.client; import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedLong; /** @@ -26,6 +27,10 @@ public class AmqpUnknownFilterType implements DescribedType { public static final AmqpUnknownFilterType UNKOWN_FILTER = new AmqpUnknownFilterType(); + public static final UnsignedLong UNKNOWN_FILTER_CODE = UnsignedLong.valueOf(0x0000468C00000099L); + public static final Symbol UNKNOWN_FILTER_NAME = Symbol.valueOf("apache.org:unkown-filter:string"); + public static final Object[] UNKNOWN_FILTER_IDS = new Object[] { UNKNOWN_FILTER_CODE, UNKNOWN_FILTER_NAME }; + private final String payload; public AmqpUnknownFilterType() { @@ -34,7 +39,7 @@ public class AmqpUnknownFilterType implements DescribedType { @Override public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000099L); + return UNKNOWN_FILTER_CODE; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java index c7a99d3edc..fd44dcd7c4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java @@ -147,13 +147,13 @@ public class UnmodifiableLink implements Link { @Override public Source getRemoteSource() { // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getSource(); + return link.getRemoteSource(); } @Override public Target getRemoteTarget() { // TODO Figure out a simple way to wrap the odd Target types in Proton-J - return link.getTarget(); + return link.getRemoteTarget(); } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java index 2f9935fc66..dfe3a4b1c4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java @@ -62,7 +62,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { AmqpClient client = createAmqpClient(); assertNotNull(client); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new AmqpValidator() { @Override public void inspectOpenedResource(Connection connection) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index cdecab03ca..13b5904ac7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -33,10 +34,14 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.engine.Receiver; import org.junit.Ignore; import org.junit.Test; @@ -74,18 +79,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { public void testCreateQueueReceiverWithJMSSelector() throws Exception { AmqpClient client = createAmqpClient(); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new AmqpValidator() { @SuppressWarnings("unchecked") @Override public void inspectOpenedResource(Receiver receiver) { LOG.info("Receiver opened: {}", receiver); - if (receiver.getSource() == null) { + if (receiver.getRemoteSource() == null) { markAsInvalid("Link opened with null source."); } - Source source = (Source) receiver.getSource(); + Source source = (Source) receiver.getRemoteSource(); Map filters = source.getFilter(); if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) { @@ -111,18 +116,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { public void testCreateQueueReceiverWithNoLocalSet() throws Exception { AmqpClient client = createAmqpClient(); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new AmqpValidator() { @SuppressWarnings("unchecked") @Override public void inspectOpenedResource(Receiver receiver) { LOG.info("Receiver opened: {}", receiver); - if (receiver.getSource() == null) { + if (receiver.getRemoteSource() == null) { markAsInvalid("Link opened with null source."); } - Source source = (Source) receiver.getSource(); + Source source = (Source) receiver.getRemoteSource(); Map filters = source.getFilter(); if (findFilter(filters, NO_LOCAL_FILTER_IDS) == null) { @@ -363,4 +368,50 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + LOG.info("Receiver opened: {}", receiver); + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map filters = source.getFilter(); + + if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) { + markAsInvalid("Broker should not return unsupported filter on attach."); + } + } + }); + + Map filters = new HashMap(); + filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER); + + Source source = new Source(); + source.setAddress("queue://" + getTestName()); + source.setFilter(filters); + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + session.createReceiver(source); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + connection.getStateInspector().assertValid(); + connection.close(); + } }