From 1a0f73ed193fce9152bad866b4116d5f0f437337 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 17 Mar 2015 15:28:48 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5666 Add some tests and cleanup of the testing client --- .../amqp/client/AmqpAbstractResource.java | 8 +- .../transport/amqp/client/AmqpClient.java | 8 +- .../transport/amqp/client/AmqpConnection.java | 3 +- .../transport/amqp/client/AmqpSession.java | 94 ++++++++++++++++++- .../amqp/client/AmqpUnknownFilterType.java | 44 +++++++++ ...StateInspector.java => AmqpValidator.java} | 25 +++-- .../amqp/client/util/UnmodifiableLink.java | 16 ++-- .../amqp/interop/AmqpConnectionsTest.java | 9 +- .../amqp/interop/AmqpReceiverTest.java | 82 ++++++++++++++++ 9 files changed, 258 insertions(+), 31 deletions(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java rename activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/{AmqpStateInspector.java => AmqpValidator.java} (79%) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java index b5a6324469..fbc4ddae1c 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -41,7 +41,7 @@ public abstract class AmqpAbstractResource implements AmqpRe protected AsyncResult openRequest; protected AsyncResult closeRequest; - private AmqpStateInspector amqpStateInspector = new AmqpStateInspector(); + private AmqpValidator amqpStateInspector; private E endpoint; @@ -184,13 +184,13 @@ public abstract class AmqpAbstractResource implements AmqpRe this.endpoint = endpoint; } - public AmqpStateInspector getStateInspector() { + public AmqpValidator getStateInspector() { return amqpStateInspector; } - public void setStateInspector(AmqpStateInspector stateInspector) { + public void setStateInspector(AmqpValidator stateInspector) { if (stateInspector == null) { - stateInspector = new AmqpStateInspector(); + stateInspector = new AmqpValidator(); } this.amqpStateInspector = stateInspector; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index 0b299e4f37..e7d3eaf0d5 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -38,7 +38,7 @@ public class AmqpClient { private final String password; private final URI remoteURI; - private AmqpStateInspector stateInspector = new AmqpStateInspector(); + private AmqpValidator stateInspector = new AmqpValidator(); private List offeredCapabilities = Collections.emptyList(); private Map offeredProperties = Collections.emptyMap(); @@ -176,7 +176,7 @@ public class AmqpClient { /** * @return the currently set state inspector used to check state after various events. */ - public AmqpStateInspector getStateInspector() { + public AmqpValidator getStateInspector() { return stateInspector; } @@ -187,9 +187,9 @@ public class AmqpClient { * @param stateInspector * the new state inspector to use. */ - public void setStateInspector(AmqpStateInspector stateInspector) { + public void setStateInspector(AmqpValidator stateInspector) { if (stateInspector == null) { - stateInspector = new AmqpStateInspector(); + stateInspector = new AmqpValidator(); } this.stateInspector = stateInspector; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index a98f711a5c..b4fd66186a 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -222,10 +222,9 @@ public class AmqpConnection extends AmqpAbstractResource implements @Override public void run() { checkClosed(); - session.setEndpoint(getEndpoint().session()); + session.setStateInspector(getStateInspector()); session.open(request); - pumpToProtonTransport(); } }); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index b2fc2f18a6..c747dc694d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -69,6 +69,7 @@ public class AmqpSession extends AmqpAbstractResource { @Override public void run() { checkClosed(); + sender.setStateInspector(getStateInspector()); sender.open(request); pumpToProtonTransport(); } @@ -83,23 +84,63 @@ public class AmqpSession extends AmqpAbstractResource { * Create a receiver instance using the given address * * @param address - * the address to which the receiver will subscribe for its messages. + * the address to which the receiver will subscribe for its messages. * * @return a newly created receiver that is ready for use. * * @throws Exception if an error occurs while creating the receiver. */ public AmqpReceiver createReceiver(String address) throws Exception { + return createReceiver(address, null, false); + } + + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param selector + * the JMS selector to use for the subscription + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address, String selector) throws Exception { + return createReceiver(address, selector, false); + } + + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param selector + * the JMS selector to use for the subscription + * @param noLocal + * should the subscription have messages from its connection filtered. + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address, String selector, boolean noLocal) throws Exception { checkClosed(); - final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId()); final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId()); + + receiver.setNoLocal(noLocal); + if (selector != null && !selector.isEmpty()) { + receiver.setSelector(selector); + } connection.getScheduler().execute(new Runnable() { @Override public void run() { checkClosed(); + receiver.setStateInspector(getStateInspector()); receiver.open(request); pumpToProtonTransport(); } @@ -123,17 +164,64 @@ public class AmqpSession extends AmqpAbstractResource { * @throws Exception if an error occurs while creating the receiver. */ public AmqpReceiver createDurableReceiver(String address, String subscriptionName) throws Exception { + return createDurableReceiver(address, subscriptionName, null, false); + } + + /** + * Create a receiver instance using the given address that creates a durable subscription. + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param subscriptionName + * the name of the subscription that is being created. + * @param selector + * the JMS selector to use for the subscription + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createDurableReceiver(String address, String subscriptionName, String selector) throws Exception { + return createDurableReceiver(address, subscriptionName, selector, false); + } + + /** + * Create a receiver instance using the given address that creates a durable subscription. + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param subscriptionName + * the name of the subscription that is being created. + * @param selector + * the JMS selector to use for the subscription + * @param noLocal + * should the subscription have messages from its connection filtered. + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createDurableReceiver(String address, String subscriptionName, String selector, boolean noLocal) throws Exception { checkClosed(); + if (subscriptionName == null || subscriptionName.isEmpty()) { + throw new IllegalArgumentException("subscription name must not be null or empty."); + } + + final ClientFuture request = new ClientFuture(); final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId()); receiver.setSubscriptionName(subscriptionName); - final ClientFuture request = new ClientFuture(); + receiver.setNoLocal(noLocal); + if (selector != null && !selector.isEmpty()) { + receiver.setSelector(selector); + } connection.getScheduler().execute(new Runnable() { @Override public void run() { checkClosed(); + receiver.setStateInspector(getStateInspector()); receiver.open(request); pumpToProtonTransport(); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java new file mode 100644 index 0000000000..c86a2c9926 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.UnsignedLong; + +/** + * A Described Type wrapper for an unsupported filter that the broker should ignore. + */ +public class AmqpUnknownFilterType implements DescribedType { + + public static final AmqpUnknownFilterType UNKOWN_FILTER = new AmqpUnknownFilterType(); + + private final String payload; + + public AmqpUnknownFilterType() { + this.payload = "UnknownFilter{}"; + } + + @Override + public Object getDescriptor() { + return UnsignedLong.valueOf(0x0000468C00000099L); + } + + @Override + public Object getDescribed() { + return this.payload; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java similarity index 79% rename from activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java rename to activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java index 5471876196..fc088fcad6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java @@ -17,14 +17,15 @@ package org.apache.activemq.transport.amqp.client; import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; /** * Abstract base for a validation hook that is used in tests to check * the state of a remote resource after a variety of lifecycle events. */ -public class AmqpStateInspector { +public class AmqpValidator { private boolean valid = true; private String errorMessage; @@ -37,7 +38,11 @@ public class AmqpStateInspector { } - public void inspectOpenedResource(Link link) { + public void inspectOpenedResource(Sender sender) { + + } + + public void inspectOpenedResource(Receiver receiver) { } @@ -49,11 +54,19 @@ public class AmqpStateInspector { } - public void inspectClosedResource(Link link) { + public void inspectClosedResource(Sender sender) { } - public void inspectDetachedResource(Link link) { + public void inspectClosedResource(Receiver receiver) { + + } + + public void inspectDetachedResource(Sender sender) { + + } + + public void inspectDetachedResource(Receiver receiver) { } @@ -80,7 +93,7 @@ public class AmqpStateInspector { } } - public void assertIfStateChecksFailed() { + public void assertValid() { if (!isValid()) { throw new AssertionError(errorMessage); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java index 70665c09e2..c7a99d3edc 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java @@ -124,14 +124,14 @@ public class UnmodifiableLink implements Link { @Override public Source getSource() { - // TODO Auto-generated method stub - return null; + // TODO Figure out a simple way to wrap the odd Source types in Proton-J + return link.getSource(); } @Override public Target getTarget() { - // TODO Auto-generated method stub - return null; + // TODO Figure out a simple way to wrap the odd Source types in Proton-J + return link.getTarget(); } @Override @@ -146,14 +146,14 @@ public class UnmodifiableLink implements Link { @Override public Source getRemoteSource() { - // TODO Auto-generated method stub - return null; + // TODO Figure out a simple way to wrap the odd Source types in Proton-J + return link.getSource(); } @Override public Target getRemoteTarget() { - // TODO Auto-generated method stub - return null; + // TODO Figure out a simple way to wrap the odd Target types in Proton-J + return link.getTarget(); } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java index a35709d616..2f9935fc66 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java @@ -26,7 +26,7 @@ import java.util.Map; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; -import org.apache.activemq.transport.amqp.client.AmqpStateInspector; +import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -62,7 +62,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { AmqpClient client = createAmqpClient(); assertNotNull(client); - client.setStateInspector(new AmqpStateInspector() { + client.setStateInspector(new AmqpValidator() { @Override public void inspectOpenedResource(Connection connection) { @@ -88,6 +88,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + connection.getStateInspector().assertValid(); connection.close(); assertEquals(0, getProxyToBroker().getCurrentConnectionsCount()); @@ -131,7 +132,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { connection1.connect(); assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); - connection2.setStateInspector(new AmqpStateInspector() { + connection2.setStateInspector(new AmqpValidator() { @Override public void inspectOpenedResource(Connection connection) { @@ -160,7 +161,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { LOG.info("Second connection with same container Id failed as expected."); } - connection2.getStateInspector().assertIfStateChecksFailed(); + connection2.getStateInspector().assertValid(); assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 1bc3d6618f..cdecab03ca 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -16,10 +16,14 @@ */ package org.apache.activemq.transport.amqp.interop; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.jmx.QueueViewMBean; @@ -29,7 +33,11 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.engine.Receiver; import org.junit.Ignore; import org.junit.Test; @@ -62,6 +70,80 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testCreateQueueReceiverWithJMSSelector() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setStateInspector(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + LOG.info("Receiver opened: {}", receiver); + + if (receiver.getSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getSource(); + Map filters = source.getFilter(); + + if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) { + markAsInvalid("Broker did not return the JMS Filter on Attach"); + } + } + }); + + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + session.createReceiver("queue://" + getTestName(), "JMSPriority > 8"); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateQueueReceiverWithNoLocalSet() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setStateInspector(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + LOG.info("Receiver opened: {}", receiver); + + if (receiver.getSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getSource(); + Map filters = source.getFilter(); + + if (findFilter(filters, NO_LOCAL_FILTER_IDS) == null) { + markAsInvalid("Broker did not return the NoLocal Filter on Attach"); + } + } + }); + + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + session.createReceiver("queue://" + getTestName(), null, true); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + connection.getStateInspector().assertValid(); + connection.close(); + } + @Test(timeout = 60000) public void testCreateTopicReceiver() throws Exception { AmqpClient client = createAmqpClient();