From ca456c4601c5e659f9864041af87f489a0e63e4b Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 17 Mar 2015 18:44:24 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5559 Fix and tests for filter handling on attach. We only support JMS selector and NoLocal type filters for receivers so only report those back, all others are dropped to indicate we will not honor them. --- .../transport/amqp/AmqpProtocolConverter.java | 26 +++++--- .../activemq/transport/amqp/AmqpSupport.java | 11 ++-- .../transport/amqp/client/AmqpClient.java | 2 +- ...orType.java => AmqpJmsSelectorFilter.java} | 9 +-- ...oLocalType.java => AmqpNoLocalFilter.java} | 11 ++-- .../transport/amqp/client/AmqpReceiver.java | 36 +++++++++-- .../transport/amqp/client/AmqpSession.java | 35 +++++++++++ .../amqp/client/AmqpUnknownFilterType.java | 7 ++- .../amqp/client/util/UnmodifiableLink.java | 4 +- .../amqp/interop/AmqpConnectionsTest.java | 2 +- .../amqp/interop/AmqpReceiverTest.java | 63 +++++++++++++++++-- 11 files changed, 167 insertions(+), 39 deletions(-) rename activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/{AmqpJmsSelectorType.java => AmqpJmsSelectorFilter.java} (85%) rename activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/{AmqpNoLocalType.java => AmqpNoLocalFilter.java} (81%) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 3661f3db5e..5a73a25a76 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -1417,15 +1417,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource(); try { + final Map supportedFilters = new HashMap(); final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++); final ConsumerContext consumerContext = new ConsumerContext(id, sender); sender.setContext(consumerContext); + boolean noLocal = false; String selector = null; + if (source != null) { - DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); + Map.Entry filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); if (filter != null) { - selector = filter.getDescribed().toString(); + selector = filter.getValue().getDescribed().toString(); // Validate the Selector. try { SelectorParser.parse(selector); @@ -1436,6 +1439,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerContext.closed = true; return; } + + supportedFilters.put(filter.getKey(), filter.getValue()); + } + + filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); + if (filter != null) { + noLocal = true; + supportedFilters.put(filter.getKey(), filter.getValue()); } } @@ -1449,7 +1460,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { source.setAddress(destination.getQualifiedName()); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); - sender.setSource(source); } else { consumerContext.closed = true; sender.setSource(null); @@ -1465,7 +1475,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { source = new org.apache.qpid.proton.amqp.messaging.Source(); source.setAddress(destination.getQualifiedName()); source.setDynamic(true); - sender.setSource(source); consumerContext.addCloseAction(new Runnable() { @Override @@ -1477,6 +1486,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { destination = createDestination(source); } + source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); + sender.setSource(source); + int senderCredit = sender.getRemoteCredit(); subscriptionsByConsumerId.put(id, consumerContext); @@ -1486,6 +1498,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setDestination(destination); consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0); consumerInfo.setDispatchAsync(true); + consumerInfo.setNoLocal(noLocal); if (source.getDistributionMode() == COPY && destination.isQueue()) { consumerInfo.setBrowser(true); @@ -1495,11 +1508,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setSubscriptionName(sender.getName()); } - DescribedType filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); - if (filter != null) { - consumerInfo.setNoLocal(true); - } - consumerContext.info = consumerInfo; consumerContext.setDestination(destination); consumerContext.credit = senderCredit; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index c0cfb945f6..7af4c2c91a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp; import java.nio.ByteBuffer; +import java.util.AbstractMap; import java.util.Map; import org.apache.qpid.proton.amqp.Binary; @@ -86,7 +87,7 @@ public class AmqpSupport { * * @return the filter if found in the mapping or null if not found. */ - public static DescribedType findFilter(Map filters, Object[] filterIds) { + public static Map.Entry findFilter(Map filters, Object[] filterIds) { if (filterIds == null || filterIds.length == 0) { throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds); @@ -96,14 +97,14 @@ public class AmqpSupport { return null; } - for (Object value : filters.values()) { - if (value instanceof DescribedType) { - DescribedType describedType = ((DescribedType) value); + for (Map.Entry filter : filters.entrySet()) { + if (filter.getValue() instanceof DescribedType) { + DescribedType describedType = ((DescribedType) filter.getValue()); Object descriptor = describedType.getDescriptor(); for (Object filterId : filterIds) { if (descriptor.equals(filterId)) { - return describedType; + return new AbstractMap.SimpleImmutableEntry(filter.getKey(), describedType); } } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index e7d3eaf0d5..27627324a4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -187,7 +187,7 @@ public class AmqpClient { * @param stateInspector * the new state inspector to use. */ - public void setStateInspector(AmqpValidator stateInspector) { + public void setValidator(AmqpValidator stateInspector) { if (stateInspector == null) { stateInspector = new AmqpValidator(); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java similarity index 85% rename from activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java rename to activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java index d93e0527a3..9fad2efc77 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java @@ -16,23 +16,24 @@ */ package org.apache.activemq.transport.amqp.client; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_CODE; + import org.apache.qpid.proton.amqp.DescribedType; -import org.apache.qpid.proton.amqp.UnsignedLong; /** * A Described Type wrapper for JMS selector values. */ -public class AmqpJmsSelectorType implements DescribedType { +public class AmqpJmsSelectorFilter implements DescribedType { private final String selector; - public AmqpJmsSelectorType(String selector) { + public AmqpJmsSelectorFilter(String selector) { this.selector = selector; } @Override public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000004L); + return JMS_SELECTOR_CODE; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java similarity index 81% rename from activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java rename to activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java index 2d61b83e8f..0bdd71ee6b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java @@ -16,25 +16,26 @@ */ package org.apache.activemq.transport.amqp.client; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_CODE; + import org.apache.qpid.proton.amqp.DescribedType; -import org.apache.qpid.proton.amqp.UnsignedLong; /** * A Described Type wrapper for JMS no local option for MessageConsumer. */ -public class AmqpNoLocalType implements DescribedType { +public class AmqpNoLocalFilter implements DescribedType { - public static final AmqpNoLocalType NO_LOCAL = new AmqpNoLocalType(); + public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter(); private final String noLocal; - public AmqpNoLocalType() { + public AmqpNoLocalFilter() { this.noLocal = "NoLocalFilter{}"; } @Override public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000003L); + return NO_LOCAL_CODE; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index ff530b9250..1290d270cd 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -76,6 +76,7 @@ public class AmqpReceiver extends AmqpAbstractResource { private String selector; private boolean presettle; private boolean noLocal; + private Source userSpecifiedSource; /** * Create a new receiver instance. @@ -93,6 +94,28 @@ public class AmqpReceiver extends AmqpAbstractResource { this.receiverId = receiverId; } + /** + * Create a new receiver instance. + * + * @param session + * The parent session that created the receiver. + * @param source + * The Source instance to use instead of creating and configuring one. + * @param receiverId + * The unique ID assigned to this receiver. + */ + public AmqpReceiver(AmqpSession session, Source source, String receiverId) { + + if (source == null) { + throw new IllegalArgumentException("User specified Source cannot be null"); + } + + this.session = session; + this.userSpecifiedSource = source; + this.address = source.getAddress(); + this.receiverId = receiverId; + } + /** * Close the receiver, a closed receiver will throw exceptions if any further send * calls are made. @@ -423,11 +446,14 @@ public class AmqpReceiver extends AmqpAbstractResource { @Override protected void doOpen() { - Source source = new Source(); - source.setAddress(address); + Source source = userSpecifiedSource; Target target = new Target(); - configureSource(source); + if (userSpecifiedSource == null) { + source = new Source(); + source.setAddress(address); + configureSource(source); + } String receiverName = receiverId + ":" + address; @@ -523,11 +549,11 @@ public class AmqpReceiver extends AmqpAbstractResource { source.setDefaultOutcome(modified); if (isNoLocal()) { - filters.put(NO_LOCAL_NAME, AmqpNoLocalType.NO_LOCAL); + filters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL); } if (getSelector() != null && !getSelector().trim().equals("")) { - filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(getSelector())); + filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector())); } if (!filters.isEmpty()) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index c747dc694d..8b039b682f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -151,6 +152,40 @@ public class AmqpSession extends AmqpAbstractResource { return receiver; } + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param source + * the caller created and configured Source used to create the receiver link. + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(Source source) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return receiver; + } + /** * Create a receiver instance using the given address that creates a durable subscription. * diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java index c86a2c9926..9f3c84050b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.client; import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedLong; /** @@ -26,6 +27,10 @@ public class AmqpUnknownFilterType implements DescribedType { public static final AmqpUnknownFilterType UNKOWN_FILTER = new AmqpUnknownFilterType(); + public static final UnsignedLong UNKNOWN_FILTER_CODE = UnsignedLong.valueOf(0x0000468C00000099L); + public static final Symbol UNKNOWN_FILTER_NAME = Symbol.valueOf("apache.org:unkown-filter:string"); + public static final Object[] UNKNOWN_FILTER_IDS = new Object[] { UNKNOWN_FILTER_CODE, UNKNOWN_FILTER_NAME }; + private final String payload; public AmqpUnknownFilterType() { @@ -34,7 +39,7 @@ public class AmqpUnknownFilterType implements DescribedType { @Override public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000099L); + return UNKNOWN_FILTER_CODE; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java index c7a99d3edc..fd44dcd7c4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java @@ -147,13 +147,13 @@ public class UnmodifiableLink implements Link { @Override public Source getRemoteSource() { // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getSource(); + return link.getRemoteSource(); } @Override public Target getRemoteTarget() { // TODO Figure out a simple way to wrap the odd Target types in Proton-J - return link.getTarget(); + return link.getRemoteTarget(); } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java index 2f9935fc66..dfe3a4b1c4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java @@ -62,7 +62,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { AmqpClient client = createAmqpClient(); assertNotNull(client); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new AmqpValidator() { @Override public void inspectOpenedResource(Connection connection) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index cdecab03ca..13b5904ac7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -33,10 +34,14 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.engine.Receiver; import org.junit.Ignore; import org.junit.Test; @@ -74,18 +79,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { public void testCreateQueueReceiverWithJMSSelector() throws Exception { AmqpClient client = createAmqpClient(); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new AmqpValidator() { @SuppressWarnings("unchecked") @Override public void inspectOpenedResource(Receiver receiver) { LOG.info("Receiver opened: {}", receiver); - if (receiver.getSource() == null) { + if (receiver.getRemoteSource() == null) { markAsInvalid("Link opened with null source."); } - Source source = (Source) receiver.getSource(); + Source source = (Source) receiver.getRemoteSource(); Map filters = source.getFilter(); if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) { @@ -111,18 +116,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { public void testCreateQueueReceiverWithNoLocalSet() throws Exception { AmqpClient client = createAmqpClient(); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new AmqpValidator() { @SuppressWarnings("unchecked") @Override public void inspectOpenedResource(Receiver receiver) { LOG.info("Receiver opened: {}", receiver); - if (receiver.getSource() == null) { + if (receiver.getRemoteSource() == null) { markAsInvalid("Link opened with null source."); } - Source source = (Source) receiver.getSource(); + Source source = (Source) receiver.getRemoteSource(); Map filters = source.getFilter(); if (findFilter(filters, NO_LOCAL_FILTER_IDS) == null) { @@ -363,4 +368,50 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + LOG.info("Receiver opened: {}", receiver); + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map filters = source.getFilter(); + + if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) { + markAsInvalid("Broker should not return unsupported filter on attach."); + } + } + }); + + Map filters = new HashMap(); + filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER); + + Source source = new Source(); + source.setAddress("queue://" + getTestName()); + source.setFilter(filters); + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + session.createReceiver(source); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + connection.getStateInspector().assertValid(); + connection.close(); + } }