From ba7b8aff59978473862a4cc4b52168d36c991887 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 2 May 2017 18:14:55 -0400 Subject: [PATCH] ARTEMIS-1139 Add a few tests using Qpid JMS of AMQP over WS Adds a couple tests using Qpid JMS to validate that AMQP over WS is working. --- .../amqp/AmqpClientTestSupport.java | 18 ++- .../integration/amqp/AmqpTestSupport.java | 7 +- .../amqp/JMSClientTestSupport.java | 5 +- .../amqp/JMSWebSocketConnectionTest.java | 151 ++++++++++++++++++ 4 files changed, 174 insertions(+), 7 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSWebSocketConnectionTest.java diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index bfb81840f5..8d278954b8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -48,8 +48,8 @@ import org.junit.After; import org.junit.Before; /** - * Test support class for tests that will be using the AMQP Proton wrapper client. - * This is to make it easier to migrate tests from ActiveMQ5 + * Test support class for tests that will be using the AMQP Proton wrapper client. This is to + * make it easier to migrate tests from ActiveMQ5 */ public class AmqpClientTestSupport extends AmqpTestSupport { @@ -124,7 +124,19 @@ public class AmqpClientTestSupport extends AmqpTestSupport { } public URI getBrokerOpenWireConnectionURI() { - return getBrokerAmqpConnectionURI(); + try { + String uri = null; + + if (isUseSSL()) { + uri = "ssl://127.0.0.1:" + AMQP_PORT; + } else { + uri = "tcp://127.0.0.1:" + AMQP_PORT; + } + + return new URI(uri); + } catch (Exception e) { + throw new RuntimeException(); + } } protected ActiveMQServer createServer() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java index 216b0ecdcf..15873a63f4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -35,6 +35,7 @@ public class AmqpTestSupport extends ActiveMQTestBase { protected LinkedList connections = new LinkedList<>(); protected boolean useSSL; + protected boolean useWebSockets; protected AmqpConnection addConnection(AmqpConnection connection) { connections.add(connection); @@ -59,12 +60,16 @@ public class AmqpTestSupport extends ActiveMQTestBase { return useSSL; } + public boolean isUseWebSockets() { + return useWebSockets; + } + public String getAmqpConnectionURIOptions() { return ""; } public URI getBrokerAmqpConnectionURI() { - boolean webSocket = false; + boolean webSocket = isUseWebSockets(); try { int port = AMQP_PORT; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java index 7de05aad20..78ca309bd1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -67,7 +67,6 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { } protected URI getBrokerQpidJMSConnectionURI() { - boolean webSocket = false; try { int port = AMQP_PORT; @@ -75,13 +74,13 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { String uri = null; if (isUseSSL()) { - if (webSocket) { + if (isUseWebSockets()) { uri = "amqpwss://127.0.0.1:" + port; } else { uri = "amqps://127.0.0.1:" + port; } } else { - if (webSocket) { + if (isUseWebSockets()) { uri = "amqpws://127.0.0.1:" + port; } else { uri = "amqp://127.0.0.1:" + port; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSWebSocketConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSWebSocketConnectionTest.java new file mode 100644 index 0000000000..2faa881c07 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSWebSocketConnectionTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Test connections can be established to remote peers via WebSockets + */ +public class JMSWebSocketConnectionTest extends JMSClientTestSupport { + + @Override + public boolean isUseWebSockets() { + return true; + } + + @Test(timeout = 30000) + public void testCreateConnectionAndStart() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertNotNull(connection); + connection.start(); + connection.close(); + } + + @Test(timeout = 30000) + public void testSendReceiveOverWS() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); + JmsConnection connection = (JmsConnection) factory.createConnection(); + + try { + Session session = connection.createSession(); + Queue queue = session.createQueue(getQueueName()); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createMessage()); + producer.close(); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(1000); + + assertNotNull(message); + } finally { + connection.close(); + } + } + + @Test(timeout = 30000) + public void testSendLargeMessageToClientFromOpenWire() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); + JmsConnection connection = (JmsConnection) factory.createConnection(); + + sendLargeMessageViaOpenWire(); + + try { + Session session = connection.createSession(); + Queue queue = session.createQueue(getQueueName()); + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(1000); + + assertNotNull(message); + assertTrue(message instanceof BytesMessage); + } finally { + connection.close(); + } + } + + @Ignore("Broker can't accept messages over 65535 right now") + @Test(timeout = 30000) + public void testSendLargeMessageToClientFromAMQP() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); + JmsConnection connection = (JmsConnection) factory.createConnection(); + + sendLargeMessageViaAMQP(); + + try { + Session session = connection.createSession(); + Queue queue = session.createQueue(getQueueName()); + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(1000); + + assertNotNull(message); + assertTrue(message instanceof BytesMessage); + } finally { + connection.close(); + } + } + + protected void sendLargeMessageViaOpenWire() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(getBrokerOpenWireConnectionURI()); + doSendLargeMessageViaOpenWire(factory.createConnection()); + } + + protected void sendLargeMessageViaAMQP() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); + doSendLargeMessageViaOpenWire(factory.createConnection()); + } + + protected void doSendLargeMessageViaOpenWire(Connection connection) throws Exception { + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + + // Normal netty default max frame size is 65535 so bump up the size a bit + // to see if we can handle it + byte[] payload = new byte[65535 + 8192]; + for (int i = 0; i < payload.length; ++i) { + payload[i] = (byte) (i % 256); + } + BytesMessage message = session.createBytesMessage(); + message.writeBytes(payload); + + producer.send(message); + } finally { + connection.close(); + } + } +}