Fix and tests for filter handling on attach.  We only support JMS
selector and NoLocal type filters for receivers so only report those
back, all others are dropped to indicate we will not honor them.
This commit is contained in:
Timothy Bish 2015-03-17 18:44:24 -04:00
parent 1a0f73ed19
commit ca456c4601
11 changed files with 167 additions and 39 deletions

View File

@ -1417,15 +1417,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
try {
final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
final ConsumerContext consumerContext = new ConsumerContext(id, sender);
sender.setContext(consumerContext);
boolean noLocal = false;
String selector = null;
if (source != null) {
DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
if (filter != null) {
selector = filter.getDescribed().toString();
selector = filter.getValue().getDescribed().toString();
// Validate the Selector.
try {
SelectorParser.parse(selector);
@ -1436,6 +1439,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerContext.closed = true;
return;
}
supportedFilters.put(filter.getKey(), filter.getValue());
}
filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
if (filter != null) {
noLocal = true;
supportedFilters.put(filter.getKey(), filter.getValue());
}
}
@ -1449,7 +1460,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
source.setAddress(destination.getQualifiedName());
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
sender.setSource(source);
} else {
consumerContext.closed = true;
sender.setSource(null);
@ -1465,7 +1475,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(destination.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
consumerContext.addCloseAction(new Runnable() {
@Override
@ -1477,6 +1486,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
destination = createDestination(source);
}
source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
sender.setSource(source);
int senderCredit = sender.getRemoteCredit();
subscriptionsByConsumerId.put(id, consumerContext);
@ -1486,6 +1498,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
consumerInfo.setDispatchAsync(true);
consumerInfo.setNoLocal(noLocal);
if (source.getDistributionMode() == COPY && destination.isQueue()) {
consumerInfo.setBrowser(true);
@ -1495,11 +1508,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerInfo.setSubscriptionName(sender.getName());
}
DescribedType filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
if (filter != null) {
consumerInfo.setNoLocal(true);
}
consumerContext.info = consumerInfo;
consumerContext.setDestination(destination);
consumerContext.credit = senderCredit;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Map;
import org.apache.qpid.proton.amqp.Binary;
@ -86,7 +87,7 @@ public class AmqpSupport {
*
* @return the filter if found in the mapping or null if not found.
*/
public static DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
if (filterIds == null || filterIds.length == 0) {
throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds);
@ -96,14 +97,14 @@ public class AmqpSupport {
return null;
}
for (Object value : filters.values()) {
if (value instanceof DescribedType) {
DescribedType describedType = ((DescribedType) value);
for (Map.Entry<Symbol, Object> filter : filters.entrySet()) {
if (filter.getValue() instanceof DescribedType) {
DescribedType describedType = ((DescribedType) filter.getValue());
Object descriptor = describedType.getDescriptor();
for (Object filterId : filterIds) {
if (descriptor.equals(filterId)) {
return describedType;
return new AbstractMap.SimpleImmutableEntry<Symbol, DescribedType>(filter.getKey(), describedType);
}
}
}

View File

@ -187,7 +187,7 @@ public class AmqpClient {
* @param stateInspector
* the new state inspector to use.
*/
public void setStateInspector(AmqpValidator stateInspector) {
public void setValidator(AmqpValidator stateInspector) {
if (stateInspector == null) {
stateInspector = new AmqpValidator();
}

View File

@ -16,23 +16,24 @@
*/
package org.apache.activemq.transport.amqp.client;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_CODE;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.UnsignedLong;
/**
* A Described Type wrapper for JMS selector values.
*/
public class AmqpJmsSelectorType implements DescribedType {
public class AmqpJmsSelectorFilter implements DescribedType {
private final String selector;
public AmqpJmsSelectorType(String selector) {
public AmqpJmsSelectorFilter(String selector) {
this.selector = selector;
}
@Override
public Object getDescriptor() {
return UnsignedLong.valueOf(0x0000468C00000004L);
return JMS_SELECTOR_CODE;
}
@Override

View File

@ -16,25 +16,26 @@
*/
package org.apache.activemq.transport.amqp.client;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_CODE;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.UnsignedLong;
/**
* A Described Type wrapper for JMS no local option for MessageConsumer.
*/
public class AmqpNoLocalType implements DescribedType {
public class AmqpNoLocalFilter implements DescribedType {
public static final AmqpNoLocalType NO_LOCAL = new AmqpNoLocalType();
public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter();
private final String noLocal;
public AmqpNoLocalType() {
public AmqpNoLocalFilter() {
this.noLocal = "NoLocalFilter{}";
}
@Override
public Object getDescriptor() {
return UnsignedLong.valueOf(0x0000468C00000003L);
return NO_LOCAL_CODE;
}
@Override

View File

@ -76,6 +76,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
private String selector;
private boolean presettle;
private boolean noLocal;
private Source userSpecifiedSource;
/**
* Create a new receiver instance.
@ -93,6 +94,28 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
this.receiverId = receiverId;
}
/**
* Create a new receiver instance.
*
* @param session
* The parent session that created the receiver.
* @param source
* The Source instance to use instead of creating and configuring one.
* @param receiverId
* The unique ID assigned to this receiver.
*/
public AmqpReceiver(AmqpSession session, Source source, String receiverId) {
if (source == null) {
throw new IllegalArgumentException("User specified Source cannot be null");
}
this.session = session;
this.userSpecifiedSource = source;
this.address = source.getAddress();
this.receiverId = receiverId;
}
/**
* Close the receiver, a closed receiver will throw exceptions if any further send
* calls are made.
@ -423,11 +446,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
@Override
protected void doOpen() {
Source source = new Source();
source.setAddress(address);
Source source = userSpecifiedSource;
Target target = new Target();
if (userSpecifiedSource == null) {
source = new Source();
source.setAddress(address);
configureSource(source);
}
String receiverName = receiverId + ":" + address;
@ -523,11 +549,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
source.setDefaultOutcome(modified);
if (isNoLocal()) {
filters.put(NO_LOCAL_NAME, AmqpNoLocalType.NO_LOCAL);
filters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
}
if (getSelector() != null && !getSelector().trim().equals("")) {
filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(getSelector()));
filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector()));
}
if (!filters.isEmpty()) {

View File

@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
@ -151,6 +152,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return receiver;
}
/**
* Create a receiver instance using the given address
*
* @param address
* the address to which the receiver will subscribe for its messages.
* @param source
* the caller created and configured Source used to create the receiver link.
*
* @return a newly created receiver that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(Source source) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport();
}
});
request.sync();
return receiver;
}
/**
* Create a receiver instance using the given address that creates a durable subscription.
*

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.client;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
/**
@ -26,6 +27,10 @@ public class AmqpUnknownFilterType implements DescribedType {
public static final AmqpUnknownFilterType UNKOWN_FILTER = new AmqpUnknownFilterType();
public static final UnsignedLong UNKNOWN_FILTER_CODE = UnsignedLong.valueOf(0x0000468C00000099L);
public static final Symbol UNKNOWN_FILTER_NAME = Symbol.valueOf("apache.org:unkown-filter:string");
public static final Object[] UNKNOWN_FILTER_IDS = new Object[] { UNKNOWN_FILTER_CODE, UNKNOWN_FILTER_NAME };
private final String payload;
public AmqpUnknownFilterType() {
@ -34,7 +39,7 @@ public class AmqpUnknownFilterType implements DescribedType {
@Override
public Object getDescriptor() {
return UnsignedLong.valueOf(0x0000468C00000099L);
return UNKNOWN_FILTER_CODE;
}
@Override

View File

@ -147,13 +147,13 @@ public class UnmodifiableLink implements Link {
@Override
public Source getRemoteSource() {
// TODO Figure out a simple way to wrap the odd Source types in Proton-J
return link.getSource();
return link.getRemoteSource();
}
@Override
public Target getRemoteTarget() {
// TODO Figure out a simple way to wrap the odd Target types in Proton-J
return link.getTarget();
return link.getRemoteTarget();
}
@Override

View File

@ -62,7 +62,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setStateInspector(new AmqpValidator() {
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -33,10 +34,14 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Ignore;
import org.junit.Test;
@ -74,18 +79,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
public void testCreateQueueReceiverWithJMSSelector() throws Exception {
AmqpClient client = createAmqpClient();
client.setStateInspector(new AmqpValidator() {
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
LOG.info("Receiver opened: {}", receiver);
if (receiver.getSource() == null) {
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getSource();
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
@ -111,18 +116,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
AmqpClient client = createAmqpClient();
client.setStateInspector(new AmqpValidator() {
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
LOG.info("Receiver opened: {}", receiver);
if (receiver.getSource() == null) {
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getSource();
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
if (findFilter(filters, NO_LOCAL_FILTER_IDS) == null) {
@ -363,4 +368,50 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
LOG.info("Receiver opened: {}", receiver);
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
markAsInvalid("Broker should not return unsupported filter on attach.");
}
}
});
Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER);
Source source = new Source();
source.setAddress("queue://" + getTestName());
source.setFilter(filters);
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
session.createReceiver(source);
assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
connection.getStateInspector().assertValid();
connection.close();
}
}