This closes #837
This commit is contained in:
commit
9743043fb8
|
@ -21,8 +21,12 @@ import java.net.URI;
|
|||
import java.util.LinkedList;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.server.JMSServerManager;
|
||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
|
@ -35,9 +39,11 @@ import org.junit.Before;
|
|||
*/
|
||||
public class AmqpClientTestSupport extends ActiveMQTestBase {
|
||||
|
||||
ActiveMQServer server;
|
||||
private boolean useSSL;
|
||||
|
||||
LinkedList<AmqpConnection> connections = new LinkedList<>();
|
||||
protected JMSServerManager serverManager;
|
||||
protected ActiveMQServer server;
|
||||
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
|
||||
|
||||
protected AmqpConnection addConnection(AmqpConnection connection) {
|
||||
connections.add(connection);
|
||||
|
@ -48,9 +54,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
|
|||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
server = createServer(true, true);
|
||||
server.start();
|
||||
server = createServer();
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -63,18 +67,36 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
|
|||
ignored.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
if (serverManager != null) {
|
||||
try {
|
||||
serverManager.stop();
|
||||
} catch (Throwable ignored) {
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
serverManager = null;
|
||||
}
|
||||
|
||||
server.stop();
|
||||
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
protected ActiveMQServer createServer() throws Exception {
|
||||
ActiveMQServer server = createServer(true, true);
|
||||
serverManager = new JMSServerManagerImpl(server);
|
||||
Configuration serverConfig = server.getConfiguration();
|
||||
serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
|
||||
serverConfig.setSecurityEnabled(false);
|
||||
serverManager.start();
|
||||
server.start();
|
||||
return server;
|
||||
}
|
||||
|
||||
public Queue getProxyToQueue(String queueName) {
|
||||
return server.locateQueue(SimpleString.toSimpleString(queueName));
|
||||
}
|
||||
|
||||
private String connectorScheme = "amqp";
|
||||
private boolean useSSL;
|
||||
|
||||
public String getTestName() {
|
||||
return "jms.queue." + getName();
|
||||
}
|
||||
|
@ -83,14 +105,9 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
|
||||
this.connectorScheme = connectorScheme;
|
||||
this.useSSL = useSSL;
|
||||
}
|
||||
|
||||
public String getConnectorScheme() {
|
||||
return connectorScheme;
|
||||
}
|
||||
|
||||
public boolean isUseSSL() {
|
||||
return useSSL;
|
||||
}
|
||||
|
@ -99,30 +116,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
|
|||
return "";
|
||||
}
|
||||
|
||||
protected boolean isUseTcpConnector() {
|
||||
return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
|
||||
}
|
||||
|
||||
protected boolean isUseSslConnector() {
|
||||
return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
|
||||
}
|
||||
|
||||
protected boolean isUseNioConnector() {
|
||||
return !isUseSSL() && connectorScheme.contains("nio");
|
||||
}
|
||||
|
||||
protected boolean isUseNioPlusSslConnector() {
|
||||
return isUseSSL() && connectorScheme.contains("nio");
|
||||
}
|
||||
|
||||
protected boolean isUseWsConnector() {
|
||||
return !isUseSSL() && connectorScheme.contains("ws");
|
||||
}
|
||||
|
||||
protected boolean isUseWssConnector() {
|
||||
return isUseSSL() && connectorScheme.contains("wss");
|
||||
}
|
||||
|
||||
public URI getBrokerAmqpConnectionURI() {
|
||||
boolean webSocket = false;
|
||||
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test around the handling of Deliver Annotations in messages sent and received.
|
||||
*/
|
||||
public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
|
||||
|
||||
private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION";
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
|
||||
message.setText("Test-Message");
|
||||
message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName());
|
||||
|
||||
sender.send(message);
|
||||
receiver.flow(1);
|
||||
|
||||
Queue queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getMessageCount());
|
||||
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
assertNull(received.getDeliveryAnnotation(DELIVERY_ANNOTATION_NAME));
|
||||
|
||||
sender.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test that the broker can pass through an AMQP message with a described type in the message
|
||||
* body regardless of transformer in use.
|
||||
*/
|
||||
public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageWithDescribedTypeInBody() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setDescribedType(new AmqpNoLocalFilter());
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
Queue queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getMessageCount());
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
assertNotNull(received.getDescribedType());
|
||||
receiver.close();
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setDescribedType(new AmqpNoLocalFilter());
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
connection.close();
|
||||
|
||||
Queue queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getMessageCount());
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
Connection jmsConnection = factory.createConnection();
|
||||
try {
|
||||
Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = jmsSession.createQueue(getName());
|
||||
MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
|
||||
jmsConnection.start();
|
||||
|
||||
Message received = jmsConsumer.receive(5000);
|
||||
assertNotNull(received);
|
||||
assertTrue(received instanceof BytesMessage);
|
||||
} finally {
|
||||
jmsConnection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDescribedTypeMessageRoundTrips() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
// Send with AMQP client.
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setDescribedType(new AmqpNoLocalFilter());
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
Queue queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getMessageCount());
|
||||
|
||||
// Receive and resend with OpenWire JMS client
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
Connection jmsConnection = factory.createConnection();
|
||||
try {
|
||||
Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = jmsSession.createQueue(getName());
|
||||
MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
|
||||
jmsConnection.start();
|
||||
|
||||
Message received = jmsConsumer.receive(5000);
|
||||
assertNotNull(received);
|
||||
assertTrue(received instanceof BytesMessage);
|
||||
|
||||
MessageProducer jmsProducer = jmsSession.createProducer(destination);
|
||||
jmsProducer.send(received);
|
||||
} finally {
|
||||
jmsConnection.close();
|
||||
}
|
||||
|
||||
assertEquals(1, queue.getMessageCount());
|
||||
|
||||
// Now lets receive it with AMQP and see that we get back what we expected.
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(returned);
|
||||
assertNotNull(returned.getDescribedType());
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests various behaviors of broker side drain support.
|
||||
*/
|
||||
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testReceiverCanDrainMessages() throws Exception {
|
||||
int MSG_COUNT = 20;
|
||||
sendMessages(getTestName(), MSG_COUNT);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
|
||||
Queue queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(MSG_COUNT, queueView.getMessageCount());
|
||||
|
||||
receiver.drain(MSG_COUNT);
|
||||
for (int i = 0; i < MSG_COUNT; ++i) {
|
||||
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
message.accept();
|
||||
}
|
||||
receiver.close();
|
||||
|
||||
assertEquals(0, queueView.getMessageCount());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPullWithNoMessageGetDrained() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
|
||||
receiver.flow(10);
|
||||
|
||||
Queue queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(0, queueView.getMessageCount());
|
||||
assertEquals(0, queueView.getDeliveringCount());
|
||||
|
||||
assertEquals(10, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
assertNull(receiver.pull(1, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPullOneFromRemote() throws Exception {
|
||||
int MSG_COUNT = 20;
|
||||
sendMessages(getTestName(), MSG_COUNT);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
|
||||
Queue queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(MSG_COUNT, queueView.getMessageCount());
|
||||
|
||||
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
message.accept();
|
||||
|
||||
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
receiver.close();
|
||||
|
||||
assertEquals(MSG_COUNT - 1, queueView.getMessageCount());
|
||||
assertEquals(1, queueView.getMessagesAcknowledged());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMultipleZeroResultPulls() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
|
||||
receiver.flow(10);
|
||||
|
||||
Queue queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(0, queueView.getMessageCount());
|
||||
|
||||
assertEquals(10, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
assertNull(receiver.pull(1, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
assertNull(receiver.pull(1, TimeUnit.SECONDS));
|
||||
assertNull(receiver.pull(1, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
public void sendMessages(String destinationName, int count) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = null;
|
||||
|
||||
try {
|
||||
connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender(destinationName);
|
||||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setText("Test-Message-" + i);
|
||||
sender.send(message);
|
||||
}
|
||||
|
||||
sender.close();
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test for scheduled message support using AMQP message annotations.
|
||||
*/
|
||||
public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendWithDeliveryTimeIsScheduled() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
|
||||
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
assertEquals(1, queueView.getScheduledCount());
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendRecvWithDeliveryTime() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
long deliveryTime = System.currentTimeMillis() + 6000;
|
||||
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
assertEquals(1, queueView.getScheduledCount());
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
|
||||
// Now try and get the message, should not due to being scheduled.
|
||||
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
// Now try and get the message, should get it now
|
||||
received = receiver.receive(10, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
received.accept();
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScheduleWithDelay() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
long delay = 6000;
|
||||
message.setMessageAnnotation("x-opt-delivery-delay", delay);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
assertEquals(1, queueView.getScheduledCount());
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
|
||||
// Now try and get the message, should not due to being scheduled.
|
||||
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
// Now try and get the message, should get it now
|
||||
received = receiver.receive(10, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
received.accept();
|
||||
|
||||
connection.close();
|
||||
}
|
||||
}
|
|
@ -16,22 +16,31 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
|
||||
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
|
||||
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpValidator;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||
import org.apache.qpid.proton.engine.Receiver;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -43,6 +52,255 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testCreateQueueReceiver() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
|
||||
Queue queue = getProxyToQueue(getTestName());
|
||||
assertNotNull(queue);
|
||||
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testCreateQueueReceiverWithJMSSelector() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
|
||||
client.setValidator(new AmqpValidator() {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void inspectOpenedResource(Receiver receiver) {
|
||||
|
||||
if (receiver.getRemoteSource() == null) {
|
||||
markAsInvalid("Link opened with null source.");
|
||||
}
|
||||
|
||||
Source source = (Source) receiver.getRemoteSource();
|
||||
Map<Symbol, Object> filters = source.getFilter();
|
||||
|
||||
if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
|
||||
markAsInvalid("Broker did not return the JMS Filter on Attach");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
session.createReceiver(getTestName(), "JMSPriority > 8");
|
||||
|
||||
connection.getStateInspector().assertValid();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
|
||||
client.setValidator(new AmqpValidator() {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void inspectOpenedResource(Receiver receiver) {
|
||||
|
||||
if (receiver.getRemoteSource() == null) {
|
||||
markAsInvalid("Link opened with null source.");
|
||||
}
|
||||
|
||||
Source source = (Source) receiver.getRemoteSource();
|
||||
Map<Symbol, Object> filters = source.getFilter();
|
||||
|
||||
// Currently don't support noLocal on a Queue
|
||||
if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) {
|
||||
markAsInvalid("Broker did not return the NoLocal Filter on Attach");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
session.createReceiver(getTestName(), null, true);
|
||||
|
||||
connection.getStateInspector().assertValid();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testQueueReceiverReadMessage() throws Exception {
|
||||
sendMessages(getTestName(), 1);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
|
||||
Queue queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queueView.getMessageCount());
|
||||
|
||||
receiver.flow(1);
|
||||
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
|
||||
receiver.close();
|
||||
|
||||
assertEquals(1, queueView.getMessageCount());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
|
||||
int MSG_COUNT = 4;
|
||||
sendMessages(getTestName(), MSG_COUNT);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver1 = session.createReceiver(getTestName());
|
||||
|
||||
Queue queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(MSG_COUNT, queueView.getMessageCount());
|
||||
|
||||
receiver1.flow(2);
|
||||
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
|
||||
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
|
||||
|
||||
AmqpReceiver receiver2 = session.createReceiver(getTestName());
|
||||
|
||||
assertEquals(2, server.getTotalConsumerCount());
|
||||
|
||||
receiver2.flow(2);
|
||||
assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
|
||||
assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals(0, queueView.getMessagesAcknowledged());
|
||||
|
||||
receiver1.close();
|
||||
receiver2.close();
|
||||
|
||||
assertEquals(MSG_COUNT, queueView.getMessageCount());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
|
||||
int MSG_COUNT = 4;
|
||||
sendMessages(getTestName(), MSG_COUNT);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver1 = session.createReceiver(getTestName());
|
||||
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(MSG_COUNT, queueView.getMessageCount());
|
||||
|
||||
receiver1.flow(2);
|
||||
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
message.accept();
|
||||
message = receiver1.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
message.accept();
|
||||
|
||||
assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisfied() throws Exception {
|
||||
return queueView.getMessagesAcknowledged() == 2;
|
||||
}
|
||||
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
|
||||
|
||||
AmqpReceiver receiver2 = session.createReceiver(getTestName());
|
||||
|
||||
assertEquals(2, server.getTotalConsumerCount());
|
||||
|
||||
receiver2.flow(2);
|
||||
message = receiver2.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
message.accept();
|
||||
message = receiver2.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
message.accept();
|
||||
|
||||
assertTrue("Queue should be empty now", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisfied() throws Exception {
|
||||
return queueView.getMessagesAcknowledged() == 4;
|
||||
}
|
||||
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10)));
|
||||
|
||||
receiver1.close();
|
||||
receiver2.close();
|
||||
|
||||
assertEquals(0, queueView.getMessageCount());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
|
||||
int MSG_COUNT = 20;
|
||||
sendMessages(getTestName(), MSG_COUNT);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver1 = session.createReceiver(getTestName());
|
||||
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(MSG_COUNT, queueView.getMessageCount());
|
||||
|
||||
receiver1.flow(20);
|
||||
|
||||
assertTrue("Should have dispatch to prefetch", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisfied() throws Exception {
|
||||
return queueView.getDeliveringCount() >= 2;
|
||||
}
|
||||
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
|
||||
|
||||
receiver1.close();
|
||||
|
||||
AmqpReceiver receiver2 = session.createReceiver(getTestName());
|
||||
|
||||
assertEquals(1, server.getTotalConsumerCount());
|
||||
|
||||
receiver2.flow(MSG_COUNT * 2);
|
||||
AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
message.accept();
|
||||
message = receiver2.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
message.accept();
|
||||
|
||||
assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisfied() throws Exception {
|
||||
return queueView.getMessagesAcknowledged() == 2;
|
||||
}
|
||||
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
|
||||
|
||||
receiver2.close();
|
||||
|
||||
assertEquals(MSG_COUNT - 2, queueView.getMessageCount());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSimpleSendOneReceiveOne() throws Exception {
|
||||
|
||||
|
@ -476,7 +734,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
assertTrue("Should be no inflight messages: " + destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
public boolean isSatisfied() throws Exception {
|
||||
return destinationView.getDeliveringCount() == 0;
|
||||
}
|
||||
}));
|
||||
|
@ -554,4 +812,21 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
public void sendMessages(String destinationName, int count) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
try {
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender(destinationName);
|
||||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setMessageId("MessageID:" + i);
|
||||
sender.send(message);
|
||||
}
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue