This closes #1243
This commit is contained in:
commit
5073744679
|
@ -48,8 +48,8 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test support class for tests that will be using the AMQP Proton wrapper client.
|
* Test support class for tests that will be using the AMQP Proton wrapper client. This is to
|
||||||
* This is to make it easier to migrate tests from ActiveMQ5
|
* make it easier to migrate tests from ActiveMQ5
|
||||||
*/
|
*/
|
||||||
public class AmqpClientTestSupport extends AmqpTestSupport {
|
public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
|
|
||||||
|
@ -124,7 +124,19 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
public URI getBrokerOpenWireConnectionURI() {
|
public URI getBrokerOpenWireConnectionURI() {
|
||||||
return getBrokerAmqpConnectionURI();
|
try {
|
||||||
|
String uri = null;
|
||||||
|
|
||||||
|
if (isUseSSL()) {
|
||||||
|
uri = "ssl://127.0.0.1:" + AMQP_PORT;
|
||||||
|
} else {
|
||||||
|
uri = "tcp://127.0.0.1:" + AMQP_PORT;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new URI(uri);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ActiveMQServer createServer() throws Exception {
|
protected ActiveMQServer createServer() throws Exception {
|
||||||
|
|
|
@ -35,6 +35,7 @@ public class AmqpTestSupport extends ActiveMQTestBase {
|
||||||
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
|
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
|
||||||
|
|
||||||
protected boolean useSSL;
|
protected boolean useSSL;
|
||||||
|
protected boolean useWebSockets;
|
||||||
|
|
||||||
protected AmqpConnection addConnection(AmqpConnection connection) {
|
protected AmqpConnection addConnection(AmqpConnection connection) {
|
||||||
connections.add(connection);
|
connections.add(connection);
|
||||||
|
@ -59,12 +60,16 @@ public class AmqpTestSupport extends ActiveMQTestBase {
|
||||||
return useSSL;
|
return useSSL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isUseWebSockets() {
|
||||||
|
return useWebSockets;
|
||||||
|
}
|
||||||
|
|
||||||
public String getAmqpConnectionURIOptions() {
|
public String getAmqpConnectionURIOptions() {
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
public URI getBrokerAmqpConnectionURI() {
|
public URI getBrokerAmqpConnectionURI() {
|
||||||
boolean webSocket = false;
|
boolean webSocket = isUseWebSockets();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int port = AMQP_PORT;
|
int port = AMQP_PORT;
|
||||||
|
|
|
@ -67,7 +67,6 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected URI getBrokerQpidJMSConnectionURI() {
|
protected URI getBrokerQpidJMSConnectionURI() {
|
||||||
boolean webSocket = false;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int port = AMQP_PORT;
|
int port = AMQP_PORT;
|
||||||
|
@ -75,13 +74,13 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
|
||||||
String uri = null;
|
String uri = null;
|
||||||
|
|
||||||
if (isUseSSL()) {
|
if (isUseSSL()) {
|
||||||
if (webSocket) {
|
if (isUseWebSockets()) {
|
||||||
uri = "amqpwss://127.0.0.1:" + port;
|
uri = "amqpwss://127.0.0.1:" + port;
|
||||||
} else {
|
} else {
|
||||||
uri = "amqps://127.0.0.1:" + port;
|
uri = "amqps://127.0.0.1:" + port;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (webSocket) {
|
if (isUseWebSockets()) {
|
||||||
uri = "amqpws://127.0.0.1:" + port;
|
uri = "amqpws://127.0.0.1:" + port;
|
||||||
} else {
|
} else {
|
||||||
uri = "amqp://127.0.0.1:" + port;
|
uri = "amqp://127.0.0.1:" + port;
|
||||||
|
|
|
@ -0,0 +1,151 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.qpid.jms.JmsConnection;
|
||||||
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test connections can be established to remote peers via WebSockets
|
||||||
|
*/
|
||||||
|
public class JMSWebSocketConnectionTest extends JMSClientTestSupport {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isUseWebSockets() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testCreateConnectionAndStart() throws Exception {
|
||||||
|
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
|
||||||
|
JmsConnection connection = (JmsConnection) factory.createConnection();
|
||||||
|
assertNotNull(connection);
|
||||||
|
connection.start();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testSendReceiveOverWS() throws Exception {
|
||||||
|
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
|
||||||
|
JmsConnection connection = (JmsConnection) factory.createConnection();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession();
|
||||||
|
Queue queue = session.createQueue(getQueueName());
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.send(session.createMessage());
|
||||||
|
producer.close();
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
Message message = consumer.receive(1000);
|
||||||
|
|
||||||
|
assertNotNull(message);
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testSendLargeMessageToClientFromOpenWire() throws Exception {
|
||||||
|
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
|
||||||
|
JmsConnection connection = (JmsConnection) factory.createConnection();
|
||||||
|
|
||||||
|
sendLargeMessageViaOpenWire();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession();
|
||||||
|
Queue queue = session.createQueue(getQueueName());
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
Message message = consumer.receive(1000);
|
||||||
|
|
||||||
|
assertNotNull(message);
|
||||||
|
assertTrue(message instanceof BytesMessage);
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ignore("Broker can't accept messages over 65535 right now")
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testSendLargeMessageToClientFromAMQP() throws Exception {
|
||||||
|
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
|
||||||
|
JmsConnection connection = (JmsConnection) factory.createConnection();
|
||||||
|
|
||||||
|
sendLargeMessageViaAMQP();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession();
|
||||||
|
Queue queue = session.createQueue(getQueueName());
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
Message message = consumer.receive(1000);
|
||||||
|
|
||||||
|
assertNotNull(message);
|
||||||
|
assertTrue(message instanceof BytesMessage);
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendLargeMessageViaOpenWire() throws Exception {
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(getBrokerOpenWireConnectionURI());
|
||||||
|
doSendLargeMessageViaOpenWire(factory.createConnection());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendLargeMessageViaAMQP() throws Exception {
|
||||||
|
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
|
||||||
|
doSendLargeMessageViaOpenWire(factory.createConnection());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void doSendLargeMessageViaOpenWire(Connection connection) throws Exception {
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(getQueueName());
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
|
||||||
|
// Normal netty default max frame size is 65535 so bump up the size a bit
|
||||||
|
// to see if we can handle it
|
||||||
|
byte[] payload = new byte[65535 + 8192];
|
||||||
|
for (int i = 0; i < payload.length; ++i) {
|
||||||
|
payload[i] = (byte) (i % 256);
|
||||||
|
}
|
||||||
|
BytesMessage message = session.createBytesMessage();
|
||||||
|
message.writeBytes(payload);
|
||||||
|
|
||||||
|
producer.send(message);
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue